diff --git a/sipsimple/core/_core.subscription.pxi b/sipsimple/core/_core.subscription.pxi index 8f37761e..7331d287 100644 --- a/sipsimple/core/_core.subscription.pxi +++ b/sipsimple/core/_core.subscription.pxi @@ -1,959 +1,962 @@ import re cdef class Subscription: expire_warning_time = 30 #public methods def __cinit__(self, *args, **kwargs): self.state = "NULL" pj_timer_entry_init(&self._timeout_timer, 0, self, _Subscription_cb_timer) self._timeout_timer_active = 0 pj_timer_entry_init(&self._refresh_timer, 1, self, _Subscription_cb_timer) self._refresh_timer_active = 0 self.extra_headers = frozenlist() self.peer_address = None self.call_id = None def __init__(self, SIPURI request_uri not None, FromHeader from_header not None, ToHeader to_header not None, ContactHeader contact_header not None, object event, RouteHeader route_header not None, Credentials credentials=None, int refresh=300): global _subs_cb cdef PJSTR from_header_str cdef PJSTR to_header_str cdef PJSTR contact_str cdef PJSTR request_uri_str cdef pj_str_t event_pj cdef pjsip_cred_info *cred_info cdef PJSIPUA ua = _get_ua() cdef int status if self._obj != NULL or self.state != "NULL": raise SIPCoreError("Subscription.__init__() was already called") if refresh <= 0: raise ValueError("refresh argument needs to be a non-negative integer") - if event not in ua._events.keys(): - raise ValueError('Unknown event "%s"' % event) + + supported_events = list(ua._events.keys()) + if event not in supported_events: + raise ValueError('Unknown event %s, supported events: %s' % (event.decode(), (e.decode() for e in supported_events))) + self.contact_header = FrozenContactHeader.new(contact_header) self.event = event self.route_header = FrozenRouteHeader.new(route_header) self.route_header.uri.parameters.dict["lr"] = None # always send lr parameter in Route header self.route_header.uri.parameters.dict["hide"] = None # always hide Route header if credentials is not None: self.credentials = FrozenCredentials.new(credentials) self.refresh = refresh from_header_parameters = from_header.parameters.copy() from_header_parameters.pop("tag", None) from_header.parameters = {} from_header_str = PJSTR(from_header.body.encode()) to_header_parameters = to_header.parameters.copy() to_header_parameters.pop("tag", None) to_header.parameters = {} to_header_str = PJSTR(to_header.body.encode()) contact_str = PJSTR(str(contact_header.body).encode()) request_uri_str = PJSTR(str(request_uri).encode()) _str_to_pj_str(self.event, &event_pj) with nogil: status = pjsip_dlg_create_uac(pjsip_ua_instance(), &from_header_str.pj_str, &contact_str.pj_str, &to_header_str.pj_str, &request_uri_str.pj_str, &self._dlg) if status != 0: raise PJSIPError("Could not create dialog for SUBSCRIBE", status) # Increment dialog session count so that it's never destroyed by PJSIP with nogil: status = pjsip_dlg_inc_session(self._dlg, &ua._module) if status != 0: raise PJSIPError("Could not increment dialog session count", status) self.call_id = _pj_str_to_str(self._dlg.call_id.id) if contact_header.expires is not None: self._dlg.local.contact.expires = contact_header.expires if contact_header.q is not None: self._dlg.local.contact.q1000 = int(contact_header.q*1000) contact_parameters = contact_header.parameters.copy() contact_parameters.pop("q", None) contact_parameters.pop("expires", None) _dict_to_pjsip_param(contact_parameters, &self._dlg.local.contact.other_param, self._dlg.pool) _dict_to_pjsip_param(from_header_parameters, &self._dlg.local.info.other_param, self._dlg.pool) _dict_to_pjsip_param(to_header_parameters, &self._dlg.remote.info.other_param, self._dlg.pool) self.from_header = FrozenFromHeader_create(self._dlg.local.info) self.to_header = FrozenToHeader.new(to_header) with nogil: status = pjsip_evsub_create_uac(self._dlg, &_subs_cb, &event_pj, PJSIP_EVSUB_NO_EVENT_ID, &self._obj) if status != 0: raise PJSIPError("Could not create SUBSCRIBE", status) pjsip_evsub_set_mod_data(self._obj, ua._event_module.id, self) _BaseRouteHeader_to_pjsip_route_hdr(self.route_header, &self._route_header, self._dlg.pool) pj_list_init( &self._route_set) pj_list_insert_after( &self._route_set, &self._route_header) with nogil: status = pjsip_dlg_set_route_set(self._dlg, &self._route_set) if status != 0: raise PJSIPError("Could not set route on SUBSCRIBE", status) if self.credentials is not None: cred_info = self.credentials.get_cred_info() with nogil: status = pjsip_auth_clt_set_credentials(&self._dlg.auth_sess, 1, cred_info) if status != 0: raise PJSIPError("Could not set credentials for SUBSCRIBE", status) def __dealloc__(self): cdef PJSIPUA ua = self._get_ua() if ua is not None: self._cancel_timers(ua, 1, 1) if self._obj != NULL: pjsip_evsub_set_mod_data(self._obj, ua._event_module.id, NULL) with nogil: pjsip_evsub_terminate(self._obj, 0) self._obj = NULL if self._dlg != NULL and ua is not None: with nogil: pjsip_dlg_dec_session(self._dlg, &ua._module) self._dlg = NULL def subscribe(self, list extra_headers not None=list(), object content_type=None, object body=None, object timeout=None): cdef object prev_state = self.state cdef PJSIPUA ua = self._get_ua() with nogil: pjsip_dlg_inc_lock(self._dlg) try: if self.state == "TERMINATED": raise SIPCoreError('This method may not be called in the "TERMINATED" state') if (content_type is not None and body is None) or (content_type is None and body is not None): raise ValueError("Both or none of content_type and body arguments need to be specified") if timeout is not None: if timeout <= 0: raise ValueError("Timeout value cannot be negative") self._subscribe_timeout.sec = int(timeout) self._subscribe_timeout.msec = (timeout * 1000) % 1000 else: self._subscribe_timeout.sec = 0 self._subscribe_timeout.msec = 0 if extra_headers is not None: self.extra_headers = frozenlist([header.frozen_type.new(header) for header in extra_headers]) self.content_type = content_type self.body = body self._send_subscribe(ua, self.refresh, &self._subscribe_timeout, self.extra_headers, content_type, body) self._cancel_timers(ua, 0, 1) if prev_state == "NULL": _add_event("SIPSubscriptionWillStart", dict(obj=self)) finally: with nogil: pjsip_dlg_dec_lock(self._dlg) def end(self, object timeout=None): cdef pj_time_val end_timeout cdef PJSIPUA ua = self._get_ua() with nogil: pjsip_dlg_inc_lock(self._dlg) try: if self.state == "TERMINATED": return if self.state == "NULL": raise SIPCoreError('This method may not be called in the "NULL" state') if timeout is not None: if timeout <= 0: raise ValueError("Timeout value cannot be negative") end_timeout.sec = int(timeout) end_timeout.msec = (timeout * 1000) % 1000 else: end_timeout.sec = 0 end_timeout.msec = 0 self._want_end = 1 self._cancel_timers(ua, 1, 1) _add_event("SIPSubscriptionWillEnd", dict(obj=self)) try: self._send_subscribe(ua, 0, &end_timeout, self.extra_headers, None, None) except PJSIPError, e: self._term_reason = e.args[0] if self._obj != NULL: with nogil: pjsip_evsub_terminate(self._obj, 1) finally: with nogil: pjsip_dlg_dec_lock(self._dlg) # private methods cdef PJSIPUA _get_ua(self): cdef PJSIPUA ua try: ua = _get_ua() except SIPCoreError: self._obj = NULL self._timeout_timer_active = 0 self._refresh_timer_active = 0 self.state = "TERMINATED" return None else: return ua cdef int _cancel_timers(self, PJSIPUA ua, int cancel_timeout, int cancel_refresh) except -1: if cancel_timeout and self._timeout_timer_active: pjsip_endpt_cancel_timer(ua._pjsip_endpoint._obj, &self._timeout_timer) self._timeout_timer_active = 0 if cancel_refresh and self._refresh_timer_active: pjsip_endpt_cancel_timer(ua._pjsip_endpoint._obj, &self._refresh_timer) self._refresh_timer_active = 0 cdef int _send_subscribe(self, PJSIPUA ua, int expires, pj_time_val *timeout, object extra_headers, object content_type, object body) except -1: cdef pjsip_tx_data *tdata cdef pj_str_t body_pj cdef object content_type_spl cdef PJSTR content_type_str cdef PJSTR content_subtype_str cdef int status if body is not None: content_type_spl = content_type.split("/") if len(content_type_spl) != 2: raise ValueError('Supplied content_type argument does not contain a "/" character') content_type_str = PJSTR(content_type_spl[0].encode()) content_subtype_str = PJSTR(content_type_spl[1].encode()) - _str_to_pj_str(body.encode(), &body_pj) + _str_to_pj_str(body, &body_pj) with nogil: status = pjsip_evsub_initiate(self._obj, NULL, expires, &tdata) if status != 0: raise PJSIPError("Could not create SUBSCRIBE message", status) _add_headers_to_tdata(tdata, extra_headers) if body is not None: tdata.msg.body = pjsip_msg_body_create(tdata.pool, &content_type_str.pj_str, &content_subtype_str.pj_str, &body_pj) with nogil: status = pjsip_evsub_send_request(self._obj, tdata) if status != 0: raise PJSIPError("Could not send SUBSCRIBE message", status) self._cancel_timers(ua, 1, 0) if timeout.sec or timeout.msec: status = pjsip_endpt_schedule_timer(ua._pjsip_endpoint._obj, &self._timeout_timer, timeout) if status == 0: self._timeout_timer_active = 1 self._expires = self.refresh # callback methods cdef int _cb_state(self, PJSIPUA ua, object state, int code, object reason, dict headers) except -1: # PJSIP holds the dialog lock when this callback is entered cdef object prev_state = self.state cdef int expires cdef int status cdef pj_time_val end_timeout self.state = state if state == "ACCEPTED" and prev_state == "SENT": try: contact_header = headers['Contact'][0] except LookupError: self._term_code = 1400 self._term_reason = "Contact header missing" with nogil: pjsip_evsub_terminate(self._obj, 1) return 0 _add_event("SIPSubscriptionDidStart", dict(obj=self)) try: expires = int(headers["Expires"]) except (KeyError, ValueError): return 0 if expires == 0: self._want_end = 1 self._cancel_timers(ua, 1, 1) end_timeout.sec = 1 end_timeout.msec = 0 _add_event("SIPSubscriptionWillEnd", dict(obj=self)) try: self._send_subscribe(ua, 0, &end_timeout, self.extra_headers, None, None) except PJSIPError, e: self._term_reason = e.args[0] if self._obj != NULL: with nogil: pjsip_evsub_terminate(self._obj, 1) return 0 elif state == "TERMINATED": pjsip_evsub_set_mod_data(self._obj, ua._event_module.id, NULL) self._cancel_timers(ua, 1, 1) self._obj = NULL if self._want_end: _add_event("SIPSubscriptionDidEnd", dict(obj=self)) else: min_expires = headers.get('Min-Expires') if self._term_reason is not None: _add_event("SIPSubscriptionDidFail", dict(obj=self, code=self._term_code, reason=self._term_reason, min_expires=min_expires)) else: subscription_state = headers.get('Subscription-State') if subscription_state is not None and subscription_state.state == 'terminated': reason = subscription_state.reason _add_event("SIPSubscriptionDidFail", dict(obj=self, code=code, reason=reason, min_expires=min_expires)) if prev_state != state: _add_event("SIPSubscriptionChangedState", dict(obj=self, prev_state=prev_state, state=state)) cdef int _cb_got_response(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1: # PJSIP holds the dialog lock when this callback is entered cdef dict event_dict = dict() cdef int expires = self._expires cdef int status cdef pj_time_val refresh _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) self.to_header = FrozenToHeader_create(rdata.msg_info.to_hdr) if self.state != "TERMINATED": try: contact_header = event_dict["headers"]["Contact"][0] except LookupError: return 0 try: expires = int(event_dict["headers"]["Expires"]) except (KeyError, ValueError): expires = self._expires if expires == 0: return 0 if self.state != "TERMINATED" and not self._want_end: self._cancel_timers(ua, 1, 0) refresh.sec = max(1, expires - self.expire_warning_time, expires/2) refresh.msec = 0 status = pjsip_endpt_schedule_timer(ua._pjsip_endpoint._obj, &self._refresh_timer, &refresh) if status == 0: self._refresh_timer_active = 1 cdef int _cb_notify(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1: # PJSIP holds the dialog lock when this callback is entered cdef dict event_dict = dict() cdef dict notify_dict = dict(obj=self) _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) body = event_dict["body"] content_type = event_dict["headers"].get("Content-Type", None) event = event_dict["headers"].get("Event", None) if event is None or event.event != self.event or (body is not None and content_type is not None and content_type.content_type not in ua.events[event.event]): return 0 notify_dict["request_uri"] = event_dict["request_uri"] notify_dict["from_header"] = event_dict["headers"].get("From", None) notify_dict["to_header"] = event_dict["headers"].get("To", None) notify_dict["headers"] = event_dict["headers"] notify_dict["body"] = body notify_dict["content_type"] = content_type.content_type if content_type and body else None notify_dict["event"] = event.event _add_event("SIPSubscriptionGotNotify", notify_dict) cdef int _cb_timeout_timer(self, PJSIPUA ua): # Timer callback, dialog lock is not held by PJSIP global sip_status_messages with nogil: pjsip_dlg_inc_lock(self._dlg) try: self._term_code = PJSIP_SC_TSX_TIMEOUT self._term_reason = sip_status_messages[PJSIP_SC_TSX_TIMEOUT] if self._obj != NULL: with nogil: pjsip_evsub_terminate(self._obj, 1) finally: with nogil: pjsip_dlg_dec_lock(self._dlg) cdef int _cb_refresh_timer(self, PJSIPUA ua): # Timer callback, dialog lock is not held by PJSIP with nogil: pjsip_dlg_inc_lock(self._dlg) try: self._send_subscribe(ua, self.refresh, &self._subscribe_timeout, self.extra_headers, self.content_type, self.body) except PJSIPError, e: self._term_reason = e.args[0] if self._obj != NULL: with nogil: pjsip_evsub_terminate(self._obj, 1) finally: with nogil: pjsip_dlg_dec_lock(self._dlg) cdef class IncomingSubscription: # properties property content_type: def __get__(self): if self._content_type is None: return None return "%s/%s" % (self._content_type.str, self._content_subtype.str) property content: def __get__(self): if self._content is None: return None return self._content.str def __cinit__(self): self.state = None self.peer_address = None self.call_id = None def __dealloc__(self): cdef PJSIPUA ua = self._get_ua(0) self._initial_response = NULL self._initial_tsx = NULL if self._obj != NULL: pjsip_evsub_set_mod_data(self._obj, ua._event_module.id, NULL) with nogil: pjsip_evsub_terminate(self._obj, 0) self._obj = NULL if self._dlg != NULL and ua is not None: with nogil: pjsip_dlg_dec_session(self._dlg, &ua._module) self._dlg = NULL cdef int init(self, PJSIPUA ua, pjsip_rx_data *rdata, str event) except -1: global _incoming_subs_cb cdef int status cdef str transport cdef FrozenSIPURI request_uri cdef FrozenContactHeader contact_header cdef PJSTR contact_str cdef dict event_dict cdef pjsip_expires_hdr *expires_header cdef char *error_message expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, NULL) if expires_header == NULL: self._expires = 3600 else: self._expires = min(expires_header.ivalue, 3600) self._set_state("incoming") self.event = event self.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) event_dict = dict(obj=self) _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) transport = rdata.tp_info.transport.type_name.lower() request_uri = event_dict["request_uri"] if _is_valid_ip(pj_AF_INET(), request_uri.host.encode()): contact_header = FrozenContactHeader(request_uri) else: contact_header = FrozenContactHeader(FrozenSIPURI(host=_pj_str_to_str(rdata.tp_info.transport.local_name.host), user=request_uri.user, port=rdata.tp_info.transport.local_name.port, parameters=(frozendict(transport=transport) if transport != "udp" else frozendict()))) contact_str = PJSTR(str(contact_header.body).encode()) with nogil: status = pjsip_dlg_create_uas_and_inc_lock(pjsip_ua_instance(), rdata, &contact_str.pj_str, &self._dlg) if status != 0: error_message = "Could not create dialog for incoming SUBSCRIBE" else: pjsip_dlg_inc_session(self._dlg, &ua._module) # Increment dialog session count so it's never destroyed by PJSIP # setting the transport to rdata.tp_info.transport doesn't work as the NOTIFY has to be sent to the Contact URI and the transports can conflict if status != 0: raise PJSIPError(error_message, status) self._initial_tsx = pjsip_rdata_get_tsx(rdata) self.call_id = _pj_str_to_str(self._dlg.call_id.id) with nogil: status = pjsip_evsub_create_uas(self._dlg, &_incoming_subs_cb, rdata, 0, &self._obj) pjsip_dlg_dec_lock(self._dlg) if status != 0: pjsip_tsx_terminate(self._initial_tsx, 500) self._initial_tsx = NULL self._dlg = NULL error_message = "Could not create incoming SUBSCRIBE session" else: pjsip_evsub_set_mod_data(self._obj, ua._event_module.id, self) status = pjsip_dlg_create_response(self._dlg, rdata, 500, NULL, &self._initial_response) if status != 0: pjsip_tsx_terminate(self._initial_tsx, 500) self._initial_tsx = NULL error_message = "Could not create response for incoming SUBSCRIBE" if status != 0: raise PJSIPError(error_message, status) _add_event("SIPIncomingSubscriptionGotSubscribe", event_dict) return 0 def reject(self, int code): cdef PJSIPUA ua = self._get_ua(1) with nogil: pjsip_dlg_inc_lock(self._dlg) try: if self.state != "incoming": raise SIPCoreInvalidStateError('Can only reject an incoming SUBSCRIBE in the "incoming" state, '+ 'object is currently in the "%s" state' % self.state) if not (300 <= code < 700): raise ValueError("Invalid negative SIP response code: %d" % code) self._send_initial_response(code) pjsip_evsub_set_mod_data(self._obj, ua._event_module.id, NULL) with nogil: pjsip_evsub_terminate(self._obj, 0) self._obj = NULL self._set_state("terminated") _add_event("SIPIncomingSubscriptionDidEnd", dict(obj=self)) finally: with nogil: pjsip_dlg_dec_lock(self._dlg) def accept_pending(self): cdef PJSIPUA ua = self._get_ua(1) with nogil: pjsip_dlg_inc_lock(self._dlg) try: if self.state != "incoming": raise SIPCoreInvalidStateError('Can only accept an incoming SUBSCRIBE as pending in the "incoming" state, '+ 'object is currently in the "%s" state' % self.state) self._send_initial_response(202) self._set_state("pending") if self._expires > 0: self._send_notify() else: # cleanup will be done by _cb_tsx self._terminate(ua, "timeout", 0) finally: with nogil: pjsip_dlg_dec_lock(self._dlg) def accept(self, str content_type=None, str content=None): global _re_content_type cdef object content_type_match cdef PJSIPUA ua = self._get_ua(1) with nogil: pjsip_dlg_inc_lock(self._dlg) try: if self.state not in ("incoming", "pending"): raise SIPCoreInvalidStateError('Can only accept an incoming SUBSCRIBE in the "incoming" or "pending" state, object is currently in the "%s" state' % self.state) if (content_type is None and content is not None) or (content_type is not None and content is None): raise ValueError('Either both or neither of the "content_type" and "content" arguments should be specified') if content_type is not None: content_type_match = _re_content_type.match(content_type) if content_type_match is None: raise ValueError("content_type parameter is not properly formatted") self._content_type = PJSTR(content_type_match.group(1).encode()) self._content_subtype = PJSTR(content_type_match.group(2).encode()) self._content = PJSTR(content.encode()) if self.state == "incoming": self._send_initial_response(200) self._set_state("active") if self._expires > 0: self._send_notify() else: # cleanup will be done by _cb_tsx self._terminate(ua, "timeout", 0) finally: with nogil: pjsip_dlg_dec_lock(self._dlg) def push_content(self, str content_type not None, str content not None): global _re_content_type cdef object content_type_match cdef PJSIPUA ua = self._get_ua(1) with nogil: pjsip_dlg_inc_lock(self._dlg) try: if self.state != "active": raise SIPCoreInvalidStateError('Can only push the content for a SUBSCRIBE session in the "active" state, ' 'object is currently in the "%s" state' % self.state) content_type_match = _re_content_type.match(content_type) if content_type_match is None: raise ValueError("content_type parameter is not properly formatted") self._content_type = PJSTR(content_type_match.group(1).encode()) self._content_subtype = PJSTR(content_type_match.group(2).encode()) self._content = PJSTR(content.encode()) self._send_notify() finally: with nogil: pjsip_dlg_dec_lock(self._dlg) def end(self, reason=None): cdef PJSIPUA ua = self._get_ua(0) with nogil: pjsip_dlg_inc_lock(self._dlg) try: if self.state == "terminated": return if self.state not in ("pending", "active"): raise SIPCoreInvalidStateError('Can only end an incoming SUBSCRIBE session in the "pending" or '+ '"active" state, object is currently in the "%s" state' % self.state) self._terminate(ua, reason, 1) finally: with nogil: pjsip_dlg_dec_lock(self._dlg) cdef int _set_state(self, str state) except -1: cdef str prev_state prev_state = self.state self.state = state if prev_state != state and prev_state is not None: _add_event("SIPIncomingSubscriptionChangedState", dict(obj=self, prev_state=prev_state, state=state)) cdef PJSIPUA _get_ua(self, int raise_exception): cdef PJSIPUA ua try: ua = _get_ua() except SIPCoreError: self._obj = NULL self._initial_response = NULL self._initial_tsx = NULL self._set_state("terminated") if raise_exception: raise else: return None else: return ua cdef int _send_initial_response(self, int code) except -1: cdef PJSIPUA ua = self._get_ua(1) cdef int status with nogil: status = pjsip_dlg_modify_response(self._dlg, self._initial_response, code, NULL) if status != 0: raise PJSIPError("Could not modify response", status) # pjsip_dlg_modify_response() increases ref count unncessarily with nogil: pjsip_tx_data_dec_ref(self._initial_response) if code / 100 == 2: pjsip_msg_add_hdr(self._initial_response.msg, pjsip_expires_hdr_create(self._initial_response.pool, self._expires)) with nogil: status = pjsip_dlg_send_response(self._dlg, self._initial_tsx, self._initial_response) if status != 0: raise PJSIPError("Could not send response", status) self._initial_response = NULL self._initial_tsx = NULL if self._expires > 0: with nogil: # Start TIMER_TYPE_UAS_TIMEOUT, which PJSIP doesn't do for the initial SUBSCRIBE pjsip_evsub_set_timer(self._obj, 2, self._expires) cdef int _send_notify(self, str reason=None) except -1: cdef pjsip_evsub_state state cdef pj_str_t reason_pj cdef pj_str_t *reason_p cdef pjsip_tx_data *tdata cdef int status reason_p = NULL if self.state == "pending": state = PJSIP_EVSUB_STATE_PENDING elif self.state == "active": state = PJSIP_EVSUB_STATE_ACTIVE else: state = PJSIP_EVSUB_STATE_TERMINATED if reason is not None: _str_to_pj_str(reason.encode(), &reason_pj) reason_p = &reason_pj with nogil: status = pjsip_evsub_notify(self._obj, state, NULL, reason_p, &tdata) if status != 0: raise PJSIPError("Could not create NOTIFY request", status) if self.state == "active" and None not in (self._content_type, self._content_subtype, self._content): tdata.msg.body = pjsip_msg_body_create(tdata.pool, &self._content_type.pj_str, &self._content_subtype.pj_str, &self._content.pj_str) with nogil: status = pjsip_evsub_send_request(self._obj, tdata) if status != 0: raise PJSIPError("Could not send NOTIFY request", status) event_dict = dict(obj=self) _pjsip_msg_to_dict(tdata.msg, event_dict) _add_event("SIPIncomingSubscriptionSentNotify", event_dict) return 0 cdef int _terminate(self, PJSIPUA ua, str reason, int do_cleanup) except -1: cdef int status self._set_state("terminated") try: self._send_notify(reason) except SIPCoreError: pass if do_cleanup: pjsip_evsub_set_mod_data(self._obj, ua._event_module.id, NULL) self._obj = NULL _add_event("SIPIncomingSubscriptionDidEnd", dict(obj=self)) # callback methods cdef int _cb_rx_refresh(self, PJSIPUA ua, pjsip_rx_data *rdata) except -1: # PJSIP holds the dialog lock when this callback is entered cdef int status cdef pjsip_expires_hdr *expires_header cdef int expires cdef dict event_dict event_dict = dict(obj=self) _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, NULL) if expires_header == NULL: self._expires = 3600 else: if expires_header.ivalue == 0: _add_event("SIPIncomingSubscriptionGotUnsubscribe", event_dict) # cleanup will be done by _cb_tsx self._terminate(ua, None, 0) return 200 else: self._expires = min(expires_header.ivalue, 3600) _add_event("SIPIncomingSubscriptionGotRefreshingSubscribe", event_dict) try: self._send_notify() except SIPCoreError, e: _add_event("SIPIncomingSubscriptionNotifyDidFail", dict(obj=self, code=0, reason=e.args[0])) if self.state == "active": return 200 else: return 202 cdef int _cb_server_timeout(self, PJSIPUA ua) except -1: # PJSIP holds the dialog lock when this callback is entered _add_event("SIPIncomingSubscriptionDidTimeout", dict(obj=self)) self._terminate(ua, "timeout", 1) cdef int _cb_tsx(self, PJSIPUA ua, pjsip_event *event) except -1: # PJSIP holds the dialog lock when this callback is entered cdef pjsip_rx_data *rdata cdef dict event_dict cdef int status_code if (event != NULL and event.type == PJSIP_EVENT_TSX_STATE and event.body.tsx_state.tsx.role == PJSIP_ROLE_UAC and _pj_str_to_str(event.body.tsx_state.tsx.method.name) == "NOTIFY" and event.body.tsx_state.tsx.state == PJSIP_TSX_STATE_COMPLETED): event_dict = dict(obj=self) rdata = event.body.tsx_state.src.rdata if rdata != NULL: if self.peer_address is None: self.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) else: self.peer_address.ip = rdata.pkt_info.src_name self.peer_address.port = rdata.pkt_info.src_port status_code = event.body.tsx_state.tsx.status_code if event.body.tsx_state.type==PJSIP_EVENT_RX_MSG and status_code/100==2: _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) _add_event("SIPIncomingSubscriptionNotifyDidSucceed", event_dict) else: if event.body.tsx_state.type == PJSIP_EVENT_RX_MSG: _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) else: event_dict["code"] = status_code event_dict["reason"] = _pj_str_to_str(event.body.tsx_state.tsx.status_text) _add_event("SIPIncomingSubscriptionNotifyDidFail", event_dict) if status_code in (408, 481) or status_code/100==7: # PJSIP will terminate the subscription and the dialog will be destroyed self._terminate(ua, None, 1) elif (event != NULL and event.type == PJSIP_EVENT_TSX_STATE and event.body.tsx_state.tsx.role == PJSIP_ROLE_UAC and _pj_str_to_str(event.body.tsx_state.tsx.method.name) == "NOTIFY" and event.body.tsx_state.tsx.state == PJSIP_TSX_STATE_TERMINATED): event_dict = dict(obj=self) status_code = event.body.tsx_state.tsx.status_code if status_code == 408: # Local timeout, PJSIP will terminate the subscription and the dialog will be destroyed event_dict["code"] = status_code event_dict["reason"] = _pj_str_to_str(event.body.tsx_state.tsx.status_text) _add_event("SIPIncomingSubscriptionNotifyDidFail", event_dict) self._terminate(ua, None, 1) elif (event != NULL and event.type == PJSIP_EVENT_TSX_STATE and event.body.tsx_state.tsx.role == PJSIP_ROLE_UAS and _pj_str_to_str(event.body.tsx_state.tsx.method.name) == "SUBSCRIBE" and event.body.tsx_state.tsx.state == PJSIP_TSX_STATE_COMPLETED and event.body.tsx_state.type == PJSIP_EVENT_TX_MSG): event_dict = dict(obj=self) _pjsip_msg_to_dict(event.body.tsx_state.src.tdata.msg, event_dict) _add_event("SIPIncomingSubscriptionAnsweredSubscribe", event_dict) if self.state == "terminated" and self._obj != NULL: pjsip_evsub_set_mod_data(self._obj, ua._event_module.id, NULL) self._obj = NULL # callback functions cdef void _Subscription_cb_state(pjsip_evsub *sub, pjsip_event *event) with gil: cdef void *subscription_void cdef Subscription subscription cdef object state cdef int code = 0 cdef object reason = None cdef pjsip_rx_data *rdata = NULL cdef PJSIPUA ua try: ua = _get_ua() except: return try: subscription_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if subscription_void == NULL: return subscription = subscription_void state = pjsip_evsub_get_state_name(sub) if (event != NULL and event.type == PJSIP_EVENT_TSX_STATE and (event.body.tsx_state.tsx.state == PJSIP_TSX_STATE_COMPLETED or event.body.tsx_state.tsx.state == PJSIP_TSX_STATE_TERMINATED)): if state == "TERMINATED": if event.body.tsx_state.tsx.role == PJSIP_ROLE_UAC: code = event.body.tsx_state.tsx.status_code reason = _pj_str_to_str(event.body.tsx_state.tsx.status_text) else: code = 0 reason = None if event.body.tsx_state.type == PJSIP_EVENT_RX_MSG and _pj_str_to_str(event.body.tsx_state.tsx.method.name) in ("SUBSCRIBE", "NOTIFY"): rdata = event.body.tsx_state.src.rdata headers_dict = dict() if rdata != NULL: rdata_dict = dict() _pjsip_msg_to_dict(rdata.msg_info.msg, rdata_dict) headers_dict = rdata_dict.get('headers', {}) subscription._cb_state(ua, state, code, reason, headers_dict) except: ua._handle_exception(1) cdef void _Subscription_cb_tsx(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event) with gil: cdef void *subscription_void cdef Subscription subscription cdef pjsip_rx_data *rdata cdef PJSIPUA ua try: ua = _get_ua() except: return try: subscription_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if subscription_void == NULL: return subscription = subscription_void if (event != NULL and event.type == PJSIP_EVENT_TSX_STATE and event.body.tsx_state.type == PJSIP_EVENT_RX_MSG and event.body.tsx_state.tsx.role == PJSIP_ROLE_UAC and event.body.tsx_state.tsx.state == PJSIP_TSX_STATE_COMPLETED and _pj_str_to_str(event.body.tsx_state.tsx.method.name) == "SUBSCRIBE" and event.body.tsx_state.tsx.status_code / 100 == 2): rdata = event.body.tsx_state.src.rdata if rdata != NULL: if subscription.peer_address is None: subscription.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) else: subscription.peer_address.ip = rdata.pkt_info.src_name subscription.peer_address.port = rdata.pkt_info.src_port subscription._cb_got_response(ua, rdata) except: ua._handle_exception(1) cdef void _Subscription_cb_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil: cdef void *subscription_void cdef Subscription subscription cdef PJSIPUA ua try: ua = _get_ua() except: return try: subscription_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if subscription_void == NULL: return subscription = subscription_void if rdata != NULL: if subscription.peer_address is None: subscription.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) else: subscription.peer_address.ip = rdata.pkt_info.src_name subscription.peer_address.port = rdata.pkt_info.src_port subscription._cb_notify(ua, rdata) except: ua._handle_exception(1) cdef void _Subscription_cb_refresh(pjsip_evsub *sub) with gil: # We want to handle the refresh timer oursevles, ignore the PJSIP provided timer pass cdef void _Subscription_cb_timer(pj_timer_heap_t *timer_heap, pj_timer_entry *entry) with gil: cdef Subscription subscription cdef PJSIPUA ua try: ua = _get_ua() except: return try: if entry.user_data != NULL: subscription = entry.user_data if subscription._dlg == NULL: return if entry.id == 1: subscription._refresh_timer_active = 0 subscription._cb_refresh_timer(ua) else: subscription._timeout_timer_active = 0 subscription._cb_timeout_timer(ua) except: ua._handle_exception(1) cdef void _IncomingSubscription_cb_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) with gil: cdef void *subscription_void cdef IncomingSubscription subscription cdef PJSIPUA ua try: ua = _get_ua() except: return try: subscription_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if subscription_void == NULL: p_st_code[0] = 481 return subscription = subscription_void if rdata != NULL: if subscription.peer_address is None: subscription.peer_address = EndpointAddress(rdata.pkt_info.src_name, rdata.pkt_info.src_port) else: subscription.peer_address.ip = rdata.pkt_info.src_name subscription.peer_address.port = rdata.pkt_info.src_port p_st_code[0] = subscription._cb_rx_refresh(ua, rdata) except: ua._handle_exception(1) cdef void _IncomingSubscription_cb_server_timeout(pjsip_evsub *sub) with gil: cdef void *subscription_void cdef IncomingSubscription subscription cdef PJSIPUA ua try: ua = _get_ua() except: return try: subscription_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if subscription_void == NULL: return subscription = subscription_void subscription._cb_server_timeout(ua) except: ua._handle_exception(1) cdef void _IncomingSubscription_cb_tsx(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event) with gil: cdef void *subscription_void cdef IncomingSubscription subscription cdef PJSIPUA ua try: ua = _get_ua() except: return try: subscription_void = pjsip_evsub_get_mod_data(sub, ua._event_module.id) if subscription_void == NULL: return subscription = subscription_void subscription._cb_tsx(ua, event) except: ua._handle_exception(1) # globals cdef pjsip_evsub_user _subs_cb _subs_cb.on_evsub_state = _Subscription_cb_state _subs_cb.on_tsx_state = _Subscription_cb_tsx _subs_cb.on_rx_notify = _Subscription_cb_notify _subs_cb.on_client_refresh = _Subscription_cb_refresh cdef pjsip_evsub_user _incoming_subs_cb _incoming_subs_cb.on_rx_refresh = _IncomingSubscription_cb_rx_refresh _incoming_subs_cb.on_server_timeout = _IncomingSubscription_cb_server_timeout _incoming_subs_cb.on_tsx_state = _IncomingSubscription_cb_tsx _re_content_type = re.compile("^([a-zA-Z0-9\-.!%*_+`'~]+)\/([a-zA-Z0-9\-.!%*_+`'~]+)$") diff --git a/sipsimple/core/_engine.py b/sipsimple/core/_engine.py index 126f5efe..043f3e6c 100644 --- a/sipsimple/core/_engine.py +++ b/sipsimple/core/_engine.py @@ -1,139 +1,147 @@ """ Implements a mechanism for starting the SIP core engine based on PJSIP (http://pjsip.org) stack. """ __all__ = ["Engine"] import sys import traceback import atexit from application.notification import NotificationCenter, NotificationData from application.python.types import Singleton from threading import Thread, RLock from sipsimple import log, __version__ from sipsimple.core._core import PJSIPUA, PJ_VERSION, PJ_SVN_REVISION, SIPCoreError class Engine(Thread, metaclass=Singleton): default_start_options = {"ip_address": None, "udp_port": 0, "tcp_port": None, "tls_port": None, "tls_verify_server": False, "tls_ca_file": None, "tls_cert_file": None, "tls_privkey_file": None, "tls_timeout": 3000, "user_agent": "sipsimple-%s-pjsip-%s-r%s" % (__version__, PJ_VERSION, PJ_SVN_REVISION), "log_level": 0, "trace_sip": False, "detect_sip_loops": True, "rtp_port_range": (50000, 50500), "zrtp_cache": None, "codecs": ["G722", "speex", "PCMU", "PCMA"], "video_codecs": ["H264", "H263-1998", "VP8"], "enable_colorbar_device": False, - "events": {b"conference": [b"application/conference-info+xml"], - b"message-summary": [b"application/simple-message-summary"], - b"presence": [b"multipart/related", b"application/rlmi+xml", b"application/pidf+xml"], - b"presence.winfo": [b"application/watcherinfo+xml"], - b"dialog": [b"multipart/related", b"application/rlmi+xml", b"application/dialog-info+xml"], - b"dialog.winfo": [b"application/watcherinfo+xml"], - b"refer": [b"message/sipfrag;version=2.0"], - b"xcap-diff": [b"application/xcap-diff+xml"]}, + "events": {"conference": ["application/conference-info+xml"], + "message-summary": ["application/simple-message-summary"], + "presence": ["multipart/related", "application/rlmi+xml", "application/pidf+xml"], + "presence.winfo": ["application/watcherinfo+xml"], + "dialog": ["multipart/related", "application/rlmi+xml", "application/dialog-info+xml"], + "dialog.winfo": ["application/watcherinfo+xml"], + "refer": ["message/sipfrag;version=2.0"], + "xcap-diff": ["application/xcap-diff+xml"]}, "incoming_events": set(), "incoming_requests": set()} def __init__(self): self.notification_center = NotificationCenter() self._thread_started = False self._thread_stopping = False self._lock = RLock() self._options = None atexit.register(self.stop) super(Engine, self).__init__() self.daemon = True @property def is_running(self): return (hasattr(self, "_ua") and hasattr(self, "_thread_started") and self._thread_started and not self._thread_stopping) def __dir__(self): if hasattr(self, '_ua'): ua_attributes = [attr for attr in dir(self._ua) if not attr.startswith('__') and attr != 'poll'] else: ua_attributes = [] return sorted(set(dir(self.__class__) + list(self.__dict__.keys()) + ua_attributes)) def __getattr__(self, attr): if attr not in ["_ua", "poll"] and hasattr(self, "_ua") and attr in dir(self._ua): return getattr(self._ua, attr) raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, attr)) def __setattr__(self, attr, value): if attr not in ["_ua", "poll"] and hasattr(self, "_ua") and attr in dir(self._ua): setattr(self._ua, attr, value) return object.__setattr__(self, attr, value) def start(self, **kwargs): with self._lock: if self._thread_started: raise SIPCoreError("Worker thread was already started once") self._options = kwargs self._thread_started = True super(Engine, self).start() def stop(self): with self._lock: if self._thread_stopping: return if self._thread_started: self._thread_stopping = True # worker thread def run(self): self.notification_center.post_notification('SIPEngineWillStart', sender=self) init_options = Engine.default_start_options.copy() init_options.update(self._options) + for k in list(init_options['events'].keys()): + if isinstance(k, str): + init_options['events'][k.encode()] = init_options['events'][k] + del(init_options['events'][k]) + + for k in list(init_options['events'].keys()): + init_options['events'][k] = list(v.encode() if isinstance(v, str) else v for v in init_options['events'][k]) + try: self._ua = PJSIPUA(self._handle_event, **init_options) except Exception: log.exception('Exception occurred while starting the Engine') exc_type, exc_val, exc_tb = sys.exc_info() exc_tb = "".join(traceback.format_exception(exc_type, exc_val, exc_tb)) self.notification_center.post_notification('SIPEngineGotException', sender=self, data=NotificationData(type=exc_type, value=exc_val, traceback=exc_tb)) self.notification_center.post_notification('SIPEngineDidFail', sender=self) return else: self.notification_center.post_notification('SIPEngineDidStart', sender=self) failed = False while not self._thread_stopping: try: failed = self._ua.poll() except Exception as e: log.exception('Exception occurred while running the Engine') traceback.print_exc() exc_type, exc_val, exc_tb = sys.exc_info() self.notification_center.post_notification('SIPEngineGotException', sender=self, data=NotificationData(type=exc_type, value=exc_val, traceback="".join(traceback.format_exception(exc_type, exc_val, exc_tb)))) failed = True if failed: self.notification_center.post_notification('SIPEngineDidFail', sender=self) break if not failed: self.notification_center.post_notification('SIPEngineWillEnd', sender=self) self._ua.dealloc() del self._ua self.notification_center.post_notification('SIPEngineDidEnd', sender=self) def _handle_event(self, event_name, **kwargs): sender = kwargs.pop("obj", None) if sender is None: sender = self self.notification_center.post_notification(event_name, sender, NotificationData(**kwargs)) diff --git a/sipsimple/core/_helpers.py b/sipsimple/core/_helpers.py index 0475f734..8b22b0d2 100644 --- a/sipsimple/core/_helpers.py +++ b/sipsimple/core/_helpers.py @@ -1,129 +1,129 @@ """Miscellaneous SIP related helpers""" import random import socket import string from application.python.types import MarkerType from application.system import host from sipsimple.core._core import SIPURI from sipsimple.core._engine import Engine __all__ = ['Route', 'ContactURIFactory', 'NoGRUU', 'PublicGRUU', 'TemporaryGRUU', 'PublicGRUUIfAvailable', 'TemporaryGRUUIfAvailable'] class Route(object): _default_ports = dict(udp=5060, tcp=5060, tls=5061) def __init__(self, address, port=None, transport='udp'): - self.address = address + self.address = address.decode() if isinstance(address, bytes) else address self.port = port - self.transport = transport + self.transport = transport.decode() if isinstance(transport, bytes) else transport @property def address(self): return self._address @address.setter def address(self, address): try: socket.inet_aton(address) except: raise ValueError('illegal address: %s' % address) self._address = address @property def port(self): if self._port is None: return 5061 if self.transport == 'tls' else 5060 else: return self._port @port.setter def port(self, port): port = int(port) if port is not None else None if port is not None and not (0 < port < 65536): raise ValueError('illegal port value: %d' % port) self._port = port @property def transport(self): return self._transport @transport.setter def transport(self, transport): if transport not in ('udp', 'tcp', 'tls'): raise ValueError('illegal transport value: %s' % transport) self._transport = transport @property def uri(self): port = None if self._default_ports[self.transport] == self.port else self.port parameters = {} if self.transport == 'udp' else {'transport': self.transport.encode()} return SIPURI(host=self.address, port=port, parameters=parameters) def __repr__(self): return '{0.__class__.__name__}({0.address!r}, port={0.port!r}, transport={0.transport!r})'.format(self) def __str__(self): return str(self.uri) class ContactURIType(MarkerType): pass class NoGRUU(metaclass=ContactURIType): pass class PublicGRUU(metaclass=ContactURIType): pass class TemporaryGRUU(metaclass=ContactURIType): pass class PublicGRUUIfAvailable(metaclass=ContactURIType): pass class TemporaryGRUUIfAvailable(metaclass=ContactURIType): pass class ContactURIFactory(object): def __init__(self, username=None): self.username = username or ''.join(random.sample(string.digits, 8)) self.public_gruu = None self.temporary_gruu = None def __repr__(self): return '{0.__class__.__name__}(username={0.username!r})'.format(self) def __getitem__(self, key): if isinstance(key, tuple): contact_type, key = key if not isinstance(contact_type, ContactURIType): raise KeyError("unsupported contact type: %r" % contact_type) else: contact_type = NoGRUU if not isinstance(key, (str, Route)): raise KeyError("key must be a transport name or Route instance") transport = key if isinstance(key, str) else key.transport parameters = {} if transport == 'udp' else {'transport': transport} if contact_type is PublicGRUU: if self.public_gruu is None: raise KeyError("could not get Public GRUU") uri = SIPURI.new(self.public_gruu) elif contact_type is TemporaryGRUU: if self.temporary_gruu is None: raise KeyError("could not get Temporary GRUU") uri = SIPURI.new(self.temporary_gruu) elif contact_type is PublicGRUUIfAvailable and self.public_gruu is not None: uri = SIPURI.new(self.public_gruu) elif contact_type is TemporaryGRUUIfAvailable and self.temporary_gruu is not None: uri = SIPURI.new(self.temporary_gruu) else: ip = host.default_ip if isinstance(key, str) else host.outgoing_ip_for(key.address) if ip is None: raise KeyError("could not get outgoing IP address") port = getattr(Engine(), '%s_port' % transport, None) if port is None: raise KeyError("unsupported transport: %s" % transport) uri = SIPURI(user=self.username, host=ip, port=port) uri.parameters.update(parameters) return uri diff --git a/sipsimple/session.py b/sipsimple/session.py index 316dd8b3..3b77d599 100644 --- a/sipsimple/session.py +++ b/sipsimple/session.py @@ -1,2740 +1,2740 @@ """ Implements an asynchronous notification based mechanism for establishment, modification and termination of sessions using Session Initiation Protocol (SIP) standardized in RFC3261. """ __all__ = ['Session', 'SessionManager'] import random from threading import RLock from time import time from application.notification import IObserver, Notification, NotificationCenter, NotificationData from application.python import Null, limit from application.python.decorator import decorator, preserve_signature from application.python.types import Singleton from application.system import host from eventlib import api, coros, proc from twisted.internet import reactor from zope.interface import implementer from sipsimple import log from sipsimple.account import AccountManager, BonjourAccount from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import DialogID, Engine, Invitation, Referral, Subscription, PJSIPError, SIPCoreError, SIPCoreInvalidStateError, SIPURI, sip_status_messages, sipfrag_re from sipsimple.core import ContactHeader, FromHeader, Header, ReasonHeader, ReferToHeader, ReplacesHeader, RouteHeader, ToHeader, WarningHeader from sipsimple.core import SDPConnection, SDPMediaStream, SDPSession from sipsimple.core import PublicGRUU, PublicGRUUIfAvailable, NoGRUU from sipsimple.lookup import DNSLookup, DNSLookupError from sipsimple.payloads import ParserError from sipsimple.payloads.conference import ConferenceDocument from sipsimple.streams import MediaStreamRegistry, InvalidStreamError, UnknownStreamError from sipsimple.threading import run_in_twisted_thread from sipsimple.threading.green import Command, run_in_green_thread from sipsimple.util import ISOTimestamp class InvitationDisconnectedError(Exception): def __init__(self, invitation, data): self.invitation = invitation self.data = data class MediaStreamDidNotInitializeError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class MediaStreamDidFailError(Exception): def __init__(self, stream, data): self.stream = stream self.data = data class SubscriptionError(Exception): def __init__(self, error, timeout, **attributes): self.error = error self.timeout = timeout self.attributes = attributes class SIPSubscriptionDidFail(Exception): def __init__(self, data): self.data = data class InterruptSubscription(Exception): pass class TerminateSubscription(Exception): pass class ReferralError(Exception): def __init__(self, error, code=0): self.error = error self.code = code class TerminateReferral(Exception): pass class SIPReferralDidFail(Exception): def __init__(self, data): self.data = data class IllegalStateError(RuntimeError): pass class IllegalDirectionError(RuntimeError): pass class SIPInvitationTransferDidFail(Exception): def __init__(self, data): self.data = data @decorator def transition_state(required_state, new_state): def state_transitioner(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): with obj._lock: if obj.state != required_state: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) obj.state = new_state return func(obj, *args, **kwargs) return wrapper return state_transitioner @decorator def check_state(required_states): def state_checker(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): if obj.state not in required_states: raise IllegalStateError('cannot call %s in %s state' % (func.__name__, obj.state)) return func(obj, *args, **kwargs) return wrapper return state_checker @decorator def check_transfer_state(direction, state): def state_checker(func): @preserve_signature(func) def wrapper(obj, *args, **kwargs): if obj.transfer_handler.direction != direction: raise IllegalDirectionError('cannot transfer in %s direction' % obj.transfer_handler.direction) if obj.transfer_handler.state != state: raise IllegalStateError('cannot transfer in %s state' % obj.transfer_handler.state) return func(obj, *args, **kwargs) return wrapper return state_checker class AddParticipantOperation(object): pass class RemoveParticipantOperation(object): pass @implementer(IObserver) class ReferralHandler(object): def __init__(self, session, participant_uri, operation): self.participant_uri = participant_uri if not isinstance(self.participant_uri, SIPURI): if not self.participant_uri.startswith(('sip:', 'sips:')): self.participant_uri = 'sip:%s' % self.participant_uri try: self.participant_uri = SIPURI.parse(self.participant_uri) except SIPCoreError: notification_center = NotificationCenter() if operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=session, data=NotificationData(participant=self.participant_uri, code=0, reason='invalid participant URI')) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=session, data=NotificationData(participant=self.participant_uri, code=0, reason='invalid participant URI')) return self.session = session self.operation = operation self.active = False self.route = None self._channel = coros.queue() self._referral = None def start(self): notification_center = NotificationCenter() if not self.session.remote_focus: if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='remote endpoint is not a focus')) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='remote endpoint is not a focus')) self.session = None return notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='NetworkConditionsDidChange') proc.spawn(self._run) def _run(self): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account is BonjourAccount(): uri = SIPURI.new(self.session._invitation.remote_contact_header.uri) elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: timeout = random.uniform(15, 30) raise ReferralError(error='DNS lookup failed: %s' % e) target_uri = SIPURI.new(self.session.remote_identity.uri) timeout = time() + 30 for route in routes: self.route = route remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = account.contact[NoGRUU, route] except KeyError: continue refer_to_header = ReferToHeader(str(self.participant_uri)) refer_to_header.parameters['method'] = 'INVITE' if self.operation is AddParticipantOperation else 'BYE' referral = Referral(target_uri, FromHeader(account.uri, account.display_name), ToHeader(target_uri), refer_to_header, ContactHeader(contact_uri), RouteHeader(route.uri), account.credentials) notification_center.add_observer(self, sender=referral) try: referral.send_refer(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=referral) timeout = 5 raise ReferralError(error='Internal error') self._referral = referral try: while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidStart': break except SIPReferralDidFail as e: notification_center.remove_observer(self, sender=referral) self._referral = None if e.data.code in (403, 405): raise ReferralError(error=sip_status_messages[e.data.code], code=e.data.code) else: # Otherwise just try the next route continue else: break else: self.route = None raise ReferralError(error='No more routes to try') # At this point it is subscribed. Handle notifications and ending/failures. try: self.active = True while True: notification = self._channel.wait() if notification.name == 'SIPReferralGotNotify': if notification.data.event == 'refer' and notification.data.body: match = sipfrag_re.match(notification.data.body) if match: code = int(match.group('code')) reason = match.group('reason') if code/100 > 2: continue if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceGotAddParticipantProgress', sender=self.session, data=NotificationData(participant=self.participant_uri, code=code, reason=reason)) else: notification_center.post_notification('SIPConferenceGotRemoveParticipantProgress', sender=self.session, data=NotificationData(participant=self.participant_uri, code=code, reason=reason)) elif notification.name == 'SIPReferralDidEnd': break except SIPReferralDidFail as e: notification_center.remove_observer(self, sender=self._referral) raise ReferralError(error=e.data.reason, code=e.data.code) else: notification_center.remove_observer(self, sender=self._referral) if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri)) else: notification_center.post_notification('SIPConferenceDidRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri)) finally: self.active = False except TerminateReferral: if self._referral is not None: try: self._referral.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._channel.wait() if notification.name == 'SIPReferralDidEnd': break except SIPReferralDidFail: pass finally: notification_center.remove_observer(self, sender=self._referral) if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='error')) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=0, reason='error')) except ReferralError as e: if self.operation is AddParticipantOperation: notification_center.post_notification('SIPConferenceDidNotAddParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=e.code, reason=e.error)) else: notification_center.post_notification('SIPConferenceDidNotRemoveParticipant', sender=self.session, data=NotificationData(participant=self.participant_uri, code=e.code, reason=e.error)) finally: notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='NetworkConditionsDidChange') self.session = None self._referral = None def _refresh(self): try: contact_header = ContactHeader(self.session.account.contact[NoGRUU, self.route]) except KeyError: pass else: try: self._referral.refresh(contact_header=contact_header, timeout=2) except (SIPCoreError, SIPCoreInvalidStateError): pass @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPReferralDidStart(self, notification): self._channel.send(notification) def _NH_SIPReferralDidEnd(self, notification): self._channel.send(notification) def _NH_SIPReferralDidFail(self, notification): self._channel.send_exception(SIPReferralDidFail(notification.data)) def _NH_SIPReferralGotNotify(self, notification): self._channel.send(notification) def _NH_SIPSessionDidFail(self, notification): self._channel.send_exception(TerminateReferral()) def _NH_SIPSessionWillEnd(self, notification): self._channel.send_exception(TerminateReferral()) def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._refresh() @implementer(IObserver) class ConferenceHandler(object): def __init__(self, session): self.session = session self.active = False self.subscribed = False self._command_proc = None self._command_channel = coros.queue() self._data_channel = coros.queue() self._subscription = None self._subscription_proc = None self._subscription_timer = None notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, name='NetworkConditionsDidChange') self._command_proc = proc.spawn(self._run) @run_in_green_thread def add_participant(self, participant_uri): referral_handler = ReferralHandler(self.session, participant_uri, AddParticipantOperation) referral_handler.start() @run_in_green_thread def remove_participant(self, participant_uri): referral_handler = ReferralHandler(self.session, participant_uri, RemoveParticipantOperation) referral_handler.start() def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) def _activate(self): self.active = True command = Command('subscribe') self._command_channel.send(command) return command def _deactivate(self): self.active = False command = Command('unsubscribe') self._command_channel.send(command) return command def _resubscribe(self): command = Command('subscribe') self._command_channel.send(command) return command def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session) notification_center.remove_observer(self, name='NetworkConditionsDidChange') self._deactivate() command = Command('terminate') self._command_channel.send(command) command.wait() self.session = None def _CH_subscribe(self, command): if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(InterruptSubscription) subscription_proc.wait() self._subscription_proc = proc.spawn(self._subscription_handler, command) def _CH_unsubscribe(self, command): # Cancel any timer which would restart the subscription process if self._subscription_timer is not None and self._subscription_timer.active(): self._subscription_timer.cancel() self._subscription_timer = None if self._subscription_proc is not None: subscription_proc = self._subscription_proc subscription_proc.kill(TerminateSubscription) subscription_proc.wait() self._subscription_proc = None command.signal() def _CH_terminate(self, command): command.signal() raise proc.ProcExit() def _subscription_handler(self, command): notification_center = NotificationCenter() settings = SIPSimpleSettings() try: # Lookup routes account = self.session.account if account is BonjourAccount(): uri = SIPURI.new(self.session._invitation.remote_contact_header.uri) elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = SIPURI.new(self.session.remote_identity.uri) lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: timeout = random.uniform(15, 30) raise SubscriptionError(error='DNS lookup failed: %s' % e, timeout=timeout) target_uri = SIPURI.new(self.session.remote_identity.uri) default_interval = 600 if account is BonjourAccount() else account.sip.subscribe_interval refresh_interval = getattr(command, 'refresh_interval', default_interval) timeout = time() + 30 for route in routes: remaining_time = timeout - time() if remaining_time > 0: try: contact_uri = account.contact[NoGRUU, route] except KeyError: continue subscription = Subscription(target_uri, FromHeader(account.uri, account.display_name), ToHeader(target_uri), ContactHeader(contact_uri), 'conference', RouteHeader(route.uri), credentials=account.credentials, refresh=refresh_interval) notification_center.add_observer(self, sender=subscription) try: subscription.subscribe(timeout=limit(remaining_time, min=1, max=5)) except SIPCoreError: notification_center.remove_observer(self, sender=subscription) timeout = 5 raise SubscriptionError(error='Internal error', timeout=timeout) self._subscription = subscription try: while True: notification = self._data_channel.wait() if notification.sender is subscription and notification.name == 'SIPSubscriptionDidStart': break except SIPSubscriptionDidFail as e: notification_center.remove_observer(self, sender=subscription) self._subscription = None if e.data.code == 407: # Authentication failed, so retry the subscription in some time timeout = random.uniform(60, 120) raise SubscriptionError(error='Authentication failed', timeout=timeout) elif e.data.code == 423: # Get the value of the Min-Expires header timeout = random.uniform(60, 120) if e.data.min_expires is not None and e.data.min_expires > refresh_interval: raise SubscriptionError(error='Interval too short', timeout=timeout, min_expires=e.data.min_expires) else: raise SubscriptionError(error='Interval too short', timeout=timeout) elif e.data.code in (405, 406, 489, 1400): command.signal(e) return else: # Otherwise just try the next route continue else: self.subscribed = True command.signal() break else: # There are no more routes to try, reschedule the subscription timeout = random.uniform(60, 180) raise SubscriptionError(error='No more routes to try', timeout=timeout) # At this point it is subscribed. Handle notifications and ending/failures. try: while True: notification = self._data_channel.wait() if notification.sender is not self._subscription: continue if notification.name == 'SIPSubscriptionGotNotify': if notification.data.event == 'conference' and notification.data.body: try: conference_info = ConferenceDocument.parse(notification.data.body) except ParserError: pass else: notification_center.post_notification('SIPSessionGotConferenceInfo', sender=self.session, data=NotificationData(conference_info=conference_info)) elif notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: self._command_channel.send(Command('subscribe')) notification_center.remove_observer(self, sender=self._subscription) except InterruptSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: notification_center.remove_observer(self, sender=self._subscription) try: self._subscription.end(timeout=2) except SIPCoreError: pass except TerminateSubscription as e: if not self.subscribed: command.signal(e) if self._subscription is not None: try: self._subscription.end(timeout=2) except SIPCoreError: pass else: try: while True: notification = self._data_channel.wait() if notification.sender is self._subscription and notification.name == 'SIPSubscriptionDidEnd': break except SIPSubscriptionDidFail: pass finally: notification_center.remove_observer(self, sender=self._subscription) except SubscriptionError as e: if 'min_expires' in e.attributes: command = Command('subscribe', command.event, refresh_interval=e.attributes['min_expires']) else: command = Command('subscribe', command.event) self._subscription_timer = reactor.callLater(e.timeout, self._command_channel.send, command) finally: self.subscribed = False self._subscription = None self._subscription_proc = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSubscriptionDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPSubscriptionDidFail(self, notification): self._data_channel.send_exception(SIPSubscriptionDidFail(notification.data)) def _NH_SIPSubscriptionGotNotify(self, notification): self._data_channel.send(notification) def _NH_SIPSessionDidStart(self, notification): if self.session.remote_focus: self._activate() @run_in_green_thread def _NH_SIPSessionDidFail(self, notification): self._terminate() @run_in_green_thread def _NH_SIPSessionDidEnd(self, notification): self._terminate() def _NH_SIPSessionDidRenegotiateStreams(self, notification): if self.session.remote_focus and not self.active: self._activate() elif not self.session.remote_focus and self.active: self._deactivate() def _NH_NetworkConditionsDidChange(self, notification): if self.active: self._resubscribe() class TransferInfo(object): def __init__(self, referred_by=None, replaced_dialog_id=None): self.referred_by = referred_by self.replaced_dialog_id = replaced_dialog_id @implementer(IObserver) class TransferHandler(object): def __init__(self, session): self.state = None self.direction = None self.new_session = None self.session = session notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.session._invitation) self._command_channel = coros.queue() self._data_channel = coros.queue() self._proc = proc.spawn(self._run) def _run(self): while True: command = self._command_channel.wait() handler = getattr(self, '_CH_%s' % command.name) handler(command) self.direction = None self.state = None def _CH_incoming_transfer(self, command): self.direction = 'incoming' notification_center = NotificationCenter() refer_to_hdr = command.data.headers.get('Refer-To') target = SIPURI.parse(refer_to_hdr.uri) referred_by_hdr = command.data.headers.get('Referred-By', None) if referred_by_hdr is not None: origin = referred_by_hdr.body else: origin = str(self.session.remote_identity.uri) try: while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail: self.state = 'failed' return else: if notification.name == 'SIPInvitationTransferDidStart': self.state = 'starting' refer_to_uri = SIPURI.new(target) refer_to_uri.headers = {} refer_to_uri.parameters = {} notification_center.post_notification('SIPSessionTransferNewIncoming', self.session, NotificationData(transfer_destination=refer_to_uri)) elif notification.name == 'SIPSessionTransferDidStart': break elif notification.name == 'SIPSessionTransferDidFail': self.state = 'failed' try: self.session._invitation.notify_transfer_progress(notification.data.code, notification.data.reason) except SIPCoreError: return while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail: return self.state = 'started' transfer_info = TransferInfo(referred_by=origin) try: replaces_hdr = target.headers.pop('Replaces') call_id, rest = replaces_hdr.split(';', 1) params = dict((item.split('=') for item in rest.split(';'))) to_tag = params.get('to-tag') from_tag = params.get('from-tag') except (KeyError, ValueError): pass else: transfer_info.replaced_dialog_id = DialogID(call_id, local_tag=from_tag, remote_tag=to_tag) settings = SIPSimpleSettings() account = self.session.account if account is BonjourAccount(): uri = target elif account.sip.outbound_proxy is not None and account.sip.outbound_proxy.transport in settings.sip.transport_list: uri = SIPURI(host=account.sip.outbound_proxy.host, port=account.sip.outbound_proxy.port, parameters={'transport': account.sip.outbound_proxy.transport}) elif account.sip.always_use_my_proxy: uri = SIPURI(host=account.id.domain) else: uri = target lookup = DNSLookup() try: routes = lookup.lookup_sip_proxy(uri, settings.sip.transport_list).wait() except DNSLookupError as e: self.state = 'failed' notification_center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=0, reason="DNS lookup failed: {}".format(e))) try: self.session._invitation.notify_transfer_progress(480) except SIPCoreError: return while True: try: self._data_channel.wait() except SIPInvitationTransferDidFail: return return self.new_session = Session(account) notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.new_session) self.new_session.connect(ToHeader(target), routes=routes, streams=[MediaStreamRegistry.AudioStream()], transfer_info=transfer_info) while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail: return if notification.name == 'SIPInvitationTransferDidEnd': return except proc.ProcExit: if self.new_session is not None: notification_center.remove_observer(self, sender=self.new_session) self.new_session = None raise def _CH_outgoing_transfer(self, command): self.direction = 'outgoing' notification_center = NotificationCenter() self.state = 'starting' while True: try: notification = self._data_channel.wait() except SIPInvitationTransferDidFail as e: self.state = 'failed' notification_center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=e.data.code, reason=e.data.reason)) return if notification.name == 'SIPInvitationTransferDidStart': self.state = 'started' notification_center.post_notification('SIPSessionTransferDidStart', sender=self.session) elif notification.name == 'SIPInvitationTransferDidEnd': self.state = 'ended' self.session.end() notification_center.post_notification('SIPSessionTransferDidEnd', sender=self.session) return def _terminate(self): notification_center = NotificationCenter() notification_center.remove_observer(self, sender=self.session._invitation) notification_center.remove_observer(self, sender=self.session) self._proc.kill() self._proc = None self._command_channel = None self._data_channel = None self.session = None @run_in_twisted_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPInvitationTransferNewIncoming(self, notification): self._command_channel.send(Command('incoming_transfer', data=notification.data)) def _NH_SIPInvitationTransferNewOutgoing(self, notification): self._command_channel.send(Command('outgoing_transfer', data=notification.data)) def _NH_SIPInvitationTransferDidStart(self, notification): self._data_channel.send(notification) def _NH_SIPInvitationTransferDidFail(self, notification): self._data_channel.send_exception(SIPInvitationTransferDidFail(notification.data)) def _NH_SIPInvitationTransferDidEnd(self, notification): self._data_channel.send(notification) def _NH_SIPInvitationTransferGotNotify(self, notification): if notification.data.event == 'refer' and notification.data.body: match = sipfrag_re.match(notification.data.body) if match: code = int(match.group('code')) reason = match.group('reason') notification.center.post_notification('SIPSessionTransferGotProgress', sender=self.session, data=NotificationData(code=code, reason=reason)) def _NH_SIPSessionTransferDidStart(self, notification): if notification.sender is self.session and self.state == 'starting': self._data_channel.send(notification) def _NH_SIPSessionTransferDidFail(self, notification): if notification.sender is self.session and self.state == 'starting': self._data_channel.send(notification) def _NH_SIPSessionGotRingIndication(self, notification): if notification.sender is self.new_session and self.session is not None: try: self.session._invitation.notify_transfer_progress(180) except SIPCoreError: pass def _NH_SIPSessionGotProvisionalResponse(self, notification): if notification.sender is self.new_session and self.session is not None: try: self.session._invitation.notify_transfer_progress(notification.data.code, notification.data.reason) except SIPCoreError: pass def _NH_SIPSessionDidStart(self, notification): if notification.sender is self.new_session: notification.center.remove_observer(self, sender=notification.sender) self.new_session = None if self.session is not None: notification.center.post_notification('SIPSessionTransferDidEnd', sender=self.session) if self.state == 'started': try: self.session._invitation.notify_transfer_progress(200) except SIPCoreError: pass self.state = 'ended' self.session.end() def _NH_SIPSessionDidEnd(self, notification): if notification.sender is self.new_session: # If any stream fails to start we won't get SIPSessionDidFail, we'll get here instead notification.center.remove_observer(self, sender=notification.sender) self.new_session = None if self.session is not None: notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=500, reason='internal error')) if self.state == 'started': try: self.session._invitation.notify_transfer_progress(500) except SIPCoreError: pass self.state = 'failed' else: self._terminate() def _NH_SIPSessionDidFail(self, notification): if notification.sender is self.new_session: notification.center.remove_observer(self, sender=notification.sender) self.new_session = None if self.session is not None: notification.center.post_notification('SIPSessionTransferDidFail', sender=self.session, data=NotificationData(code=notification.data.code or 500, reason=notification.data.reason)) if self.state == 'started': try: self.session._invitation.notify_transfer_progress(notification.data.code or 500, notification.data.reason) except SIPCoreError: pass self.state = 'failed' else: self._terminate() class OptionalTag(str): def __eq__(self, other): return other is None or super(OptionalTag, self).__eq__(other) def __ne__(self, other): return not self == other def __repr__(self): return '{}({})'.format(self.__class__.__name__, super(OptionalTag, self).__repr__()) @implementer(IObserver) class SessionReplaceHandler(object): def __init__(self, session): self.session = session def start(self): notification_center = NotificationCenter() notification_center.add_observer(self, sender=self.session) notification_center.add_observer(self, sender=self.session.replaced_session) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPSessionDidStart(self, notification): notification.center.remove_observer(self, sender=self.session) notification.center.remove_observer(self, sender=self.session.replaced_session) self.session.replaced_session.end() self.session.replaced_session = None self.session = None def _NH_SIPSessionDidFail(self, notification): if notification.sender is self.session: notification.center.remove_observer(self, sender=self.session) notification.center.remove_observer(self, sender=self.session.replaced_session) self.session.replaced_session = None self.session = None _NH_SIPSessionDidEnd = _NH_SIPSessionDidFail @implementer(IObserver) class Session(object): media_stream_timeout = 15 short_reinvite_timeout = 5 def __init__(self, account): self.account = account self.direction = None self.end_time = None self.on_hold = False self.proposed_streams = None self.route = None self.state = None self.start_time = None self.streams = None self.transport = None self.local_focus = False self.remote_focus = False self.greenlet = None self.conference = None self.replaced_session = None self.transfer_handler = None self.transfer_info = None self._channel = coros.queue() self._hold_in_progress = False self._invitation = None self._local_identity = None self._remote_identity = None self._lock = RLock() def init_incoming(self, invitation, data): notification_center = NotificationCenter() remote_sdp = invitation.sdp.proposed_remote self.proposed_streams = [] if remote_sdp: for index, media_stream in enumerate(remote_sdp.media): if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, remote_sdp, index) - except UnknownStreamError: + except UnknownStreamError as e: continue except InvalidStreamError as e: log.error("Invalid stream: {}".format(e)) break except Exception as e: log.exception("Exception occurred while setting up stream from SDP: {}".format(e)) break else: stream.index = index self.proposed_streams.append(stream) break self.direction = 'incoming' self.state = 'incoming' self.transport = invitation.transport self._invitation = invitation self.conference = ConferenceHandler(self) self.transfer_handler = TransferHandler(self) if 'isfocus' in invitation.remote_contact_header.parameters: self.remote_focus = True if 'Referred-By' in data.headers or 'Replaces' in data.headers: self.transfer_info = TransferInfo() if 'Referred-By' in data.headers: self.transfer_info.referred_by = data.headers['Referred-By'].body if 'Replaces' in data.headers: replaces_header = data.headers.get('Replaces') # Because we only allow the remote tag to be optional, it can only match established dialogs and early outgoing dialogs, but not early incoming dialogs, # which according to RFC3891 should be rejected with 481 (which will happen automatically by never matching them). if replaces_header.early_only or replaces_header.from_tag == '0': replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=OptionalTag(replaces_header.from_tag)) else: replaced_dialog_id = DialogID(replaces_header.call_id, local_tag=replaces_header.to_tag, remote_tag=replaces_header.from_tag) session_manager = SessionManager() try: replaced_session = next(session for session in session_manager.sessions if session.dialog_id == replaced_dialog_id) except StopIteration: invitation.send_response(481) return else: # Any matched dialog at this point is either established, terminated or early outgoing. if replaced_session.state in ('terminating', 'terminated'): invitation.send_response(603) return elif replaced_session.dialog_id.remote_tag is not None and replaces_header.early_only: # The replaced dialog is established, but the early-only flag is set invitation.send_response(486) return self.replaced_session = replaced_session self.transfer_info.replaced_dialog_id = replaced_dialog_id replace_handler = SessionReplaceHandler(self) replace_handler.start() notification_center.add_observer(self, sender=invitation) notification_center.post_notification('SIPSessionNewIncoming', sender=self, data=NotificationData(streams=self.proposed_streams[:], headers=data.headers)) @transition_state(None, 'connecting') @run_in_green_thread def connect(self, to_header, routes, streams, is_focus=False, transfer_info=None, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() connected = False received_code = 0 received_reason = None unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') self.direction = 'outgoing' self.proposed_streams = streams self.route = routes[0] self.transport = self.route.transport self.local_focus = is_focus self._invitation = Invitation() display_name = self.account.display_name.decode() if self.account.display_name else None self._local_identity = FromHeader(self.account.uri, display_name) self._remote_identity = to_header self.conference = ConferenceHandler(self) self.transfer_handler = TransferHandler(self) self.transfer_info = transfer_info notification_center.add_observer(self, sender=self._invitation) notification_center.post_notification('SIPSessionNewOutgoing', sender=self, data=NotificationData(streams=streams[:])) for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 try: contact_uri = self.account.contact[PublicGRUUIfAvailable, self.route] local_ip = host.outgoing_ip_for(self.route.address) if local_ip is None: raise ValueError("could not get outgoing IP address") except (KeyError, ValueError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=480, reason=sip_status_messages[480], error=str(e)) return connection = SDPConnection(local_ip.encode()) local_sdp = SDPSession(local_ip.encode(), name=settings.user_agent.encode()) for index, stream in enumerate(self.proposed_streams): stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates): media.connection = connection local_sdp.media.append(media) display_name = self.account.display_name.decode() if self.account.display_name else None from_header = FromHeader(self.account.uri, display_name) route_header = RouteHeader(self.route.uri) contact_header = ContactHeader(contact_uri) if is_focus: contact_header.parameters['isfocus'] = None if self.transfer_info is not None: if self.transfer_info.referred_by is not None: extra_headers.append(Header('Referred-By', self.transfer_info.referred_by)) if self.transfer_info.replaced_dialog_id is not None: dialog_id = self.transfer_info.replaced_dialog_id extra_headers.append(ReplacesHeader(dialog_id.call_id, dialog_id.local_tag, dialog_id.remote_tag)) self._invitation.send_invite(to_header.uri, from_header, to_header, route_header, contact_header, local_sdp, self.account.credentials, extra_headers) try: with api.timeout(settings.sip.invite_timeout): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.end() return notification_center.post_notification('SIPSessionWillStart', sender=self) stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map[index] if remote_media.port: # TODO: check if port is also 0 in local_sdp. In that case PJSIP disabled the stream because # negotiation failed. If there are more streams, however, the negotiation is considered successful as a # whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind io # OK, but we cannot really start the stream. -Saul stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() invitation_notifications = [] with api.timeout(self.media_stream_timeout): wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': invitation_notifications.append(notification) for notification in invitation_notifications: self._channel.send(notification) while not connected or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'early': if notification.data.code == 180: notification_center.post_notification('SIPSessionGotRingIndication', self) notification_center.post_notification('SIPSessionGotProvisionalResponse', self, NotificationData(code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'connecting': received_code = notification.data.code received_reason = notification.data.reason elif notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=received_code, reason=received_reason)) else: unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if isinstance(e, api.TimeoutError): error = 'media stream timed-out while starting' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason else: error = 'media stream failed: %s' % e.data.reason self._fail(originator='local', code=0, reason=None, error=error) except InvitationDisconnectedError as e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' # As weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state in ('connecting', 'connected') or getattr(e.data, 'method', None) == 'BYE': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=e.data.originator)) if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=e.data.method, code=200, reason=sip_status_messages[200])) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) else: if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=e.data.code, reason=e.data.reason)) code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: # TODO: we should know *exactly* when there are set -Saul code = getattr(e.data, 'code', 0) reason = getattr(e.data, 'reason', 'Session disconnected') if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) self.greenlet = None except SIPCoreError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=0, reason=None, error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in self.streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() def _reinvite_after_ice(self): # This function does not do any error checking, it's designed to be called at the end of connect and add_stream self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for index, stream in enumerate(self.streams): local_sdp.media[index] = stream.get_local_media(remote_sdp=None, index=index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False try: with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for index, stream in enumerate(self.streams): stream.update(local_sdp, remote_sdp, index) else: return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'disconnected': self.end() return except Exception: pass finally: self.state = 'connected' self.greenlet = None @check_state(['incoming', 'received_proposal']) @run_in_green_thread def send_ring_indication(self): try: self._invitation.send_response(180) except SIPCoreInvalidStateError: pass # The INVITE session might have already been cancelled; ignore the error @transition_state('incoming', 'accepting') @run_in_green_thread def accept(self, streams, is_focus=False, extra_headers=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() self.local_focus = is_focus connected = False unhandled_notifications = [] extra_headers = extra_headers or [] if {'to', 'from', 'via', 'contact', 'route', 'record-route'}.intersection(header.name.lower() for header in extra_headers): raise RuntimeError('invalid header in extra_headers: To, From, Via, Contact, Route and Record-Route headers are not allowed') if self.proposed_streams: for stream in self.proposed_streams: if stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') else: for index, stream in enumerate(streams): notification_center.add_observer(self, sender=stream) stream.index = index stream.initialize(self, direction='outgoing') self.proposed_streams = streams wait_count = len(self.proposed_streams) try: while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 remote_sdp = self._invitation.sdp.proposed_remote sdp_connection = remote_sdp.connection or next((media.connection for media in remote_sdp.media if media.connection is not None)) local_ip = host.outgoing_ip_for(sdp_connection.address) if sdp_connection.address != '0.0.0.0' else sdp_connection.address if local_ip is None: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='could not get local IP address') return connection = SDPConnection(local_ip.encode()) local_sdp = SDPSession(local_ip.encode(), name=settings.user_agent.encode()) if remote_sdp: stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: print('session.py: we will get local media') # TODO: broken for RTP streams here media = stream.get_local_media(remote_sdp=remote_sdp, index=index) print('session.py: we got local media %s' % media) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection else: - media = SDPMediaStream.new(media.encode()) + media = SDPMediaStream.new(media if isinstance(media, bytes) else media.encode() ) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] print('Added media to SDP: %s' % media) local_sdp.media.append(media) else: for index, stream in enumerate(self.proposed_streams): stream.index = index print('session.py: we will get local media 2') media = stream.get_local_media(remote_sdp=None, index=index) print('session.py: we got local media 2 %s' % media) if media.connection is None or (media.connection is not None and not media.has_ice_attributes and not media.has_ice_candidates): media.connection = connection print('Added media to SDP: %s' % media) local_sdp.media.append(media) contact_header = ContactHeader.new(self._invitation.local_contact_header) try: local_contact_uri = self.account.contact[PublicGRUU, self._invitation.transport] except KeyError: pass else: contact_header.uri = local_contact_uri if is_focus: contact_header.parameters['isfocus'] = None self._invitation.send_response(200, contact_header=contact_header, sdp=local_sdp, extra_headers=extra_headers) notification_center.post_notification('SIPSessionWillStart', sender=self) # Local and remote SDPs will be set after the 200 OK is sent while True: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp break else: if not connected: # we could not have got a SIPInvitationGotSDPUpdate if we did not get an ACK connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True)) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='remote', code=0, reason=None, error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received=True)) elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) wait_count = 0 stream_map = dict((stream.index, stream) for stream in self.proposed_streams) for index, local_media in enumerate(local_sdp.media): remote_media = remote_sdp.media[index] stream = stream_map.get(index, None) if stream is not None: if remote_media.port: wait_count += 1 stream.start(local_sdp, remote_sdp, index) else: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() removed_streams = [stream for stream in self.proposed_streams if stream.index >= len(local_sdp.media)] for stream in removed_streams: notification_center.remove_observer(self, sender=stream) self.proposed_streams.remove(stream) del stream_map[stream.index] stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): while wait_count > 0 or not connected or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected': if not connected: connected = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True)) elif notification.data.prev_state == 'connected': unhandled_notifications.append(notification) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) except (MediaStreamDidNotInitializeError, MediaStreamDidFailError, api.TimeoutError) as e: if self._invitation.state == 'connecting': ack_received = False if isinstance(e, api.TimeoutError) and wait_count == 0 else 'unknown' # pjsip's invite session object does not inform us whether the ACK was received or not notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=ack_received)) elif self._invitation.state == 'connected' and not connected: # we didn't yet get to process the SIPInvitationChangedState (state -> connected) notification notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=True)) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() reason_header = None if isinstance(e, api.TimeoutError): if wait_count > 0: error = 'media stream timed-out while starting' else: error = 'No ACK received' reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'Missing ACK' elif isinstance(e, MediaStreamDidNotInitializeError): error = 'media stream did not initialize: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream did not initialize' else: error = 'media stream failed: %s' % e.data.reason reason_header = ReasonHeader('SIP') reason_header.cause = 500 reason_header.text = 'media stream failed to start' self.start_time = ISOTimestamp.now() if self._invitation.state in ('incoming', 'early'): self._fail(originator='local', code=500, reason=sip_status_messages[500], error=error, reason_header=reason_header) else: self._fail(originator='local', code=0, reason=None, error=error, reason_header=reason_header) except InvitationDisconnectedError as e: notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.state = 'terminated' if e.data.prev_state in ('incoming', 'early'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown')) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=e.data.disconnect_reason, redirect_identities=None)) elif e.data.prev_state == 'connecting' and e.data.disconnect_reason == 'missing ACK': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason='OK', ack_received=False)) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=200, reason=sip_status_messages[200], failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='remote')) notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=getattr(e.data, 'method', 'INVITE'), code=200, reason='OK')) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='remote', end_reason=e.data.disconnect_reason)) self.greenlet = None except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party notification_center.remove_observer(self, sender=self._invitation) for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown')) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) except SIPCoreError as e: for stream in self.proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams = self.proposed_streams self.proposed_streams = None self.start_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidStart', self, NotificationData(streams=self.streams[:])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None @transition_state('incoming', 'terminating') @run_in_green_thread def reject(self, code=603, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'disconnected': ack_received = notification.data.disconnect_reason != 'missing ACK' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=ack_received)) break except SIPCoreInvalidStateError: # the only reason for which this error can be thrown is if invitation.send_response was called after the INVITE session was cancelled by the remote party self.greenlet = None except SIPCoreError as e: self._fail(originator='local', code=500, reason=sip_status_messages[500], error='SIP core error: %s' % str(e)) except api.TimeoutError: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received=False)) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='timeout', redirect_identities=None)) else: notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=code, reason=sip_status_messages[code], failure_reason='user request', redirect_identities=None)) finally: self.greenlet = None @transition_state('received_proposal', 'accepting_proposal') @run_in_green_thread def accept_proposal(self, streams): self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] streams = [stream for stream in streams if stream in self.proposed_streams] for stream in streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='incoming') try: wait_count = len(streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 remote_sdp = self._invitation.sdp.proposed_remote connection = SDPConnection(local_sdp.address) stream_map = dict((stream.index, stream) for stream in streams) for index, media in enumerate(remote_sdp.media): stream = stream_map.get(index, None) if stream is not None: media = stream.get_local_media(remote_sdp=remote_sdp, index=index) if not media.has_ice_attributes and not media.has_ice_candidates: media.connection = connection if index < len(local_sdp.media): local_sdp.media[index] = media else: local_sdp.media.append(media) elif index >= len(local_sdp.media): # actually == is sufficient media = SDPMediaStream.new(media) media.connection = connection media.port = 0 media.attributes = [] media.bandwidth_info = [] local_sdp.media.append(media) self._invitation.send_response(200, sdp=local_sdp) prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) received_invitation_state = False received_sdp_update = False while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) else: self._fail_proposal(originator='remote', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown')) received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) for stream in streams: # TODO: check if port is 0 in local_sdp. In that case PJSIP disabled the stream because # negotiation failed. If there are more streams, however, the negotiation is considered successful as a # whole, so while we built a normal SDP, PJSIP modified it and sent it to the other side. That's kind of # OK, but we cannot really start the stream. -Saul stream.start(local_sdp, remote_sdp, stream.index) with api.timeout(self.media_stream_timeout): wait_count = len(streams) while wait_count > 0 or self._channel: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 else: unhandled_notifications.append(notification) except api.TimeoutError: self._fail_proposal(originator='remote', error='media stream timed-out while starting') except MediaStreamDidNotInitializeError as e: self._fail_proposal(originator='remote', error='media stream did not initialize: {.data.reason}'.format(e)) except MediaStreamDidFailError as e: self._fail_proposal(originator='remote', error='media stream failed: {.data.reason}'.format(e)) except InvitationDisconnectedError as e: self._fail_proposal(originator='remote', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except SIPCoreError as e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: proposed_streams = self.proposed_streams self.proposed_streams = None self.streams = self.streams + streams self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='remote', accepted_streams=streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None @transition_state('received_proposal', 'rejecting_proposal') @run_in_green_thread def reject_proposal(self, code=488, reason=None): self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.send_response(code, reason) with api.timeout(1, None): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=sip_status_messages[code], ack_received='unknown')) break except SIPCoreError as e: self._fail_proposal(originator='remote', error='SIP core error: %s' % str(e)) else: proposed_streams = self.proposed_streams self.proposed_streams = None self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=code, reason=sip_status_messages[code], proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None def add_stream(self, stream): self.add_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def add_streams(self, streams): streams = list(set(streams).difference(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() settings = SIPSimpleSettings() unhandled_notifications = [] self.proposed_streams = streams for stream in self.proposed_streams: notification_center.add_observer(self, sender=stream) stream.initialize(self, direction='outgoing') try: wait_count = len(self.proposed_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidInitialize': wait_count -= 1 elif notification.name == 'SIPInvitationChangedState': # This is actually the only reason for which this notification could be received if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self._fail_proposal(originator='local', error='received stream proposal') self.handle_notification(notification) return local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.proposed_streams: # Try to reuse a disabled media stream to avoid an ever-growing SDP try: index = next(index for index, media in enumerate(local_sdp.media) if media.port == 0) reuse_media = True except StopIteration: index = len(local_sdp.media) reuse_media = False stream.index = index media = stream.get_local_media(remote_sdp=None, index=index) if reuse_media: local_sdp.media[index] = media else: local_sdp.media.append(media) self._invitation.send_reinvite(sdp=local_sdp) notification_center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='local', proposed_streams=self.proposed_streams[:])) received_invitation_state = False received_sdp_update = False try: with api.timeout(settings.sip.invite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) else: self._fail_proposal(originator='local', error='SDP negotiation failed: %s' % notification.data.error) return elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if notification.data.code >= 300: proposed_streams = self.proposed_streams for stream in proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) return elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except api.TimeoutError: self.cancel_proposal() return accepted_streams = [] for stream in self.proposed_streams: try: remote_media = remote_sdp.media[stream.index] except IndexError: self._fail_proposal(originator='local', error='SDP media missing in answer') return else: if remote_media.port: stream.start(local_sdp, remote_sdp, stream.index) accepted_streams.append(stream) else: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() with api.timeout(self.media_stream_timeout): wait_count = len(accepted_streams) while wait_count > 0: notification = self._channel.wait() if notification.name == 'MediaStreamDidStart': wait_count -= 1 except api.TimeoutError: self._fail_proposal(originator='local', error='media stream timed-out while starting') except MediaStreamDidNotInitializeError as e: self._fail_proposal(originator='local', error='media stream did not initialize: {.data.reason}'.format(e)) except MediaStreamDidFailError as e: self._fail_proposal(originator='local', error='media stream failed: {.data.reason}'.format(e)) except InvitationDisconnectedError as e: self._fail_proposal(originator='local', error='session ended') notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except SIPCoreError as e: self._fail_proposal(originator='local', error='SIP core error: %s' % str(e)) else: self.greenlet = None self.state = 'connected' self.streams += accepted_streams proposed_streams = self.proposed_streams self.proposed_streams = None any_stream_ice = any(getattr(stream, 'ice_active', False) for stream in accepted_streams) if any_stream_ice: self._reinvite_after_ice() notification_center.post_notification('SIPSessionProposalAccepted', self, NotificationData(originator='local', accepted_streams=accepted_streams, proposed_streams=proposed_streams)) notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=accepted_streams, removed_streams=[])) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None def remove_stream(self, stream): self.remove_streams([stream]) @transition_state('connected', 'sending_proposal') @run_in_green_thread def remove_streams(self, streams): streams = list(set(streams).intersection(self.streams)) if not streams: self.state = 'connected' return self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() self.streams.remove(stream) media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for s in self.streams: s.update(local_sdp, remote_sdp, s.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if not (200 <= notification.data.code < 300): break elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: for stream in streams: stream.end() self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) except (api.TimeoutError, MediaStreamDidFailError, SIPCoreError): for stream in streams: stream.end() self.end() else: for stream in streams: stream.end() self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='local', added_streams=[], removed_streams=streams)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._send_hold() finally: self.greenlet = None @transition_state('sending_proposal', 'cancelling_proposal') @run_in_green_thread def cancel_proposal(self): if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = api.getcurrent() notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if notification.data.code == 487: proposed_streams = self.proposed_streams or [] for stream in proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) elif notification.data.code == 200: self.end() elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError as e: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) proposed_streams = self.proposed_streams or [] for stream in proposed_streams: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None self.state = 'connected' notification_center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='local', code=0, reason='SIP core error: %s' % str(e), proposed_streams=proposed_streams)) except InvitationDisconnectedError as e: for stream in self.proposed_streams or []: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) else: for stream in self.proposed_streams or []: notification_center.remove_observer(self, sender=stream) stream.deactivate() stream.end() self.proposed_streams = None self.greenlet = None self.state = 'connected' finally: self.greenlet = None if self._hold_in_progress: self._send_hold() @run_in_green_thread def hold(self): if self.on_hold or self._hold_in_progress: return self._hold_in_progress = True streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.hold() if self.state == 'connected': self._send_hold() @run_in_green_thread def unhold(self): if not self.on_hold and not self._hold_in_progress: return self._hold_in_progress = False streams = (self.streams or []) + (self.proposed_streams or []) if not streams: return for stream in streams: stream.unhold() if self.state == 'connected': self._send_unhold() @run_in_green_thread def end(self): if self.state in (None, 'terminating', 'terminated'): return if self.greenlet is not None: api.kill(self.greenlet, api.GreenletExit()) self.greenlet = None notification_center = NotificationCenter() if self._invitation is None: # The invitation was not yet constructed self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) return elif self._invitation.state is None: # The invitation was built but never sent streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() stream.end() self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) return invitation_state = self._invitation.state if invitation_state in ('disconnecting', 'disconnected'): return self.greenlet = api.getcurrent() self.state = 'terminating' if invitation_state == 'connected': notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator='local')) streams = (self.streams or []) + (self.proposed_streams or []) for stream in streams[:]: try: notification_center.remove_observer(self, sender=stream) except KeyError: streams.remove(stream) else: stream.deactivate() cancelling = invitation_state != 'connected' and self.direction == 'outgoing' try: self._invitation.end(timeout=1) while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': if notification.data.disconnect_reason in ('internal error', 'missing ACK'): pass elif notification.data.disconnect_reason == 'timeout': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local' if self.direction=='outgoing' else 'remote', method='INVITE', code=408, reason='Timeout')) elif cancelling: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif hasattr(notification.data, 'method'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method=notification.data.method, code=200, reason=sip_status_messages[200])) elif notification.data.disconnect_reason == 'user request': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='BYE', code=notification.data.code, reason=notification.data.reason)) break except SIPCoreError as e: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='SIP core error: %s' % str(e))) except InvitationDisconnectedError as e: # As it weird as it may sound, PJSIP accepts a BYE even without receiving a final response to the INVITE if e.data.prev_state == 'connected': if e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=e.data.originator, method=e.data.method, code=200, reason=sip_status_messages[200])) self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=e.data.originator, end_reason=e.data.disconnect_reason)) elif getattr(e.data, 'method', None) == 'BYE' and e.data.originator == 'remote': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=e.data.originator, method=e.data.method, code=200, reason=sip_status_messages[200])) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=0, reason=None, failure_reason=e.data.disconnect_reason, redirect_identities=None)) else: if e.data.originator == 'remote': code = e.data.code reason = e.data.reason elif e.data.disconnect_reason == 'timeout': code = 408 reason = 'timeout' else: code = 0 reason = None if e.data.originator == 'remote' and code // 100 == 3: redirect_identities = e.data.headers.get('Contact', []) else: redirect_identities = None notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=code, reason=reason)) notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=e.data.originator, code=code, reason=reason, failure_reason=e.data.disconnect_reason, redirect_identities=redirect_identities)) else: if cancelling: notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=487, reason='Session Cancelled', failure_reason='user request', redirect_identities=None)) else: self.end_time = ISOTimestamp.now() notification_center.post_notification('SIPSessionDidEnd', self, NotificationData(originator='local', end_reason='user request')) finally: for stream in streams: stream.end() notification_center.remove_observer(self, sender=self._invitation) self.greenlet = None self.state = 'terminated' @check_state(['connected']) @check_transfer_state(None, None) @run_in_twisted_thread def transfer(self, target_uri, replaced_session=None): notification_center = NotificationCenter() notification_center.post_notification('SIPSessionTransferNewOutgoing', self, NotificationData(transfer_destination=target_uri)) try: self._invitation.transfer(target_uri, replaced_session.dialog_id if replaced_session is not None else None) except SIPCoreError as e: notification_center.post_notification('SIPSessionTransferDidFail', sender=self, data=NotificationData(code=500, reason=str(e))) @check_state(['connected', 'received_proposal', 'sending_proposal', 'accepting_proposal', 'rejecting_proposal', 'cancelling_proposal']) @check_transfer_state('incoming', 'starting') def accept_transfer(self): notification_center = NotificationCenter() notification_center.post_notification('SIPSessionTransferDidStart', sender=self) @check_state(['connected', 'received_proposal', 'sending_proposal', 'accepting_proposal', 'rejecting_proposal', 'cancelling_proposal']) @check_transfer_state('incoming', 'starting') def reject_transfer(self, code=603, reason=None): notification_center = NotificationCenter() notification_center.post_notification('SIPSessionTransferDidFail', self, NotificationData(code=code, reason=reason or sip_status_messages[code])) @property def dialog_id(self): return self._invitation.dialog_id if self._invitation is not None else None @property def local_identity(self): if self._invitation is not None and self._invitation.local_identity is not None: return self._invitation.local_identity else: return self._local_identity @property def peer_address(self): return self._invitation.peer_address if self._invitation is not None else None @property def remote_identity(self): if self._invitation is not None and self._invitation.remote_identity is not None: return self._invitation.remote_identity else: return self._remote_identity @property def remote_user_agent(self): return self._invitation.remote_user_agent if self._invitation is not None else None def _cancel_hold(self): notification_center = NotificationCenter() try: self._invitation.cancel_reinvite() while True: try: notification = self._channel.wait() except MediaStreamDidFailError: continue if notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=notification.data.code, reason=notification.data.reason)) if notification.data.code == 200: self.end() return False elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) break except SIPCoreError as e: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', code=0, reason=None, failure_reason='SIP core error: %s' % str(e), redirect_identities=None)) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return False return True def _send_hold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = True self.state = 'connected' hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=True, partial=any(not stream.on_hold_by_local for stream in hold_supported_streams))) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: self._hold_in_progress = False else: for stream in self.streams: stream.unhold() self._send_unhold() def _send_unhold(self): self.state = 'sending_proposal' self.greenlet = api.getcurrent() notification_center = NotificationCenter() unhandled_notifications = [] try: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=None, index=stream.index) self._invitation.send_reinvite(sdp=local_sdp) received_invitation_state = False received_sdp_update = False with api.timeout(self.short_reinvite_timeout): while not received_invitation_state or not received_sdp_update: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=notification.data.code, reason=notification.data.reason)) elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) except InvitationDisconnectedError as e: self.greenlet = None notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = notification_center self.handle_notification(notification) return except api.TimeoutError: if not self._cancel_hold(): return except SIPCoreError: pass self.greenlet = None self.on_hold = False self.state = 'connected' notification_center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='local', on_hold=False, partial=False)) for notification in unhandled_notifications: self.handle_notification(notification) if self._hold_in_progress: for stream in self.streams: stream.hold() self._send_hold() def _fail(self, originator, code, reason, error, reason_header=None): notification_center = NotificationCenter() prev_inv_state = self._invitation.state self.state = 'terminating' if prev_inv_state not in (None, 'incoming', 'outgoing', 'early', 'connecting'): notification_center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=originator)) if self._invitation.state not in (None, 'disconnecting', 'disconnected'): try: if self._invitation.direction == 'incoming' and self._invitation.state in ('incoming', 'early'): if 400<=code<=699 and reason is not None: self._invitation.send_response(code, extra_headers=[reason_header] if reason_header is not None else []) else: self._invitation.end(extra_headers=[reason_header] if reason_header is not None else []) with api.timeout(1): while True: notification = self._channel.wait() if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'disconnected': if prev_inv_state in ('connecting', 'connected'): if notification.data.disconnect_reason in ('timeout', 'missing ACK'): sip_code = 200 sip_reason = 'OK' originator = 'local' elif hasattr(notification.data, 'method'): sip_code = 200 sip_reason = 'OK' originator = 'remote' else: sip_code = notification.data.code sip_reason = notification.data.reason originator = 'local' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=originator, method='BYE', code=sip_code, reason=sip_reason)) elif self._invitation.direction == 'incoming' and prev_inv_state in ('incoming', 'early'): ack_received = notification.data.disconnect_reason != 'missing ACK' notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=code, reason=reason, ack_received=ack_received)) elif self._invitation.direction == 'outgoing' and prev_inv_state in ('outgoing', 'early'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='INVITE', code=487, reason='Session Cancelled')) break except SIPCoreError: pass except api.TimeoutError: if prev_inv_state in ('connecting', 'connected'): notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='local', method='BYE', code=408, reason=sip_status_messages[408])) notification_center.remove_observer(self, sender=self._invitation) self.state = 'terminated' notification_center.post_notification('SIPSessionDidFail', self, NotificationData(originator=originator, code=code, reason=reason, failure_reason=error, redirect_identities=None)) self.greenlet = None def _fail_proposal(self, originator, error): notification_center = NotificationCenter() for stream in self.proposed_streams: try: notification_center.remove_observer(self, sender=stream) except KeyError: # _fail_proposal can be called from reject_proposal, which means the stream will # not have been initialized or the session registered as an observer for it. pass else: stream.deactivate() stream.end() if originator == 'remote' and self._invitation.sub_state == 'received_proposal': try: self._invitation.send_response(488 if self.proposed_streams else 500) except SIPCoreError: pass else: notification_center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=500, reason=sip_status_messages[500], ack_received='unknown')) notification_center.post_notification('SIPSessionHadProposalFailure', self, NotificationData(originator=originator, failure_reason=error, proposed_streams=self.proposed_streams[:])) self.state = 'connected' self.proposed_streams = None self.greenlet = None @run_in_green_thread def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPInvitationChangedState(self, notification): if self.state == 'terminated': return if notification.data.originator == 'remote' and notification.data.state not in ('disconnecting', 'disconnected'): contact_header = notification.data.headers.get('Contact', None) if contact_header and 'isfocus' in contact_header[0].parameters: self.remote_focus = True if self.greenlet is not None: if notification.data.state == 'disconnected' and notification.data.prev_state != 'disconnecting': self._channel.send_exception(InvitationDisconnectedError(notification.sender, notification.data)) else: self._channel.send(notification) else: self.greenlet = api.getcurrent() unhandled_notifications = [] try: if notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal': self.state = 'received_proposal' try: proposed_remote_sdp = self._invitation.sdp.proposed_remote active_remote_sdp = self._invitation.sdp.active_remote if len(proposed_remote_sdp.media) < len(active_remote_sdp.media): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Streams cannot be deleted from the SDP')]) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return for stream in self.streams: if not stream.validate_update(proposed_remote_sdp, stream.index): engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Failed to update media stream index %d' % stream.index)]) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return added_media_indexes = set() removed_media_indexes = set() reused_media_indexes = set() for index, media_stream in enumerate(proposed_remote_sdp.media): if index >= len(active_remote_sdp.media): added_media_indexes.add(index) elif media_stream.port == 0 and active_remote_sdp.media[index].port > 0: removed_media_indexes.add(index) elif media_stream.port > 0 and active_remote_sdp.media[index].port == 0: reused_media_indexes.add(index) elif media_stream.media != active_remote_sdp.media[index].media: added_media_indexes.add(index) removed_media_indexes.add(index) if added_media_indexes | reused_media_indexes and removed_media_indexes: engine = Engine() self._invitation.send_response(488, extra_headers=[WarningHeader(399, engine.user_agent, 'Both removing AND adding a media stream is currently not supported')]) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return elif added_media_indexes | reused_media_indexes: self.proposed_streams = [] for index in added_media_indexes | reused_media_indexes: media_stream = proposed_remote_sdp.media[index] if media_stream.port != 0: for stream_type in MediaStreamRegistry: try: stream = stream_type.new_from_sdp(self, proposed_remote_sdp, index) except UnknownStreamError: continue except InvalidStreamError as e: log.error("Invalid stream: {}".format(e)) break except Exception as e: log.exception("Exception occurred while setting up stream from SDP: {}".format(e)) break else: stream.index = index self.proposed_streams.append(stream) break if self.proposed_streams: self._invitation.send_response(100) notification.center.post_notification('SIPSessionNewProposal', sender=self, data=NotificationData(originator='remote', proposed_streams=self.proposed_streams[:])) else: self._invitation.send_response(488) self.state = 'connected' notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=488, reason=sip_status_messages[488], ack_received='unknown')) return else: local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 removed_streams = [stream for stream in self.streams if stream.index in removed_media_indexes] prev_on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) for stream in removed_streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() media = local_sdp.media[stream.index] media.port = 0 media.attributes = [] media.bandwidth_info = [] for stream in self.streams: local_sdp.media[stream.index] = stream.get_local_media(remote_sdp=proposed_remote_sdp, index=stream.index) try: self._invitation.send_response(200, sdp=local_sdp) except PJSIPError: for stream in removed_streams: self.streams.remove(stream) stream.end() if removed_streams: self.end() return else: try: self._invitation.send_response(488) except PJSIPError: self.end() return else: for stream in removed_streams: self.streams.remove(stream) stream.end() notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown')) received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) on_hold_streams = set(stream for stream in self.streams if stream.hold_supported and stream.on_hold_by_remote) if on_hold_streams != prev_on_hold_streams: hold_supported_streams = (stream for stream in self.streams if stream.hold_supported) notification.center.post_notification('SIPSessionDidChangeHoldState', self, NotificationData(originator='remote', on_hold=bool(on_hold_streams), partial=bool(on_hold_streams) and any(not stream.on_hold_by_remote for stream in hold_supported_streams))) if removed_media_indexes: notification.center.post_notification('SIPSessionDidRenegotiateStreams', self, NotificationData(originator='remote', added_streams=[], removed_streams=removed_streams)) except InvitationDisconnectedError as e: self.greenlet = None self.state = 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: self.end() else: self.state = 'connected' elif notification.data.state == 'connected' and notification.data.sub_state == 'received_proposal_request': self.state = 'received_proposal_request' try: # An empty proposal was received, generate an offer self._invitation.send_response(100) local_sdp = SDPSession.new(self._invitation.sdp.active_local) local_sdp.version += 1 connection_address = host.outgoing_ip_for(self._invitation.peer_address.ip) if local_sdp.connection is not None: local_sdp.connection.address = connection_address for index, stream in enumerate(self.streams): stream.reset(index) media = stream.get_local_media(remote_sdp=None, index=index) if media.connection is not None: media.connection.address = connection_address local_sdp.media[stream.index] = media self._invitation.send_response(200, sdp=local_sdp) notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=200, reason=sip_status_messages[200], ack_received='unknown')) received_invitation_state = False received_sdp_update = False while not received_sdp_update or not received_invitation_state or self._channel: notification = self._channel.wait() if notification.name == 'SIPInvitationGotSDPUpdate': received_sdp_update = True if notification.data.succeeded: local_sdp = notification.data.local_sdp remote_sdp = notification.data.remote_sdp for stream in self.streams: stream.update(local_sdp, remote_sdp, stream.index) elif notification.name == 'SIPInvitationChangedState': if notification.data.state == 'connected' and notification.data.sub_state == 'normal': received_invitation_state = True elif notification.data.state == 'disconnected': raise InvitationDisconnectedError(notification.sender, notification.data) else: unhandled_notifications.append(notification) else: unhandled_notifications.append(notification) except InvitationDisconnectedError as e: self.greenlet = None self.state = 'connected' notification = Notification('SIPInvitationChangedState', e.invitation, e.data) notification.center = NotificationCenter() self.handle_notification(notification) except SIPCoreError: raise # FIXME else: self.state = 'connected' elif notification.data.prev_state == notification.data.state == 'connected' and notification.data.prev_sub_state == 'received_proposal' and notification.data.sub_state == 'normal': if notification.data.originator == 'local' and notification.data.code == 487: proposed_streams = self.proposed_streams self.proposed_streams = None self.state = 'connected' notification.center.post_notification('SIPSessionProposalRejected', self, NotificationData(originator='remote', code=notification.data.code, reason=notification.data.reason, proposed_streams=proposed_streams)) if self._hold_in_progress: self._send_hold() elif notification.data.state == 'disconnected': if self.state == 'incoming': self.state = 'terminated' if notification.data.originator == 'remote': notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator='remote', method='INVITE', code=487, reason='Session Cancelled', ack_received='unknown')) notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='remote', code=487, reason='Session Cancelled', failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: # There must have been an error involved notification.center.post_notification('SIPSessionDidFail', self, NotificationData(originator='local', code=0, reason=None, failure_reason=notification.data.disconnect_reason, redirect_identities=None)) else: self.state = 'terminated' notification.center.post_notification('SIPSessionWillEnd', self, NotificationData(originator=notification.data.originator)) for stream in self.streams: notification.center.remove_observer(self, sender=stream) stream.deactivate() stream.end() if notification.data.originator == 'remote': if hasattr(notification.data, 'method'): notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=notification.data.originator, method=notification.data.method, code=200, reason=sip_status_messages[200])) else: notification.center.post_notification('SIPSessionDidProcessTransaction', self, NotificationData(originator=notification.data.originator, method='INVITE', code=notification.data.code, reason=notification.data.reason)) self.end_time = ISOTimestamp.now() notification.center.post_notification('SIPSessionDidEnd', self, NotificationData(originator=notification.data.originator, end_reason=notification.data.disconnect_reason)) notification.center.remove_observer(self, sender=self._invitation) finally: self.greenlet = None for notification in unhandled_notifications: self.handle_notification(notification) def _NH_SIPInvitationGotSDPUpdate(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidInitialize(self, notification): if self.greenlet is not None: self._channel.send(notification) def _NH_RTPStreamDidEnableEncryption(self, notification): if notification.sender.type != 'audio': return audio_stream = notification.sender if audio_stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: video_stream = next(stream for stream in self.streams or [] if stream.type=='video') except StopIteration: return if video_stream.encryption.type == 'ZRTP' and not video_stream.encryption.active: video_stream.encryption.zrtp._enable(audio_stream) def _NH_MediaStreamDidStart(self, notification): stream = notification.sender if stream.type == 'audio' and stream.encryption.type == 'ZRTP': stream.encryption.zrtp._enable() elif stream.type == 'video' and stream.encryption.type == 'ZRTP': # start ZRTP on the video stream, if applicable try: audio_stream = next(stream for stream in self.streams or [] if stream.type=='audio') except StopIteration: pass else: if audio_stream.encryption.type == 'ZRTP' and audio_stream.encryption.active: stream.encryption.zrtp._enable(audio_stream) if self.greenlet is not None: self._channel.send(notification) def _NH_MediaStreamDidNotInitialize(self, notification): if self.greenlet is not None and self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidNotInitializeError(notification.sender, notification.data)) def _NH_MediaStreamDidFail(self, notification): if self.greenlet is not None: if self.state not in ('terminating', 'terminated'): self._channel.send_exception(MediaStreamDidFailError(notification.sender, notification.data)) else: stream = notification.sender if self.streams == [stream]: self.end() else: try: self.remove_stream(stream) except IllegalStateError: self.end() @implementer(IObserver) class SessionManager(object, metaclass=Singleton): def __init__(self): self.sessions = [] self.state = None self._channel = coros.queue() def start(self): self.state = 'starting' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillStart', sender=self) notification_center.add_observer(self, 'SIPInvitationChangedState') notification_center.add_observer(self, 'SIPSessionNewIncoming') notification_center.add_observer(self, 'SIPSessionNewOutgoing') notification_center.add_observer(self, 'SIPSessionDidFail') notification_center.add_observer(self, 'SIPSessionDidEnd') self.state = 'started' notification_center.post_notification('SIPSessionManagerDidStart', sender=self) def stop(self): self.state = 'stopping' notification_center = NotificationCenter() notification_center.post_notification('SIPSessionManagerWillEnd', sender=self) for session in self.sessions: session.end() while self.sessions: self._channel.wait() notification_center.remove_observer(self, 'SIPInvitationChangedState') notification_center.remove_observer(self, 'SIPSessionNewIncoming') notification_center.remove_observer(self, 'SIPSessionNewOutgoing') notification_center.remove_observer(self, 'SIPSessionDidFail') notification_center.remove_observer(self, 'SIPSessionDidEnd') self.state = 'stopped' notification_center.post_notification('SIPSessionManagerDidEnd', sender=self) @run_in_twisted_thread def handle_notification(self, notification): if notification.name == 'SIPInvitationChangedState' and notification.data.state == 'incoming': account_manager = AccountManager() account = account_manager.find_account(notification.data.request_uri) if account is None: notification.sender.send_response(404) return notification.sender.send_response(100) session = Session(account) session.init_incoming(notification.sender, notification.data) elif notification.name in ('SIPSessionNewIncoming', 'SIPSessionNewOutgoing'): self.sessions.append(notification.sender) elif notification.name in ('SIPSessionDidFail', 'SIPSessionDidEnd'): self.sessions.remove(notification.sender) if self.state == 'stopping': self._channel.send(notification) diff --git a/sipsimple/streams/rtp/__init__.py b/sipsimple/streams/rtp/__init__.py index 7cc6e73b..8f1635ad 100644 --- a/sipsimple/streams/rtp/__init__.py +++ b/sipsimple/streams/rtp/__init__.py @@ -1,619 +1,619 @@ """ Handling of RTP media streams according to RFC3550, RFC3605, RFC3581, RFC2833 and RFC3711, RFC3489 and RFC5245. """ __all__ = ['RTPStream'] import weakref from abc import ABCMeta, abstractmethod from application.notification import IObserver, NotificationCenter, NotificationData, ObserverWeakrefProxy from application.python import Null from threading import RLock from zope.interface import implementer from sipsimple.account import BonjourAccount from sipsimple.configuration.settings import SIPSimpleSettings from sipsimple.core import RTPTransport, SIPCoreError, SIPURI from sipsimple.lookup import DNSLookup from sipsimple.streams import IMediaStream, InvalidStreamError, MediaStreamType, UnknownStreamError from sipsimple.threading import run_in_thread @implementer(IObserver) class ZRTPStreamOptions(object): def __init__(self, stream): self._stream = stream self.__dict__['master'] = None self.__dict__['sas'] = None self.__dict__['verified'] = False self.__dict__['peer_name'] = '' @property def sas(self): if self.master is not None: return self.master.encryption.zrtp.sas return self.__dict__['sas'] @property def verified(self): if self.master is not None: return self.master.encryption.zrtp.verified return self.__dict__['verified'] @verified.setter def verified(self, verified): if self.__dict__['verified'] == verified: return if self.sas is None: raise AttributeError('Cannot verify peer before SAS is received') if self.master is not None: self.master.encryption.zrtp.verified = verified else: rtp_transport = self._stream._rtp_transport if rtp_transport is not None: @run_in_thread('file-io') def update_verified(rtp_transport, verified): rtp_transport.set_zrtp_sas_verified(verified) notification_center = NotificationCenter() notification_center.post_notification('RTPStreamZRTPVerifiedStateChanged', sender=self._stream, data=NotificationData(verified=verified)) self.__dict__['verified'] = verified update_verified(rtp_transport, verified) @property def peer_id(self): if self.master is not None: return self.master.encryption.zrtp.peer_id rtp_transport = self._stream._rtp_transport if rtp_transport is None: return None return rtp_transport.zrtp_peer_id @property def peer_name(self): if self.master is not None: return self.master.encryption.zrtp.peer_name return self.__dict__['peer_name'] @peer_name.setter def peer_name(self, name): if self.__dict__['peer_name'] == name: return if self.master is not None: self.master.encryption.zrtp.peer_name = name else: rtp_transport = self._stream._rtp_transport if rtp_transport is not None: @run_in_thread('file-io') def update_name(rtp_transport, name): rtp_transport.zrtp_peer_name = name notification_center = NotificationCenter() notification_center.post_notification('RTPStreamZRTPPeerNameChanged', sender=self._stream, data=NotificationData(name=name)) self.__dict__['peer_name'] = name update_name(rtp_transport, name) @property def master(self): return self.__dict__['master'] @master.setter def master(self, master): old_master = self.__dict__['master'] if old_master is master: return notification_center = NotificationCenter() if old_master is not None: notification_center.remove_observer(self, sender=old_master) if master is not None: notification_center.add_observer(self, sender=master) self.__dict__['master'] = master def _enable(self, master_stream=None): rtp_transport = self._stream._rtp_transport if rtp_transport is None: return if master_stream is not None and not (master_stream.encryption.active and master_stream.encryption.type == 'ZRTP'): raise RuntimeError('Master stream must have ZRTP encryption activated') rtp_transport.set_zrtp_enabled(True, master_stream) self.master = master_stream def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_RTPStreamZRTPReceivedSAS(self, notification): # ZRTP begins on the audio stream, so this notification will only be processed # by the other streams self.__dict__['sas'] = notification.data.sas self.__dict__['verified'] = notification.data.verified self.__dict__['peer_name'] = notification.data.peer_name notification.center.post_notification(notification.name, sender=self._stream, data=notification.data) def _NH_RTPStreamZRTPVerifiedStateChanged(self, notification): self.__dict__['verified'] = notification.data.verified notification.center.post_notification(notification.name, sender=self._stream, data=notification.data) def _NH_RTPStreamZRTPPeerNameChanged(self, notification): self.__dict__['peer_name'] = notification.data.name notification.center.post_notification(notification.name, sender=self._stream, data=notification.data) def _NH_MediaStreamDidEnd(self, notification): self.master = None @implementer(IObserver) class RTPStreamEncryption(object): def __init__(self, stream): self._stream_ref = weakref.ref(stream) # Keep a weak reference before the stream is initialized to avoid a memory cycle that would delay releasing audio resources self._stream = None # We will store the actual reference once it's initialized and we're guaranteed to get MediaStreamDidEnd and do the cleanup self._rtp_transport = None self.__dict__['type'] = None self.__dict__['zrtp'] = None notification_center = NotificationCenter() notification_center.add_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize') notification_center.add_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize') @property def active(self): rtp_transport = self._rtp_transport if rtp_transport is None: return False if self.type == 'SRTP/SDES': return rtp_transport.srtp_active elif self.type == 'ZRTP': return rtp_transport.zrtp_active return False @property def cipher(self): rtp_transport = self._rtp_transport if rtp_transport is None: return None if self.type == 'SRTP/SDES': return rtp_transport.srtp_cipher elif self.type == 'ZRTP': return rtp_transport.zrtp_cipher return None @property def type(self): return self.__dict__['type'] @property def zrtp(self): zrtp = self.__dict__['zrtp'] if zrtp is None: raise RuntimeError('ZRTP options have not been initialized') return zrtp def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_MediaStreamDidInitialize(self, notification): stream = notification.sender if stream is self._stream_ref(): self._stream = stream self._rtp_transport = stream._rtp_transport notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize') notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize') notification.center.add_observer(self, sender=self._stream) notification.center.add_observer(self, sender=self._rtp_transport) encryption = stream._srtp_encryption or '' if encryption.startswith('sdes'): self.__dict__['type'] = 'SRTP/SDES' elif encryption == 'zrtp': self.__dict__['type'] = 'ZRTP' self.__dict__['zrtp'] = ZRTPStreamOptions(stream) def _NH_MediaStreamDidNotInitialize(self, notification): if notification.sender is self._stream_ref(): notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidInitialize') notification.center.remove_observer(ObserverWeakrefProxy(self), name='MediaStreamDidNotInitialize') self._stream_ref = None self._stream = None def _NH_MediaStreamDidStart(self, notification): if self.type == 'SRTP/SDES': stream = notification.sender if self.active: notification.center.post_notification('RTPStreamDidEnableEncryption', sender=stream) else: reason = 'Not supported by remote' notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason)) def _NH_MediaStreamDidEnd(self, notification): notification.center.remove_observer(self, sender=self._stream) notification.center.remove_observer(self, sender=self._rtp_transport) self._stream = None self._stream_ref = None self._rtp_transport = None self.__dict__['type'] = None self.__dict__['zrtp'] = None def _NH_RTPTransportZRTPSecureOn(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return notification.center.post_notification('RTPStreamDidEnableEncryption', sender=stream) def _NH_RTPTransportZRTPSecureOff(self, notification): # We should never get here because we don't allow disabling encryption -Saul pass def _NH_RTPTransportZRTPReceivedSAS(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return self.zrtp.__dict__['sas'] = sas = notification.data.sas self.zrtp.__dict__['verified'] = verified = notification.data.verified self.zrtp.__dict__['peer_name'] = peer_name = notification.sender.zrtp_peer_name notification.center.post_notification('RTPStreamZRTPReceivedSAS', sender=stream, data=NotificationData(sas=sas, verified=verified, peer_name=peer_name)) def _NH_RTPTransportZRTPLog(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return notification.center.post_notification('RTPStreamZRTPLog', sender=stream, data=notification.data) def _NH_RTPTransportZRTPNegotiationFailed(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return reason = 'Negotiation failed: %s' % notification.data.reason notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason)) def _NH_RTPTransportZRTPNotSupportedByRemote(self, notification): stream = self._stream with stream._lock: if stream.state == "ENDED": return reason = 'ZRTP not supported by remote' notification.center.post_notification('RTPStreamDidNotEnableEncryption', sender=stream, data=NotificationData(reason=reason)) class RTPStreamType(ABCMeta, MediaStreamType): pass @implementer(IMediaStream, IObserver) class RTPStream(object, metaclass=RTPStreamType): type = None priority = None hold_supported = True def __init__(self): self.notification_center = NotificationCenter() self.on_hold_by_local = False self.on_hold_by_remote = False self.direction = None self.state = "NULL" self.session = None self.encryption = RTPStreamEncryption(self) self._transport = None self._hold_request = None self._ice_state = "NULL" self._lock = RLock() self._rtp_transport = None self._try_ice = False self._srtp_encryption = None self._remote_rtp_address_sdp = None self._remote_rtp_port_sdp = None self._initialized = False self._done = False self._failure_reason = None @property def codec(self): return self._transport.codec if self._transport else None @property def sample_rate(self): return self._transport.sample_rate if self._transport else None @property def statistics(self): return self._transport.statistics if self._transport else None @property def local_rtp_address(self): return self._rtp_transport.local_rtp_address if self._rtp_transport else None @property def local_rtp_port(self): return self._rtp_transport.local_rtp_port if self._rtp_transport else None @property def local_rtp_candidate(self): return self._rtp_transport.local_rtp_candidate if self._rtp_transport else None @property def remote_rtp_address(self): if self._ice_state == "IN_USE": return self._rtp_transport.remote_rtp_address if self._rtp_transport else None return self._remote_rtp_address_sdp if self._rtp_transport else None @property def remote_rtp_port(self): if self._ice_state == "IN_USE": return self._rtp_transport.remote_rtp_port if self._rtp_transport else None return self._remote_rtp_port_sdp if self._rtp_transport else None @property def remote_rtp_candidate(self): return self._rtp_transport.remote_rtp_candidate if self._rtp_transport else None @property def ice_active(self): return self._ice_state == "IN_USE" @property def on_hold(self): return self.on_hold_by_local or self.on_hold_by_remote @abstractmethod def start(self, local_sdp, remote_sdp, stream_index): raise NotImplementedError @abstractmethod def update(self, local_sdp, remote_sdp, stream_index): raise NotImplementedError @abstractmethod def validate_update(self, remote_sdp, stream_index): raise NotImplementedError @abstractmethod def deactivate(self): raise NotImplementedError @abstractmethod def end(self): raise NotImplementedError @abstractmethod def reset(self, stream_index): raise NotImplementedError def hold(self): with self._lock: if self.on_hold_by_local or self._hold_request == 'hold': return if self.state == "ESTABLISHED" and self.direction != "inactive": self._pause() self._hold_request = 'hold' def unhold(self): with self._lock: if (not self.on_hold_by_local and self._hold_request != 'hold') or self._hold_request == 'unhold': return if self.state == "ESTABLISHED" and self._hold_request == 'hold': self._resume() self._hold_request = None if self._hold_request == 'hold' else 'unhold' @classmethod def new_from_sdp(cls, session, remote_sdp, stream_index): # TODO: actually validate the SDP settings = SIPSimpleSettings() remote_stream = remote_sdp.media[stream_index] - if remote_stream.media != cls.type: + if remote_stream.media != cls.type.encode(): raise UnknownStreamError if remote_stream.transport not in (b'RTP/AVP', b'RTP/SAVP'): raise InvalidStreamError("expected RTP/AVP or RTP/SAVP transport in %s stream, got %s" % (cls.type, remote_stream.transport.decode())) local_encryption_policy = session.account.rtp.encryption.key_negotiation if session.account.rtp.encryption.enabled else None if local_encryption_policy == "sdes_mandatory" and not b"crypto" in remote_stream.attributes: raise InvalidStreamError("SRTP/SDES is locally mandatory but it's not remotely enabled") if remote_stream.transport == b'RTP/SAVP' and b"crypto" in remote_stream.attributes and local_encryption_policy not in ("opportunistic", "sdes_optional", "sdes_mandatory"): raise InvalidStreamError("SRTP/SDES is remotely mandatory but it's not locally enabled") account_preferred_codecs = getattr(session.account.rtp, '%s_codec_list' % cls.type) general_codecs = getattr(settings.rtp, '%s_codec_list' % cls.type) supported_codecs = account_preferred_codecs or general_codecs if not any(codec for codec in remote_stream.codec_list if codec in supported_codecs): raise InvalidStreamError("no compatible codecs found") stream = cls() stream._incoming_remote_sdp = remote_sdp stream._incoming_stream_index = stream_index return stream def initialize(self, session, direction): with self._lock: if self.state != "NULL": raise RuntimeError("%sStream.initialize() may only be called in the NULL state" % self.type.capitalize()) self.state = "INITIALIZING" self.session = session local_encryption_policy = session.account.rtp.encryption.key_negotiation if session.account.rtp.encryption.enabled else None if hasattr(self, "_incoming_remote_sdp") and hasattr(self, '_incoming_stream_index'): # ICE attributes could come at the session level or at the media level remote_stream = self._incoming_remote_sdp.media[self._incoming_stream_index] self._try_ice = self.session.account.nat_traversal.use_ice and ((remote_stream.has_ice_attributes or self._incoming_remote_sdp.has_ice_attributes) and remote_stream.has_ice_candidates) if "zrtp-hash" in remote_stream.attributes: incoming_stream_encryption = 'zrtp' elif "crypto" in remote_stream.attributes: incoming_stream_encryption = 'sdes_mandatory' if remote_stream.transport == 'RTP/SAVP' else 'sdes_optional' else: incoming_stream_encryption = None if incoming_stream_encryption is not None and local_encryption_policy == 'opportunistic': self._srtp_encryption = incoming_stream_encryption else: self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy else: self._try_ice = self.session.account.nat_traversal.use_ice self._srtp_encryption = 'zrtp' if local_encryption_policy == 'opportunistic' else local_encryption_policy if self._try_ice: if self.session.account.nat_traversal.stun_server_list: stun_servers = list((server.host, server.port) for server in self.session.account.nat_traversal.stun_server_list) self._init_rtp_transport(stun_servers) elif not isinstance(self.session.account, BonjourAccount): dns_lookup = DNSLookup() self.notification_center.add_observer(self, sender=dns_lookup) dns_lookup.lookup_service(SIPURI(self.session.account.id.domain), "stun") else: self._init_rtp_transport() def get_local_media(self, remote_sdp=None, index=0): with self._lock: if self.state not in ("INITIALIZED", "WAIT_ICE", "ESTABLISHED"): raise RuntimeError("%sStream.get_local_media() may only be called in the INITIALIZED, WAIT_ICE or ESTABLISHED states" % self.type.capitalize()) if remote_sdp is None: # offer old_direction = self._transport.direction if old_direction is None: new_direction = "sendrecv" elif b"send" in old_direction: new_direction = ("sendonly" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "sendrecv") else: new_direction = ("inactive" if (self._hold_request == 'hold' or (self._hold_request is None and self.on_hold_by_local)) else "recvonly") else: new_direction = None new_direction = new_direction.encode() if new_direction else None return self._transport.get_local_media(remote_sdp, index, new_direction) # Notifications def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_DNSLookupDidFail(self, notification): self.notification_center.remove_observer(self, sender=notification.sender) with self._lock: if self.state == "ENDED": return self._init_rtp_transport() def _NH_DNSLookupDidSucceed(self, notification): self.notification_center.remove_observer(self, sender=notification.sender) with self._lock: if self.state == "ENDED": return self._init_rtp_transport(notification.data.result) def _NH_RTPTransportDidInitialize(self, notification): rtp_transport = notification.sender with self._lock: if self.state == "ENDED": self.notification_center.remove_observer(self, sender=rtp_transport) return del self._rtp_args del self._stun_servers remote_sdp = self.__dict__.pop('_incoming_remote_sdp', None) stream_index = self.__dict__.pop('_incoming_stream_index', None) try: if remote_sdp is not None: transport = self._create_transport(rtp_transport, remote_sdp=remote_sdp, stream_index=stream_index) self._save_remote_sdp_rtp_info(remote_sdp, stream_index) else: transport = self._create_transport(rtp_transport) except SIPCoreError as e: self.state = "ENDED" self.notification_center.remove_observer(self, sender=rtp_transport) self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=e.args[0])) return self._rtp_transport = rtp_transport self._transport = transport self.notification_center.add_observer(self, sender=transport) self._initialized = True self.state = "INITIALIZED" self.notification_center.post_notification('MediaStreamDidInitialize', sender=self) def _NH_RTPTransportDidFail(self, notification): self.notification_center.remove_observer(self, sender=notification.sender) with self._lock: if self.state == "ENDED": return self._try_next_rtp_transport(notification.data.reason) def _NH_RTPTransportICENegotiationStateDidChange(self, notification): with self._lock: if self._ice_state != "NULL" or self.state not in ("INITIALIZING", "INITIALIZED", "WAIT_ICE"): return self.notification_center.post_notification('RTPStreamICENegotiationStateDidChange', sender=self, data=notification.data) def _NH_RTPTransportICENegotiationDidSucceed(self, notification): with self._lock: if self.state != "WAIT_ICE": return self._ice_state = "IN_USE" self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidSucceed', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) def _NH_RTPTransportICENegotiationDidFail(self, notification): with self._lock: if self.state != "WAIT_ICE": return self._ice_state = "FAILED" self.state = 'ESTABLISHED' self.notification_center.post_notification('RTPStreamICENegotiationDidFail', sender=self, data=notification.data) self.notification_center.post_notification('MediaStreamDidStart', sender=self) # Private methods def _init_rtp_transport(self, stun_servers=None): self._rtp_args = dict() self._rtp_args["encryption"] = self._srtp_encryption self._rtp_args["use_ice"] = self._try_ice self._stun_servers = [(None, None)] if stun_servers: self._stun_servers.extend(reversed(stun_servers)) self._try_next_rtp_transport() def _try_next_rtp_transport(self, failure_reason=None): if self._stun_servers: stun_address, stun_port = self._stun_servers.pop() try: stun_address = stun_address.encode() if stun_address else None rtp_transport = RTPTransport(ice_stun_address=stun_address, ice_stun_port=stun_port, **self._rtp_args) except SIPCoreError as e: self._try_next_rtp_transport(e.args[0]) else: self.notification_center.add_observer(self, sender=rtp_transport) try: rtp_transport.set_INIT() except SIPCoreError as e: self.notification_center.remove_observer(self, sender=rtp_transport) self._try_next_rtp_transport(e.args[0]) else: self.state = "ENDED" self.notification_center.post_notification('MediaStreamDidNotInitialize', sender=self, data=NotificationData(reason=failure_reason)) def _save_remote_sdp_rtp_info(self, remote_sdp, index): connection = remote_sdp.media[index].connection or remote_sdp.connection self._remote_rtp_address_sdp = connection.address self._remote_rtp_port_sdp = remote_sdp.media[index].port @abstractmethod def _create_transport(self, rtp_transport, remote_sdp=None, stream_index=None): raise NotImplementedError @abstractmethod def _check_hold(self, direction, is_initial): raise NotImplementedError @abstractmethod def _pause(self): raise NotImplementedError @abstractmethod def _resume(self): raise NotImplementedError from sipsimple.streams.rtp import audio, video