diff --git a/sipsimple/core/_core.sdp.pxi b/sipsimple/core/_core.sdp.pxi index 49cf3741..379a2eb5 100644 --- a/sipsimple/core/_core.sdp.pxi +++ b/sipsimple/core/_core.sdp.pxi @@ -1,1227 +1,1227 @@ import re from application.python.descriptor import WriteOnceAttribute cdef object BaseSDPSession_richcmp(object self, object other, int op) with gil: cdef int eq = 1 if op not in [2,3]: return NotImplemented if not isinstance(other, BaseSDPSession): return NotImplemented for attr in ("id", "version", "user", "net_type", "address_type", "address", "address", "name", "connection", "start_time", "stop_time", "attributes", "bandwidth_info", "media"): if getattr(self, attr) != getattr(other, attr): eq = 0 break if op == 2: return bool(eq) else: return not eq cdef pjmedia_sdp_session* _parse_sdp_session(str sdp): cdef int status cdef pjmedia_sdp_session *sdp_session status = pjmedia_sdp_parse(_get_ua()._pjsip_endpoint._pool, _str_as_str(sdp), _str_as_size(sdp), &sdp_session) if status != 0: raise PJSIPError("failed to parse SDP", status) return sdp_session cdef class BaseSDPSession: def __init__(self, *args, **kwargs): raise TypeError("BaseSDPSession cannot be instantiated directly") def __repr__(self): return "%s(%r, %r, %r, %r, %r, %r, %r, %r, %r, %r, %r, %r, %r)" % (self.__class__.__name__, self.address, self.id, self.version, self.user, self.net_type, self.address_type, self.name, self.connection, self.start_time, self.stop_time, self.attributes, self.bandwidth_info, self.media) def __str__(self): cdef char cbuf[2048] cdef int buf_len buf_len = pjmedia_sdp_print(self.get_sdp_session(), cbuf, sizeof(cbuf)) if buf_len > -1: return _pj_buf_len_to_str(cbuf, buf_len) return '' def __richcmp__(self, other, op): return BaseSDPSession_richcmp(self, other, op) cdef pjmedia_sdp_session* get_sdp_session(self): self._sdp_session.media_count = len(self.media) for index, m in enumerate(self.media): if m is not None: self._sdp_session.media[index] = (m).get_sdp_media() else: self._sdp_session.media[index] = NULL self._sdp_session.attr_count = len(self.attributes) for index, attr in enumerate(self.attributes): self._sdp_session.attr[index] = (attr).get_sdp_attribute() self._sdp_session.bandw_count = len(self.bandwidth_info) for index, info in enumerate(self.bandwidth_info): self._sdp_session.bandw[index] = (info).get_sdp_bandwidth_info() return &self._sdp_session property has_ice_attributes: def __get__(self): return set([attr.name for attr in self.attributes]).issuperset(['ice-pwd', 'ice-ufrag']) cdef class SDPSession(BaseSDPSession): def __init__(self, str address not None, object id=None, object version=None, str user not None="-", str net_type not None="IN", str address_type not None="IP4", str name not None=" ", SDPConnection connection=None, unsigned long start_time=0, unsigned long stop_time=0, list attributes=None, list bandwidth_info=None, list media=None): cdef unsigned int version_id = 2208988800UL cdef pj_time_val tv pj_gettimeofday(&tv) version_id += tv.sec self.address = address self.id = id if id is not None else version_id self.version = version if version is not None else version_id self.user = user self.net_type = net_type self.address_type = address_type self.name = name self.connection = connection self.start_time = start_time self.stop_time = stop_time self.attributes = attributes if attributes is not None else [] self.bandwidth_info = bandwidth_info if bandwidth_info is not None else [] self.media = media if media is not None else [] @classmethod def new(cls, BaseSDPSession sdp_session): connection = SDPConnection.new(sdp_session.connection) if (sdp_session.connection is not None) else None attributes = [SDPAttribute.new(attr) for attr in sdp_session.attributes] bandwidth_info = [SDPBandwidthInfo.new(info) for info in sdp_session.bandwidth_info] media = [SDPMediaStream.new(m) if m is not None else None for m in sdp_session.media] return cls(sdp_session.address, sdp_session.id, sdp_session.version, sdp_session.user, sdp_session.net_type, sdp_session.address_type, sdp_session.name, connection, sdp_session.start_time, sdp_session.stop_time, attributes, bandwidth_info, media) @classmethod def parse(cls, str sdp): cdef pjmedia_sdp_session *sdp_session sdp_session = _parse_sdp_session(sdp) return SDPSession_create(sdp_session) property address: def __get__(self): return self._address def __set__(self, str address not None): _str_to_pj_str(address, &self._sdp_session.origin.addr) self._address = address property id: def __get__(self): return self._sdp_session.origin.id def __set__(self, unsigned int id): self._sdp_session.origin.id = id property version: def __get__(self): return self._sdp_session.origin.version def __set__(self, unsigned int version): self._sdp_session.origin.version = version property user: def __get__(self): return self._user def __set__(self, str user not None): _str_to_pj_str(user, &self._sdp_session.origin.user) self._user = user property net_type: def __get__(self): return self._net_type def __set__(self, str net_type not None): _str_to_pj_str(net_type, &self._sdp_session.origin.net_type) self._net_type = net_type property address_type: def __get__(self): return self._address_type def __set__(self, str address_type not None): _str_to_pj_str(address_type, &self._sdp_session.origin.addr_type) self._address_type = address_type property name: def __get__(self): return self._name def __set__(self, str name not None): _str_to_pj_str(name, &self._sdp_session.name) self._name = name property connection: def __get__(self): return self._connection def __set__(self, SDPConnection connection): if connection is None: self._sdp_session.conn = NULL else: self._sdp_session.conn = connection.get_sdp_connection() self._connection = connection property start_time: def __get__(self): return self._sdp_session.time.start def __set__(self, unsigned long start_time): self._sdp_session.time.start = start_time property stop_time: def __get__(self): return self._sdp_session.time.stop def __set__(self, unsigned long stop_time): self._sdp_session.time.stop = stop_time property attributes: def __get__(self): return self._attributes def __set__(self, list attributes not None): if len(attributes) > PJMEDIA_MAX_SDP_ATTR: raise SIPCoreError("Too many attributes") for attr in attributes: if not isinstance(attr, SDPAttribute): raise TypeError("Items in SDPSession attribute list must be SDPAttribute instancess") if not isinstance(attributes, SDPAttributeList): attributes = SDPAttributeList(attributes) self._attributes = attributes property bandwidth_info: def __get__(self): return self._bandwidth_info def __set__(self, list infos not None): if len(infos) > PJMEDIA_MAX_SDP_BANDW: raise SIPCoreError("Too many bandwidth info attributes") for info in infos: if not isinstance(info, SDPBandwidthInfo): raise TypeError("Items in SDPSession attribute list must be SDPBandwidthInfo instancess") if not isinstance(infos, SDPBandwidthInfoList): infos = SDPBandwidthInfoList(infos) self._bandwidth_info = infos property media: def __get__(self): return self._media def __set__(self, list media not None): if len(media) > PJMEDIA_MAX_SDP_MEDIA: raise SIPCoreError("Too many media objects") for m in media: if m is not None and not isinstance(m, SDPMediaStream): raise TypeError("Items in SDPSession media list must be SDPMediaStream instancess") self._media = media cdef int _update(self) except -1: cdef SDPSession session cdef SDPMediaStream media, old_media session = SDPSession_create(&(self)._sdp_session) if len(self._media) != len(session._media): raise ValueError("Number of media streams in SDPSession got changed") if len(self._attributes) > len(session._attributes): raise ValueError("Number of attributes in SDPSession got reduced") for attr in ("id", "version", "user", "net_type", "address_type", "address", "name", "start_time", "stop_time"): setattr(self, attr, getattr(session, attr)) if session._connection is None: self.connection = None elif self._connection is None or self._connection != session._connection: self.connection = session._connection for index, attribute in enumerate(session._attributes): try: old_attribute = self._attributes[index] except IndexError: self._attributes.append(attribute) else: if old_attribute != attribute: self._attributes[index] = attribute for index, info in enumerate(session._bandwidth_info): try: old_info = self._bandwidth_info[index] except IndexError: self._bandwidth_info.append(info) else: if old_info != info: self._bandwidth_info[index] = info for index, media in enumerate(session._media): old_media = self._media[index] if old_media is not None: old_media._update(media) cdef class FrozenSDPSession(BaseSDPSession): def __init__(self, str address not None, object id=None, object version=None, str user not None="-", str net_type not None="IN", str address_type not None="IP4", str name not None=" ", FrozenSDPConnection connection=None, unsigned long start_time=0, unsigned long stop_time=0, frozenlist attributes not None=frozenlist(), frozenlist bandwidth_info not None=frozenlist(), frozenlist media not None=frozenlist()): cdef unsigned int version_id = 2208988800UL cdef pj_time_val tv if not self.initialized: if len(attributes) > PJMEDIA_MAX_SDP_ATTR: raise SIPCoreError("Too many attributes") for attr in attributes: if not isinstance(attr, FrozenSDPAttribute): raise TypeError("Items in FrozenSDPSession attribute list must be FrozenSDPAttribute instances") if len(bandwidth_info) > PJMEDIA_MAX_SDP_BANDW: raise SIPCoreError("Too many bandwidth info attributes") for info in bandwidth_info: if not isinstance(info, FrozenSDPBandwidthInfo): raise TypeError("Items in FrozenSDPSession bandwidth info attribute list must be FrozenSDPBandwidthInfo instances") if len(media) > PJMEDIA_MAX_SDP_MEDIA: raise SIPCoreError("Too many media objects") for m in media: if not isinstance(m, FrozenSDPMediaStream): raise TypeError("Items in FrozenSDPSession media list must be FrozenSDPMediaStream instancess") pj_gettimeofday(&tv) version_id += tv.sec self.address = address _str_to_pj_str(address, &self._sdp_session.origin.addr) self.id = id if id is not None else version_id self._sdp_session.origin.id = id if id is not None else version_id self.version = version if version is not None else version_id self._sdp_session.origin.version = version if version is not None else version_id self.user = user _str_to_pj_str(user, &self._sdp_session.origin.user) self.net_type = net_type _str_to_pj_str(net_type, &self._sdp_session.origin.net_type) self.address_type = address_type _str_to_pj_str(address_type, &self._sdp_session.origin.addr_type) self.name = name _str_to_pj_str(name, &self._sdp_session.name) self.connection = connection if connection is None: self._sdp_session.conn = NULL else: self._sdp_session.conn = connection.get_sdp_connection() self.start_time = start_time self._sdp_session.time.start = start_time self.stop_time = stop_time self._sdp_session.time.stop = stop_time self.attributes = FrozenSDPAttributeList(attributes) if not isinstance(attributes, FrozenSDPAttributeList) else attributes self.bandwidth_info = FrozenSDPBandwidthInfoList(bandwidth_info) if not isinstance(bandwidth_info, FrozenSDPBandwidthInfo) else bandwidth_info self.media = media self.initialized = 1 @classmethod def new(cls, BaseSDPSession sdp_session): if isinstance(sdp_session, FrozenSDPSession): return sdp_session connection = FrozenSDPConnection.new(sdp_session.connection) if (sdp_session.connection is not None) else None attributes = frozenlist([FrozenSDPAttribute.new(attr) for attr in sdp_session.attributes]) bandwidth_info = frozenlist([FrozenSDPBandwidthInfo.new(info) for info in sdp_session.bandwidth_info]) media = frozenlist([FrozenSDPMediaStream.new(m) for m in sdp_session.media]) return cls(sdp_session.address, sdp_session.id, sdp_session.version, sdp_session.user, sdp_session.net_type, sdp_session.address_type, sdp_session.name, connection, sdp_session.start_time, sdp_session.stop_time, attributes, bandwidth_info, media) @classmethod def parse(cls, str sdp): cdef pjmedia_sdp_session *sdp_session sdp_session = _parse_sdp_session(sdp) return FrozenSDPSession_create(sdp_session) def __hash__(self): return hash((self.address, self.id, self.version, self.user, self.net_type, self.address_type, self.name, self.connection, self.start_time, self.stop_time, self.attributes, self.bandwidth_info, self.media)) def __richcmp__(self, other, op): return BaseSDPSession_richcmp(self, other, op) class MediaCodec(object): name = WriteOnceAttribute() rate = WriteOnceAttribute() def __init__(self, name, rate): self.name = name self.rate = int(rate) def __repr__(self): return "%s(%r, %r)" % (self.__class__.__name__, self.name, self.rate) def __str__(self): return "%s/%s" % (self.name, self.rate) def __hash__(self): return hash(self.name) def __eq__(self, other): if isinstance(other, MediaCodec): return self.name.lower() == other.name.lower() and self.rate == other.rate elif isinstance(other, basestring): if '/' in other: return self.__str__().lower() == other.lower() else: return self.name.lower() == other.lower() return False def __ne__(self, other): return not self.__eq__(other) cdef object BaseSDPMediaStream_richcmp(object self, object other, int op) with gil: cdef int eq = 1 if op not in [2,3]: return NotImplemented if not isinstance(other, BaseSDPMediaStream): return NotImplemented for attr in ("media", "port", "port_count", "transport", "formats", "connection", "attributes", "bandwidth_info"): if getattr(self, attr) != getattr(other, attr): eq = 0 break if op == 2: return bool(eq) else: return not eq cdef class BaseSDPMediaStream: rtpmap_re = re.compile(r"""^(?P\d+)\s+(?P[-\w]+)/(?P\d+)(?:/\w+)?$""", re.IGNORECASE | re.MULTILINE) rtp_mappings = { 0: MediaCodec('PCMU', 8000), 3: MediaCodec('GSM', 8000), 4: MediaCodec('G723', 8000), 5: MediaCodec('DVI4', 8000), 6: MediaCodec('DVI4', 16000), 7: MediaCodec('LPC', 8000), 8: MediaCodec('PCMA', 8000), 9: MediaCodec('G722', 8000), 10: MediaCodec('L16', 44100), # 2 channels 11: MediaCodec('L16', 44100), # 1 channel 12: MediaCodec('QCELP', 8000), 13: MediaCodec('CN', 8000), 14: MediaCodec('MPA', 90000), 15: MediaCodec('G728', 8000), 16: MediaCodec('DVI4', 11025), 17: MediaCodec('DVI4', 22050), 18: MediaCodec('G729', 8000)} def __init__(self, *args, **kwargs): raise TypeError("BaseSDPMediaStream cannot be instantiated directly") def __repr__(self): return "%s(%r, %r, %r, %r, %r, %r, %r, %r)" % (self.__class__.__name__, self.media, self.port, self.transport, self.port_count, self.formats, self.connection, self.attributes, self.bandwidth_info) def __richcmp__(self, other, op): return BaseSDPMediaStream_richcmp(self, other, op) property direction: def __get__(self): for attribute in self.attributes: if attribute.name in ("sendrecv", "sendonly", "recvonly", "inactive"): return attribute.name return "sendrecv" property has_ice_attributes: def __get__(self): return set([attr.name for attr in self.attributes]).issuperset(['ice-pwd', 'ice-ufrag']) property has_ice_candidates: def __get__(self): return 'candidate' in self.attributes cdef pjmedia_sdp_media* get_sdp_media(self): self._sdp_media.attr_count = len(self.attributes) for index, attr in enumerate(self.attributes): self._sdp_media.attr[index] = (attr).get_sdp_attribute() self._sdp_media.bandw_count = len(self.bandwidth_info) for index, info in enumerate(self.bandwidth_info): self._sdp_media.bandw[index] = (info).get_sdp_bandwidth_info() return &self._sdp_media cdef class SDPMediaStream(BaseSDPMediaStream): def __init__(self, str media not None, int port, str transport not None, int port_count=1, list formats=None, SDPConnection connection=None, list attributes=None, list bandwidth_info=None): self.media = media self.port = port self.transport = transport self.port_count = port_count self.formats = formats if formats is not None else [] self.connection = connection self.attributes = attributes if attributes is not None else [] self.bandwidth_info = bandwidth_info if bandwidth_info is not None else [] @classmethod def new(cls, BaseSDPMediaStream sdp_media): connection = SDPConnection.new(sdp_media.connection) if (sdp_media.connection is not None) else None attributes = [SDPAttribute.new(attr) for attr in sdp_media.attributes] bandwidth_info = [SDPBandwidthInfo.new(bi) for bi in sdp_media.bandwidth_info] return cls(sdp_media.media, sdp_media.port, sdp_media.transport, sdp_media.port_count, list(sdp_media.formats), connection, attributes, bandwidth_info) property media: def __get__(self): return self._media def __set__(self, str media not None): _str_to_pj_str(media, &self._sdp_media.desc.media) self._media = media property port: def __get__(self): return self._sdp_media.desc.port def __set__(self, int port): self._sdp_media.desc.port = port property transport: def __get__(self): return self._transport def __set__(self, str transport not None): _str_to_pj_str(transport, &self._sdp_media.desc.transport) self._transport = transport property port_count: def __get__(self): return self._sdp_media.desc.port_count def __set__(self, int port_count): self._sdp_media.desc.port_count = port_count property formats: def __get__(self): return self._formats def __set__(self, list formats not None): if len(formats) > PJMEDIA_MAX_SDP_FMT: raise SIPCoreError("Too many formats") self._sdp_media.desc.fmt_count = len(formats) for index, format in enumerate(formats): _str_to_pj_str(format, &self._sdp_media.desc.fmt[index]) self._formats = formats property codec_list: def __get__(self): return self._codec_list property connection: def __get__(self): return self._connection def __set__(self, SDPConnection connection): if connection is None: self._sdp_media.conn = NULL else: self._sdp_media.conn = connection.get_sdp_connection() self._connection = connection property attributes: def __get__(self): return self._attributes def __set__(self, list attributes not None): if len(attributes) > PJMEDIA_MAX_SDP_ATTR: raise SIPCoreError("Too many attributes") for attr in attributes: if not isinstance(attr, SDPAttribute): raise TypeError("Items in SDPMediaStream attribute list must be SDPAttribute instances") if not isinstance(attributes, SDPAttributeList): attributes = SDPAttributeList(attributes) self._attributes = attributes if self._media in ("audio", "video"): rtp_mappings = self.rtp_mappings.copy() rtpmap_lines = '\n'.join([attr.value for attr in attributes if attr.name=='rtpmap']) # iterators are not supported -Dan rtpmap_codecs = dict([(int(type), MediaCodec(name, rate)) for type, name, rate in self.rtpmap_re.findall(rtpmap_lines)]) rtp_mappings.update(rtpmap_codecs) self._codec_list = [rtp_mappings.get(int(format), MediaCodec('Unknown', 0)) for format in self.formats] else: self._codec_list = list() property bandwidth_info: def __get__(self): return self._bandwidth_info def __set__(self, list infos not None): if len(infos) > PJMEDIA_MAX_SDP_BANDW: raise SIPCoreError("Too many bandwidth information attributes") for info in infos: if not isinstance(info, SDPBandwidthInfo): raise TypeError("Items in SDPMediaStream bandwidth_info list must be SDPBandwidthInfo instances") if not isinstance(infos, SDPBandwidthInfoList): infos = SDPBandwidthInfoList(infos) self._bandwidth_info = infos cdef int _update(self, SDPMediaStream media) except -1: if len(self._attributes) > len(media._attributes): raise ValueError("Number of attributes in SDPMediaStream got reduced") if len(self._bandwidth_info) > len(media._bandwidth_info): raise ValueError("Number of bandwidth info attributes in SDPMediaStream got reduced") for attr in ("media", "port", "transport", "port_count", "formats"): setattr(self, attr, getattr(media, attr)) if media._connection is None: self.connection = None elif self._connection is None or self._connection != media.connection: self.connection = media._connection for index, attribute in enumerate(media._attributes): try: old_attribute = self._attributes[index] except IndexError: self._attributes.append(attribute) else: if old_attribute != attribute: self._attributes[index] = attribute for index, info in enumerate(media._bandwidth_info): try: old_info = self._bandwidth_info[index] except IndexError: self._bandwidth_info.append(info) else: if old_info != info: self._bandwidth_info[index] = info cdef class FrozenSDPMediaStream(BaseSDPMediaStream): def __init__(self, str media not None, int port, str transport not None, int port_count=1, frozenlist formats not None=frozenlist(), FrozenSDPConnection connection=None, frozenlist attributes not None=frozenlist(), frozenlist bandwidth_info not None=frozenlist()): if not self.initialized: if len(formats) > PJMEDIA_MAX_SDP_FMT: raise SIPCoreError("Too many formats") if len(attributes) > PJMEDIA_MAX_SDP_ATTR: raise SIPCoreError("Too many attributes") for attr in attributes: if not isinstance(attr, FrozenSDPAttribute): raise TypeError("Items in FrozenSDPMediaStream attribute list must be FrozenSDPAttribute instances") if len(bandwidth_info) > PJMEDIA_MAX_SDP_BANDW: raise SIPCoreError("Too many bandwidth info attributes") for info in bandwidth_info: if not isinstance(info, FrozenSDPBandwidthInfo): raise TypeError("Items in FrozenSDPMediaStream bandwidth info list must be FrozenSDPBandwidthInfo instances") self.media = media _str_to_pj_str(media, &self._sdp_media.desc.media) self.port = port self._sdp_media.desc.port = port self.transport = transport _str_to_pj_str(transport, &self._sdp_media.desc.transport) self.port_count = port_count self._sdp_media.desc.port_count = port_count self.formats = formats self._sdp_media.desc.fmt_count = len(self.formats) for index, format in enumerate(self.formats): _str_to_pj_str(format, &self._sdp_media.desc.fmt[index]) self.connection = connection if connection is None: self._sdp_media.conn = NULL else: self._sdp_media.conn = connection.get_sdp_connection() self.attributes = FrozenSDPAttributeList(attributes) if not isinstance(attributes, FrozenSDPAttributeList) else attributes if self.media in ("audio", "video"): rtp_mappings = self.rtp_mappings.copy() rtpmap_lines = '\n'.join([attr.value for attr in attributes if attr.name=='rtpmap']) # iterators are not supported -Dan rtpmap_codecs = dict([(int(type), MediaCodec(name, rate)) for type, name, rate in self.rtpmap_re.findall(rtpmap_lines)]) rtp_mappings.update(rtpmap_codecs) self.codec_list = frozenlist([rtp_mappings.get(int(format) if format.isdigit() else None, MediaCodec('Unknown', 0)) for format in self.formats]) else: self.codec_list = frozenlist() self.bandwidth_info = FrozenSDPBandwidthInfoList(bandwidth_info) if not isinstance(bandwidth_info, FrozenSDPBandwidthInfoList) else bandwidth_info self.initialized = 1 @classmethod def new(cls, BaseSDPMediaStream sdp_media): if isinstance(sdp_media, FrozenSDPMediaStream): return sdp_media connection = FrozenSDPConnection.new(sdp_media.connection) if (sdp_media.connection is not None) else None attributes = frozenlist([FrozenSDPAttribute.new(attr) for attr in sdp_media.attributes]) bandwidth_info = frozenlist([FrozenSDPBandwidthInfo.new(info) for info in sdp_media.bandwidth_info]) return cls(sdp_media.media, sdp_media.port, sdp_media.transport, sdp_media.port_count, frozenlist(sdp_media.formats), connection, attributes, bandwidth_info) def __hash__(self): return hash((self.media, self.port, self.transport, self.port_count, self.formats, self.connection, self.attributes, self.bandwidth_info)) def __richcmp__(self, other, op): return BaseSDPMediaStream_richcmp(self, other, op) cdef object BaseSDPConnection_richcmp(object self, object other, int op) with gil: cdef int eq = 1 if op not in [2,3]: return NotImplemented if not isinstance(other, BaseSDPConnection): return NotImplemented for attr in ("net_type", "address_type", "address"): if getattr(self, attr) != getattr(other, attr): eq = 0 break if op == 2: return bool(eq) else: return not eq cdef class BaseSDPConnection: def __init__(self, *args, **kwargs): raise TypeError("BaseSDPConnection cannot be instantiated directly") def __repr__(self): return "%s(%r, %r, %r)" % (self.__class__.__name__, self.address, self.net_type, self.address_type) def __richcmp__(self, other, op): return BaseSDPConnection_richcmp(self, other, op) cdef pjmedia_sdp_conn* get_sdp_connection(self): return &self._sdp_connection cdef class SDPConnection(BaseSDPConnection): def __init__(self, str address not None, str net_type not None="IN", str address_type not None="IP4"): self.address = address self.net_type = net_type self.address_type = address_type @classmethod def new(cls, BaseSDPConnection sdp_connection): return cls(sdp_connection.address, sdp_connection.net_type, sdp_connection.address_type) property address: def __get__(self): return self._address def __set__(self, str address not None): _str_to_pj_str(address, &self._sdp_connection.addr) self._address = address property net_type: def __get__(self): return self._net_type def __set__(self, str net_type not None): _str_to_pj_str(net_type, &self._sdp_connection.net_type) self._net_type = net_type property address_type: def __get__(self): return self._address_type def __set__(self, str address_type not None): _str_to_pj_str(address_type, &self._sdp_connection.addr_type) self._address_type = address_type cdef class FrozenSDPConnection(BaseSDPConnection): def __init__(self, str address not None, str net_type not None="IN", str address_type not None="IP4"): if not self.initialized: _str_to_pj_str(address, &self._sdp_connection.addr) _str_to_pj_str(net_type, &self._sdp_connection.net_type) _str_to_pj_str(address_type, &self._sdp_connection.addr_type) self.address = address self.net_type = net_type self.address_type = address_type self.initialized = 1 @classmethod def new(cls, BaseSDPConnection sdp_connection): if isinstance(sdp_connection, FrozenSDPConnection): return sdp_connection return cls(sdp_connection.address, sdp_connection.net_type, sdp_connection.address_type) def __hash__(self): return hash((self.address, self.net_type, self.address_type)) def __richcmp__(self, other, op): return BaseSDPConnection_richcmp(self, other, op) cdef class SDPAttributeList(list): def __contains__(self, item): if isinstance(item, BaseSDPAttribute): return list.__contains__(self, item) else: return item in [attr.name for attr in self] def getall(self, name): return [attr.value for attr in self if attr.name == name] def getfirst(self, name, default=None): for attr in self: if attr.name == name: return attr.value return default cdef class FrozenSDPAttributeList(frozenlist): def __contains__(self, item): if isinstance(item, BaseSDPAttribute): return list.__contains__(self, item) else: return item in [attr.name for attr in self] def getall(self, name): return [attr.value for attr in self if attr.name == name] def getfirst(self, name, default=None): for attr in self: if attr.name == name: return attr.value return default cdef object BaseSDPAttribute_richcmp(object self, object other, int op) with gil: cdef int eq = 1 if op not in [2,3]: return NotImplemented if not isinstance(other, BaseSDPAttribute): return NotImplemented for attr in ("name", "value"): if getattr(self, attr) != getattr(other, attr): eq = 0 break if op == 2: return bool(eq) else: return not eq cdef class BaseSDPAttribute: def __init__(self, *args, **kwargs): raise TypeError("BaseSDPAttribute cannot be instantiated directly") def __repr__(self): return "%s(%r, %r)" % (self.__class__.__name__, self.name, self.value) def __richcmp__(self, other, op): return BaseSDPAttribute_richcmp(self, other, op) cdef pjmedia_sdp_attr* get_sdp_attribute(self): return &self._sdp_attribute cdef class SDPAttribute(BaseSDPAttribute): def __init__(self, str name not None, str value not None): self.name = name self.value = value @classmethod def new(cls, BaseSDPAttribute sdp_attribute): return cls(sdp_attribute.name, sdp_attribute.value) property name: def __get__(self): return self._name def __set__(self, str name not None): _str_to_pj_str(name, &self._sdp_attribute.name) self._name = name property value: def __get__(self): return self._value def __set__(self, str value not None): _str_to_pj_str(value, &self._sdp_attribute.value) self._value = value cdef class FrozenSDPAttribute(BaseSDPAttribute): def __init__(self, str name not None, str value not None): if not self.initialized: _str_to_pj_str(name, &self._sdp_attribute.name) _str_to_pj_str(value, &self._sdp_attribute.value) self.name = name self.value = value self.initialized = 1 @classmethod def new(cls, BaseSDPAttribute sdp_attribute): if isinstance(sdp_attribute, FrozenSDPAttribute): return sdp_attribute return cls(sdp_attribute.name, sdp_attribute.value) def __hash__(self): return hash((self.name, self.value)) def __richcmp__(self, other, op): return BaseSDPAttribute_richcmp(self, other, op) cdef class SDPBandwidthInfoList(list): def __contains__(self, item): if isinstance(item, BaseSDPBandwidthInfo): return list.__contains__(self, item) else: - return item in [attr.name for attr in self] + return item in [attr.name.decode() for attr in self] cdef class FrozenSDPBandwidthInfoList(frozenlist): def __contains__(self, item): if isinstance(item, BaseSDPBandwidthInfo): return list.__contains__(self, item) else: - return item in [info.modifier for info in self] + return item in [info.modifier.decode() for info in self] cdef object BaseSDPBandwidthInfo_richcmp(object self, object other, int op) with gil: cdef int eq = 1 if op not in [2,3]: return NotImplemented if not isinstance(other, BaseSDPBandwidthInfo): return NotImplemented for attr in ("modifier", "value"): if getattr(self, attr) != getattr(other, attr): eq = 0 break if op == 2: return bool(eq) else: return not eq cdef class BaseSDPBandwidthInfo: def __init__(self, *args, **kwargs): raise TypeError("BaseSDPBandwidthInfo cannot be instantiated directly") def __repr__(self): return "%s(%r, %r)" % (self.__class__.__name__, self.modifier, self.value) def __richcmp__(self, other, op): return BaseSDPBandwidthInfo_richcmp(self, other, op) cdef pjmedia_sdp_bandw* get_sdp_bandwidth_info(self): return &self._sdp_bandwidth_info cdef class SDPBandwidthInfo(BaseSDPBandwidthInfo): def __init__(self, str modifier not None, object value not None): self.modifier = modifier self.value = value @classmethod def new(cls, BaseSDPBandwidthInfo sdp_bandwidth_info): return cls(sdp_bandwidth_info.modifier, sdp_bandwidth_info.value) property modifier: def __get__(self): return self._modifier def __set__(self, str modifier not None): _str_to_pj_str(modifier, &self._sdp_bandwidth_info.modifier) self._modifier = modifier property value: def __get__(self): return self._value def __set__(self, object value not None): self._value = value self._sdp_bandwidth_info.value = self._value cdef class FrozenSDPBandwidthInfo(BaseSDPBandwidthInfo): def __init__(self, str modifier not None, object value not None): if not self.initialized: _str_to_pj_str(modifier, &self._sdp_bandwidth_info.modifier) self.modifier = modifier self._sdp_bandwidth_info.value = value self.value = value self.initialized = 1 @classmethod def new(cls, BaseSDPBandwidthInfo sdp_bandwidth_info): if isinstance(sdp_bandwidth_info, FrozenSDPBandwidthInfo): return sdp_bandwidth_info return cls(sdp_bandwidth_info.modifier, sdp_bandwidth_info.value) def __hash__(self): return hash((self.modifier, self.value)) def __richcmp__(self, other, op): return BaseSDPBandwidthInfo_richcmp(self, other, op) # Factory functions # cdef SDPSession SDPSession_create(pjmedia_sdp_session_ptr_const pj_session): cdef SDPConnection connection = None cdef int i if pj_session.conn != NULL: connection = SDPConnection_create(pj_session.conn) return SDPSession(_pj_str_to_str(pj_session.origin.addr), pj_session.origin.id, pj_session.origin.version, _pj_str_to_str(pj_session.origin.user), _pj_str_to_str(pj_session.origin.net_type), _pj_str_to_str(pj_session.origin.addr_type), _pj_str_to_str(pj_session.name), connection, pj_session.time.start, pj_session.time.stop, [SDPAttribute_create(pj_session.attr[i]) for i in range(pj_session.attr_count)], [SDPBandwidthInfo_create(pj_session.bandw[i]) for i in range(pj_session.bandw_count)], [SDPMediaStream_create(pj_session.media[i]) if pj_session.media[i] != NULL else None for i in range(pj_session.media_count)]) cdef FrozenSDPSession FrozenSDPSession_create(pjmedia_sdp_session_ptr_const pj_session): cdef FrozenSDPConnection connection = None cdef int i if pj_session.conn != NULL: connection = FrozenSDPConnection_create(pj_session.conn) return FrozenSDPSession(_pj_str_to_str(pj_session.origin.addr), pj_session.origin.id, pj_session.origin.version, _pj_str_to_str(pj_session.origin.user), _pj_str_to_str(pj_session.origin.net_type), _pj_str_to_str(pj_session.origin.addr_type), _pj_str_to_str(pj_session.name), connection, pj_session.time.start, pj_session.time.stop, frozenlist([FrozenSDPAttribute_create(pj_session.attr[i]) for i in range(pj_session.attr_count)]), frozenlist([FrozenSDPBandwidthInfo_create(pj_session.bandw[i]) for i in range(pj_session.bandw_count)]), frozenlist([FrozenSDPMediaStream_create(pj_session.media[i]) if pj_session.media[i] != NULL else None for i in range(pj_session.media_count)])) cdef SDPMediaStream SDPMediaStream_create(pjmedia_sdp_media *pj_media): cdef SDPConnection connection = None cdef int i if pj_media.conn != NULL: connection = SDPConnection_create(pj_media.conn) return SDPMediaStream(_pj_str_to_str(pj_media.desc.media), pj_media.desc.port, _pj_str_to_str(pj_media.desc.transport), pj_media.desc.port_count, [_pj_str_to_str(pj_media.desc.fmt[i]) for i in range(pj_media.desc.fmt_count)], connection, [SDPAttribute_create(pj_media.attr[i]) for i in range(pj_media.attr_count)], [SDPBandwidthInfo_create(pj_media.bandw[i]) for i in range(pj_media.bandw_count)]) cdef FrozenSDPMediaStream FrozenSDPMediaStream_create(pjmedia_sdp_media *pj_media): cdef FrozenSDPConnection connection = None cdef int i if pj_media.conn != NULL: connection = FrozenSDPConnection_create(pj_media.conn) return FrozenSDPMediaStream(_pj_str_to_str(pj_media.desc.media), pj_media.desc.port, _pj_str_to_str(pj_media.desc.transport), pj_media.desc.port_count, frozenlist([_pj_str_to_str(pj_media.desc.fmt[i]) for i in range(pj_media.desc.fmt_count)]), connection, frozenlist([FrozenSDPAttribute_create(pj_media.attr[i]) for i in range(pj_media.attr_count)]), frozenlist([FrozenSDPBandwidthInfo_create(pj_media.bandw[i]) for i in range(pj_media.bandw_count)])) cdef SDPConnection SDPConnection_create(pjmedia_sdp_conn *pj_conn): return SDPConnection(_pj_str_to_str(pj_conn.addr), _pj_str_to_str(pj_conn.net_type), _pj_str_to_str(pj_conn.addr_type)) cdef FrozenSDPConnection FrozenSDPConnection_create(pjmedia_sdp_conn *pj_conn): return FrozenSDPConnection(_pj_str_to_str(pj_conn.addr), _pj_str_to_str(pj_conn.net_type), _pj_str_to_str(pj_conn.addr_type)) cdef SDPAttribute SDPAttribute_create(pjmedia_sdp_attr *pj_attr): return SDPAttribute(_pj_str_to_str(pj_attr.name), _pj_str_to_str(pj_attr.value)) cdef FrozenSDPAttribute FrozenSDPAttribute_create(pjmedia_sdp_attr *pj_attr): return FrozenSDPAttribute(_pj_str_to_str(pj_attr.name), _pj_str_to_str(pj_attr.value)) cdef SDPBandwidthInfo SDPBandwidthInfo_create(pjmedia_sdp_bandw *pj_bandw): return SDPBandwidthInfo(_pj_str_to_str(pj_bandw.modifier), int(pj_bandw.value)) cdef FrozenSDPBandwidthInfo FrozenSDPBandwidthInfo_create(pjmedia_sdp_bandw *pj_bandw): return FrozenSDPBandwidthInfo(_pj_str_to_str(pj_bandw.modifier), int(pj_bandw.value)) # SDP negotiator # cdef class SDPNegotiator: def __cinit__(self, *args, **kwargs): cdef pj_pool_t *pool cdef bytes pool_name cdef PJSIPUA ua ua = _get_ua() pool_name = b"SDPNegotiator_%d" % id(self) pool = ua.create_memory_pool(pool_name, 4096, 4096) self._pool = pool self._neg = NULL def __dealloc__(self): cdef PJSIPUA ua try: ua = _get_ua() except: return ua.release_memory_pool(self._pool) self._pool = NULL @classmethod def create_with_local_offer(cls, BaseSDPSession sdp_session): cdef int status cdef pjmedia_sdp_neg *neg cdef pj_pool_t *pool cdef SDPNegotiator obj obj = cls() pool = obj._pool status = pjmedia_sdp_neg_create_w_local_offer(pool, sdp_session.get_sdp_session(), &neg) if status != 0: raise PJSIPError("failed to create SDPNegotiator with local offer", status) obj._neg = neg return obj @classmethod def create_with_remote_offer(cls, BaseSDPSession sdp_session): cdef int status cdef pjmedia_sdp_neg *neg cdef pj_pool_t *pool cdef SDPNegotiator obj obj = cls() pool = obj._pool status = pjmedia_sdp_neg_create_w_remote_offer(pool, NULL, sdp_session.get_sdp_session(), &neg) if status != 0: raise PJSIPError("failed to create SDPNegotiator with remote offer", status) obj._neg = neg return obj def __repr__(self): return "%s, state=%s" % (self.__class__.__name__, self.state) property state: def __get__(self): if self._neg == NULL: return None return _buf_to_str(pjmedia_sdp_neg_state_str(pjmedia_sdp_neg_get_state(self._neg))) property active_local: def __get__(self): cdef int status cdef pjmedia_sdp_session_ptr_const sdp if self._neg == NULL: return None status = pjmedia_sdp_neg_get_active_local(self._neg, &sdp) if status != 0: return None return FrozenSDPSession_create(sdp) property active_remote: def __get__(self): cdef int status cdef pjmedia_sdp_session_ptr_const sdp if self._neg == NULL: return None status = pjmedia_sdp_neg_get_active_remote(self._neg, &sdp) if status != 0: return None return FrozenSDPSession_create(sdp) property current_local: def __get__(self): cdef int status cdef pjmedia_sdp_session_ptr_const sdp if self._neg == NULL: return None status = pjmedia_sdp_neg_get_neg_local(self._neg, &sdp) if status != 0: return None return FrozenSDPSession_create(sdp) property current_remote: def __get__(self): cdef int status cdef pjmedia_sdp_session_ptr_const sdp if self._neg == NULL: return None status = pjmedia_sdp_neg_get_neg_remote(self._neg, &sdp) if status != 0: return None return FrozenSDPSession_create(sdp) def _check_self(self): if self._neg == NULL: raise RuntimeError('SDPNegotiator was not properly initialized') def set_local_answer(self, BaseSDPSession sdp_session): self._check_self() cdef int status cdef pj_pool_t *pool = self._pool status = pjmedia_sdp_neg_set_local_answer(pool, self._neg, sdp_session.get_sdp_session()) if status != 0: raise PJSIPError("failed to set local answer", status) def set_local_offer(self, BaseSDPSession sdp_session): # PJSIP has an asymmetric API here. This function will modify the local session with the new SDP and treat it as a local offer. self._check_self() cdef int status cdef pj_pool_t *pool = self._pool status = pjmedia_sdp_neg_modify_local_offer(pool, self._neg, sdp_session.get_sdp_session()) if status != 0: raise PJSIPError("failed to set local offer", status) def set_remote_answer(self, BaseSDPSession sdp_session): self._check_self() cdef int status cdef pj_pool_t *pool = self._pool status = pjmedia_sdp_neg_set_remote_answer(pool, self._neg, sdp_session.get_sdp_session()) if status != 0: raise PJSIPError("failed to set remote answer", status) def set_remote_offer(self, BaseSDPSession sdp_session): self._check_self() cdef int status cdef pj_pool_t *pool = self._pool status = pjmedia_sdp_neg_set_remote_offer(pool, self._neg, sdp_session.get_sdp_session()) if status != 0: raise PJSIPError("failed to set remote offer", status) def cancel_offer(self): self._check_self() cdef int status status = pjmedia_sdp_neg_cancel_offer(self._neg) if status != 0: raise PJSIPError("failed to cancel offer", status) def negotiate(self): self._check_self() cdef int status cdef pj_pool_t *pool = self._pool status = pjmedia_sdp_neg_negotiate(pool, self._neg, 0) if status != 0: raise PJSIPError("SDP negotiation failed", status) diff --git a/sipsimple/core/_core.ua.pxi b/sipsimple/core/_core.ua.pxi index d9b4f757..855f560b 100644 --- a/sipsimple/core/_core.ua.pxi +++ b/sipsimple/core/_core.ua.pxi @@ -1,1229 +1,1229 @@ import errno import heapq import re import random import sys import time import traceback import os import tempfile cdef class Timer: cdef int schedule(self, float delay, timer_callback callback, object obj) except -1: cdef PJSIPUA ua = _get_ua() if delay < 0: raise ValueError("delay must be a non-negative number") if callback == NULL: raise ValueError("callback must be non-NULL") if self._scheduled: raise RuntimeError("already scheduled") self.schedule_time = PyFloat_AsDouble(time.time() + delay) self.callback = callback self.obj = obj ua._add_timer(self) self._scheduled = 1 return 0 cdef int cancel(self) except -1: cdef PJSIPUA ua = _get_ua() if not self._scheduled: return 0 ua._remove_timer(self) self._scheduled = 0 return 0 cdef int call(self) except -1: self._scheduled = 0 self.callback(self.obj, self) def __richcmp__(self, other, op): cdef double diff if not isinstance(self, Timer) or not isinstance(other, Timer): return NotImplemented diff = (self).schedule_time - (other).schedule_time if op == 0: # < return diff < 0.0 elif op == 1: # <= return diff <= 0.0 elif op == 2: # == return diff == 0.0 elif op == 3: # != return diff != 0.0 elif op == 4: # > return diff > 0.0 elif op == 5: # >= return diff >= 0.0 return cdef class PJSIPUA: def __cinit__(self, *args, **kwargs): global _ua if _ua != NULL: raise SIPCoreError("Can only have one PJSUPUA instance at the same time") _ua = self self._threads = [] self._timers = list() self._events = {} self._incoming_events = set() self._incoming_requests = set() self._sent_messages = set() def __init__(self, event_handler, *args, **kwargs): global _event_queue_lock cdef str event cdef str method cdef list accept_types cdef int status cdef PJSTR message_method = PJSTR("MESSAGE") cdef PJSTR refer_method = PJSTR("REFER") cdef PJSTR str_norefersub = PJSTR("norefersub") cdef PJSTR str_gruu = PJSTR("gruu") self._event_handler = event_handler if kwargs["log_level"] < 0 or kwargs["log_level"] > PJ_LOG_MAX_LEVEL: raise ValueError("Log level should be between 0 and %d" % PJ_LOG_MAX_LEVEL) pj_log_set_level(kwargs["log_level"]) pj_log_set_decor(PJ_LOG_HAS_YEAR | PJ_LOG_HAS_MONTH | PJ_LOG_HAS_DAY_OF_MON | PJ_LOG_HAS_TIME | PJ_LOG_HAS_MICRO_SEC | PJ_LOG_HAS_SENDER | PJ_LOG_HAS_INDENT) pj_log_set_log_func(_cb_log) self._pjlib = PJLIB() pj_srand(random.getrandbits(32)) # rely on python seed for now self._caching_pool = PJCachingPool() self._pjmedia_endpoint = PJMEDIAEndpoint(self._caching_pool) self._pjsip_endpoint = PJSIPEndpoint(self._caching_pool, kwargs["ip_address"], kwargs["udp_port"], kwargs["tcp_port"], kwargs["tls_port"], kwargs["tls_verify_server"], kwargs["tls_ca_file"], kwargs["tls_cert_file"], kwargs["tls_privkey_file"], kwargs["tls_timeout"]) status = pj_mutex_create_simple(self._pjsip_endpoint._pool, "event_queue_lock", &_event_queue_lock) if status != 0: raise PJSIPError("Could not initialize event queue mutex", status) self._ip_address = kwargs["ip_address"] self.codecs = kwargs["codecs"] self.video_codecs = kwargs["video_codecs"] self._module_name = PJSTR("mod-core") self._module.name = self._module_name.pj_str self._module.id = -1 self._module.priority = PJSIP_MOD_PRIORITY_APPLICATION self._module.on_rx_request = _PJSIPUA_cb_rx_request self._module.on_tsx_state = _Request_cb_tsx_state status = pjsip_endpt_register_module(self._pjsip_endpoint._obj, &self._module) if status != 0: raise PJSIPError("Could not load application module", status) status = pjsip_endpt_add_capability(self._pjsip_endpoint._obj, &self._module, PJSIP_H_ALLOW, NULL, 1, &message_method.pj_str) if status != 0: raise PJSIPError("Could not add MESSAGE method to supported methods", status) status = pjsip_endpt_add_capability(self._pjsip_endpoint._obj, &self._module, PJSIP_H_ALLOW, NULL, 1, &refer_method.pj_str) if status != 0: raise PJSIPError("Could not add REFER method to supported methods", status) status = pjsip_endpt_add_capability(self._pjsip_endpoint._obj, NULL, PJSIP_H_SUPPORTED, NULL, 1, &str_norefersub.pj_str) if status != 0: raise PJSIPError("Could not add 'norefsub' to Supported header", status) status = pjsip_endpt_add_capability(self._pjsip_endpoint._obj, NULL, PJSIP_H_SUPPORTED, NULL, 1, &str_gruu.pj_str) if status != 0: raise PJSIPError("Could not add 'gruu' to Supported header", status) self._trace_sip = int(bool(kwargs["trace_sip"])) self._detect_sip_loops = int(bool(kwargs["detect_sip_loops"])) self._enable_colorbar_device = int(bool(kwargs["enable_colorbar_device"])) self._opus_fix_module_name = PJSTR("mod-core-opus-fix") self._opus_fix_module.name = self._opus_fix_module_name.pj_str self._opus_fix_module.id = -1 self._opus_fix_module.priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER+1 self._opus_fix_module.on_rx_request = _cb_opus_fix_rx self._opus_fix_module.on_rx_response = _cb_opus_fix_rx self._opus_fix_module.on_tx_request = _cb_opus_fix_tx self._opus_fix_module.on_tx_response = _cb_opus_fix_tx status = pjsip_endpt_register_module(self._pjsip_endpoint._obj, &self._opus_fix_module) if status != 0: raise PJSIPError("Could not load opus-fix module", status) self._trace_module_name = PJSTR("mod-core-sip-trace") self._trace_module.name = self._trace_module_name.pj_str self._trace_module.id = -1 self._trace_module.priority = 0 self._trace_module.on_rx_request = _cb_trace_rx self._trace_module.on_rx_response = _cb_trace_rx self._trace_module.on_tx_request = _cb_trace_tx self._trace_module.on_tx_response = _cb_trace_tx status = pjsip_endpt_register_module(self._pjsip_endpoint._obj, &self._trace_module) if status != 0: raise PJSIPError("Could not load sip trace module", status) self._ua_tag_module_name = PJSTR("mod-core-ua-tag") self._ua_tag_module.name = self._ua_tag_module_name.pj_str self._ua_tag_module.id = -1 self._ua_tag_module.priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER+1 self._ua_tag_module.on_tx_request = _cb_add_user_agent_hdr self._ua_tag_module.on_tx_response = _cb_add_server_hdr status = pjsip_endpt_register_module(self._pjsip_endpoint._obj, &self._ua_tag_module) if status != 0: raise PJSIPError("Could not load User-Agent/Server header tagging module", status) - self._event_module_name = PJSTR("mod-core-events") - self._event_module.name = self._event_module_name.pj_str - self._event_module.id = -1 - self._event_module.priority = PJSIP_MOD_PRIORITY_DIALOG_USAGE - status = pjsip_endpt_register_module(self._pjsip_endpoint._obj, &self._event_module) - if status != 0: - raise PJSIPError("Could not load events module", status) +# self._event_module_name = PJSTR("mod-core-events") +# self._event_module.name = self._event_module_name.pj_str +# self._event_module.id = -1 +# self._event_module.priority = PJSIP_MOD_PRIORITY_DIALOG_USAGE +# status = pjsip_endpt_register_module(self._pjsip_endpoint._obj, &self._event_module) +# if status != 0: +# raise PJSIPError("Could not load events module", status) status = pjmedia_aud_dev_set_observer_cb(_cb_audio_dev_process_event); if status != 0: raise PJSIPError("Could not set audio_change callbacks", status) status = pj_rwmutex_create(self._pjsip_endpoint._pool, "ua_audio_change_rwlock", &self.audio_change_rwlock) if status != 0: raise PJSIPError("Could not initialize audio change rwmutex", status) status = pj_mutex_create_recursive(self._pjsip_endpoint._pool, "ua_video_lock", &self.video_lock) if status != 0: raise PJSIPError("Could not initialize video mutex", status) self._user_agent = PJSTR(kwargs["user_agent"]) for event, accept_types in kwargs["events"].iteritems(): self.add_event(event, accept_types) for event in kwargs["incoming_events"]: if event not in self._events.keys(): raise ValueError('Event "%s" is not known' % event) self._incoming_events.add(event) for method in kwargs["incoming_requests"]: method = method.upper() if method in ("ACK", "BYE", "INVITE", "REFER", "SUBSCRIBE"): raise ValueError('Handling incoming "%s" requests is not allowed' % method) self._incoming_requests.add(method) self.rtp_port_range = kwargs["rtp_port_range"] self.zrtp_cache = kwargs["zrtp_cache"] pj_stun_config_init(&self._stun_cfg, &self._caching_pool._obj.factory, 0, pjmedia_endpt_get_ioqueue(self._pjmedia_endpoint._obj), pjsip_endpt_get_timer_heap(self._pjsip_endpoint._obj)) property trace_sip: def __get__(self): self._check_self() return bool(self._trace_sip) def __set__(self, value): self._check_self() self._trace_sip = int(bool(value)) property detect_sip_loops: def __get__(self): self._check_self() return bool(self._detect_sip_loops) def __set__(self, value): self._check_self() self._detect_sip_loops = int(bool(value)) property enable_colorbar_device: def __get__(self): self._check_self() return bool(self._enable_colorbar_device) def __set__(self, value): self._check_self() self._enable_colorbar_device = int(bool(value)) self.refresh_video_devices() property events: def __get__(self): self._check_self() return self._events.copy() property ip_address: def __get__(self): self._check_self() return self._ip_address def add_event(self, object event, list accept_types): cdef pj_str_t event_pj cdef pj_str_t accept_types_pj[PJSIP_MAX_ACCEPT_COUNT] cdef int index cdef object accept_type cdef int accept_cnt = len(accept_types) cdef int status self._check_self() if accept_cnt == 0: raise SIPCoreError("Need at least one of accept_types") if accept_cnt > PJSIP_MAX_ACCEPT_COUNT: raise SIPCoreError("Too many accept_types") _str_to_pj_str(event, &event_pj) for index, accept_type in enumerate(accept_types): _str_to_pj_str(accept_type, &accept_types_pj[index]) status = pjsip_evsub_register_pkg(&self._event_module, &event_pj, 3600, accept_cnt, accept_types_pj) if status != 0: raise PJSIPError("Could not register event package", status) self._events[event] = accept_types[:] property incoming_events: def __get__(self): self._check_self() return self._incoming_events.copy() def add_incoming_event(self, str event): self._check_self() if event not in self._events.keys(): raise ValueError('Event "%s" is not known' % event) self._incoming_events.add(event) def remove_incoming_event(self, str event): self._check_self() if event not in self._events.keys(): raise ValueError('Event "%s" is not known' % event) self._incoming_events.discard(event) property incoming_requests: def __get__(self): self._check_self() return self._incoming_requests.copy() def add_incoming_request(self, object value): cdef str method self._check_self() method = value.upper() if method in ("ACK", "BYE", "INVITE", "REFER", "SUBSCRIBE"): raise ValueError('Handling incoming "%s" requests is not allowed' % method) self._incoming_requests.add(method) def remove_incoming_request(self, object value): cdef str method self._check_self() method = value.upper() if method in ("ACK", "BYE", "INVITE", "REFER", "SUBSCRIBE"): raise ValueError('Handling incoming "%s" requests is not allowed' % method) self._incoming_requests.discard(method) cdef pj_pool_t* create_memory_pool(self, bytes name, int initial_size, int resize_size): cdef pj_pool_t *pool cdef char *c_pool_name cdef pjsip_endpoint *endpoint c_pool_name = name endpoint = self._pjsip_endpoint._obj with nogil: pool = pjsip_endpt_create_pool(endpoint, c_pool_name, initial_size, resize_size) if pool == NULL: raise SIPCoreError("Could not allocate memory pool") return pool cdef void release_memory_pool(self, pj_pool_t* pool): cdef pjsip_endpoint *endpoint endpoint = self._pjsip_endpoint._obj if pool != NULL: with nogil: pjsip_endpt_release_pool(endpoint, pool) cdef void reset_memory_pool(self, pj_pool_t* pool): if pool != NULL: with nogil: pj_pool_reset(pool) cdef object _get_sound_devices(self, int is_output): cdef int count cdef pjmedia_aud_dev_info info cdef list retval = list() cdef int status with nogil: status = pj_rwmutex_lock_read(self.audio_change_rwlock) if status != 0: raise PJSIPError('Could not acquire audio_change_rwlock', status) try: for i in range(pjmedia_aud_dev_count()): with nogil: status = pjmedia_aud_dev_get_info(i, &info) if status != 0: raise PJSIPError("Could not get audio device info", status) if is_output: count = info.output_count else: count = info.input_count if count: retval.append(decode_device_name(info.name)) return retval finally: pj_rwmutex_unlock_read(self.audio_change_rwlock) cdef object _get_default_sound_device(self, int is_output): cdef pjmedia_aud_dev_info info cdef int dev_id cdef int status with nogil: status = pj_rwmutex_lock_read(self.audio_change_rwlock) if status != 0: raise SIPCoreError('Could not acquire audio_change_rwlock', status) try: if is_output: dev_id = PJMEDIA_AUD_DEFAULT_PLAYBACK_DEV else: dev_id = PJMEDIA_AUD_DEFAULT_CAPTURE_DEV with nogil: status = pjmedia_aud_dev_get_info(dev_id, &info) if status != 0: raise PJSIPError("Could not get audio device info", status) return decode_device_name(info.name) finally: pj_rwmutex_unlock_read(self.audio_change_rwlock) property default_output_device: def __get__(self): self._check_self() return self._get_default_sound_device(1) property default_input_device: def __get__(self): self._check_self() return self._get_default_sound_device(0) property output_devices: def __get__(self): self._check_self() return self._get_sound_devices(1) property input_devices: def __get__(self): self._check_self() return self._get_sound_devices(0) property sound_devices: def __get__(self): self._check_self() cdef int count cdef pjmedia_aud_dev_info info cdef list retval = list() cdef int status with nogil: status = pj_rwmutex_lock_read(self.audio_change_rwlock) if status != 0: raise SIPCoreError('Could not acquire audio_change_rwlock', status) try: for i in range(pjmedia_aud_dev_count()): with nogil: status = pjmedia_aud_dev_get_info(i, &info) if status == 0: retval.append(decode_device_name(info.name)) return retval finally: pj_rwmutex_unlock_read(self.audio_change_rwlock) def refresh_sound_devices(self): self._check_self() cdef int status cdef dict event_dict self.old_devices = self.sound_devices with nogil: status = pj_rwmutex_lock_write(self.audio_change_rwlock) if status != 0: raise SIPCoreError('Could not acquire audio_change_rwlock', status) with nogil: pjmedia_aud_dev_refresh() status = pj_rwmutex_unlock_write(self.audio_change_rwlock) if status != 0: raise SIPCoreError('Could not release audio_change_rwlock', status) event_dict = dict() event_dict["old_devices"] = self.old_devices event_dict["new_devices"] = self.sound_devices _add_event("AudioDevicesDidChange", event_dict) cdef object _get_video_devices(self): cdef pjmedia_vid_dev_info info cdef list retval = list() cdef int direction cdef int status for i in range(pjmedia_vid_dev_count()): with nogil: status = pjmedia_vid_dev_get_info(i, &info) if status != 0: raise PJSIPError("Could not get video device info", status) direction = info.dir if direction in (PJMEDIA_DIR_CAPTURE, PJMEDIA_DIR_CAPTURE_PLAYBACK): if (not self._enable_colorbar_device and bytes(info.driver) == "Colorbar") or bytes(info.driver) == "Null": continue retval.append(decode_device_name(info.name)) return retval cdef object _get_default_video_device(self): cdef pjmedia_vid_dev_info info cdef int status with nogil: status = pjmedia_vid_dev_get_info(PJMEDIA_VID_DEFAULT_CAPTURE_DEV, &info) if status != 0: raise PJSIPError("Could not get default video device info", status) if (not self._enable_colorbar_device and bytes(info.driver) == "Colorbar") or bytes(info.driver) == "Null": raise SIPCoreError("Could not get default video device") return decode_device_name(info.name) def refresh_video_devices(self): self._check_self() cdef int status cdef dict event_dict self.old_video_devices = self.video_devices with nogil: pjmedia_vid_dev_refresh() event_dict = dict() event_dict["old_devices"] = self.old_video_devices event_dict["new_devices"] = self.video_devices _add_event("VideoDevicesDidChange", event_dict) property default_video_device: def __get__(self): self._check_self() return self._get_default_video_device() property video_devices: def __get__(self): self._check_self() return self._get_video_devices() property available_codecs: def __get__(self): self._check_self() return self._pjmedia_endpoint._get_all_codecs() property codecs: def __get__(self): self._check_self() return self._pjmedia_endpoint._get_current_codecs() def __set__(self, value): self._check_self() self._pjmedia_endpoint._set_codecs(value) property available_video_codecs: def __get__(self): self._check_self() return self._pjmedia_endpoint._get_all_video_codecs() property video_codecs: def __get__(self): self._check_self() return self._pjmedia_endpoint._get_current_video_codecs() def __set__(self, value): self._check_self() self._pjmedia_endpoint._set_video_codecs(value) property udp_port: def __get__(self): self._check_self() if self._pjsip_endpoint._udp_transport == NULL: return None return self._pjsip_endpoint._udp_transport.local_name.port def set_udp_port(self, value): cdef int port self._check_self() if value is None: if self._pjsip_endpoint._udp_transport == NULL: return self._pjsip_endpoint._stop_udp_transport() else: port = value if not (0 <= port <= 65535): raise ValueError("Not a valid UDP port: %d" % value) if self._pjsip_endpoint._udp_transport != NULL: if port == self._pjsip_endpoint._udp_transport.local_name.port: return self._pjsip_endpoint._stop_udp_transport() self._pjsip_endpoint._start_udp_transport(port) property tcp_port: def __get__(self): self._check_self() if self._pjsip_endpoint._tcp_transport == NULL: return None return self._pjsip_endpoint._tcp_transport.addr_name.port def set_tcp_port(self, value): cdef int port self._check_self() if value is None: if self._pjsip_endpoint._tcp_transport == NULL: return self._pjsip_endpoint._stop_tcp_transport() else: port = value if not (0 <= port <= 65535): raise ValueError("Not a valid TCP port: %d" % value) if self._pjsip_endpoint._tcp_transport != NULL: if port == self._pjsip_endpoint._tcp_transport.addr_name.port: return self._pjsip_endpoint._stop_tcp_transport() self._pjsip_endpoint._start_tcp_transport(port) property tls_port: def __get__(self): self._check_self() if self._pjsip_endpoint._tls_transport == NULL: return None return self._pjsip_endpoint._tls_transport.addr_name.port property rtp_port_range: def __get__(self): self._check_self() return (self._rtp_port_start, self._rtp_port_start + self._rtp_port_count) def __set__(self, value): cdef int _rtp_port_start cdef int _rtp_port_stop cdef int _rtp_port_count cdef int _rtp_port_usable_count cdef int port self._check_self() for port in value: if not (0 <= port <= 65535): raise SIPCoreError("RTP port range values should be between 0 and 65535") _rtp_port_start, _rtp_port_stop = value _rtp_port_count = _rtp_port_stop - _rtp_port_start _rtp_port_usable_count = _rtp_port_count - _rtp_port_count % 2 # we need an even number of ports, so we won't use the last one if an odd number is provided if _rtp_port_usable_count < 2: raise SIPCoreError("RTP port range should contain at least 2 ports") self._rtp_port_start = _rtp_port_start self._rtp_port_count = _rtp_port_count self._rtp_port_usable_count = _rtp_port_usable_count self._rtp_port_index = 0 property user_agent: def __get__(self): self._check_self() return self._user_agent.str def __set__(self, value): self._check_self() self._user_agent = PJSTR("value") property log_level: def __get__(self): self._check_self() return pj_log_get_level() def __set__(self, value): self._check_self() if value < 0 or value > PJ_LOG_MAX_LEVEL: raise ValueError("Log level should be between 0 and %d" % PJ_LOG_MAX_LEVEL) pj_log_set_level(value) property tls_verify_server: def __get__(self): self._check_self() return bool(self._pjsip_endpoint._tls_verify_server) property tls_ca_file: def __get__(self): self._check_self() if self._pjsip_endpoint._tls_ca_file is None: return None else: return self._pjsip_endpoint._tls_ca_file.str property tls_cert_file: def __get__(self): self._check_self() if self._pjsip_endpoint._tls_cert_file is None: return None else: return self._pjsip_endpoint._tls_cert_file.str property tls_privkey_file: def __get__(self): self._check_self() if self._pjsip_endpoint._tls_privkey_file is None: return None else: return self._pjsip_endpoint._tls_privkey_file.str property tls_timeout: def __get__(self): self._check_self() return self._pjsip_endpoint._tls_timeout def set_tls_options(self, port=None, verify_server=False, ca_file=None, cert_file=None, privkey_file=None, int timeout=3000): cdef int c_port self._check_self() if port is None: if self._pjsip_endpoint._tls_transport == NULL: return self._pjsip_endpoint._stop_tls_transport() else: c_port = port if not (0 <= c_port <= 65535): raise ValueError("Not a valid TCP port: %d" % port) if ca_file is not None and not os.path.isfile(ca_file): raise ValueError("Cannot find the specified CA file: %s" % ca_file) if cert_file is not None and not os.path.isfile(cert_file): raise ValueError("Cannot find the specified certificate file: %s" % cert_file) if privkey_file is not None and not os.path.isfile(privkey_file): raise ValueError("Cannot find the specified private key file: %s" % privkey_file) if timeout < 0: raise ValueError("Invalid TLS timeout value: %d" % timeout) if self._pjsip_endpoint._tls_transport != NULL: self._pjsip_endpoint._stop_tls_transport() self._pjsip_endpoint._tls_verify_server = int(bool(verify_server)) if ca_file is None: self._pjsip_endpoint._tls_ca_file = None else: self._pjsip_endpoint._tls_ca_file = PJSTR(ca_file.encode(sys.getfilesystemencoding())) if cert_file is None: self._pjsip_endpoint._tls_cert_file = None else: self._pjsip_endpoint._tls_cert_file = PJSTR(cert_file.encode(sys.getfilesystemencoding())) if privkey_file is None: self._pjsip_endpoint._tls_privkey_file = None else: self._pjsip_endpoint._tls_privkey_file = PJSTR(privkey_file.encode(sys.getfilesystemencoding())) self._pjsip_endpoint._tls_timeout = timeout self._pjsip_endpoint._start_tls_transport(c_port) def detect_nat_type(self, stun_server_address, stun_server_port=PJ_STUN_PORT, object user_data=None): cdef pj_str_t stun_server_address_pj cdef pj_sockaddr_in stun_server cdef int status self._check_self() if not _is_valid_ip(pj_AF_INET(), stun_server_address): raise ValueError("Not a valid IPv4 address: %s" % stun_server_address) _str_to_pj_str(stun_server_address, &stun_server_address_pj) status = pj_sockaddr_in_init(&stun_server, &stun_server_address_pj, stun_server_port) if status != 0: raise PJSIPError("Could not init STUN server address", status) status = pj_stun_detect_nat_type(&stun_server, &self._stun_cfg, user_data, _cb_detect_nat_type) if status != 0: raise PJSIPError("Could not start NAT type detection", status) Py_INCREF(user_data) def set_nameservers(self, list nameservers): self._check_self() return self._pjsip_endpoint._set_dns_nameservers([n for n in nameservers if _re_ipv4.match(n)]) def set_h264_options(self, profile, level): self._check_self() self._pjmedia_endpoint._set_h264_options(str(profile), int(level.replace('.', ''))) def set_video_options(self, max_resolution, int max_framerate, object max_bitrate): self._check_self() self._pjmedia_endpoint._set_video_options(tuple(max_resolution), max_framerate, max_bitrate or 0.0) property zrtp_cache: def __get__(self): self._check_self() return self._zrtp_cache def __set__(self, value): self._check_self() if value is None: value = os.path.join(tempfile.gettempdir(), 'zrtp_cache_%d.db' % os.getpid()) self._zrtp_cache = value def __dealloc__(self): self.dealloc() def dealloc(self): global _ua, _dealloc_handler_queue, _event_queue_lock if _ua == NULL: return self._check_thread() pjmedia_aud_dev_set_observer_cb(NULL) if self.audio_change_rwlock != NULL: pj_rwmutex_destroy(self.audio_change_rwlock) self.audio_change_rwlock = NULL if self.video_lock != NULL: pj_mutex_destroy(self.video_lock) self.video_lock = NULL _process_handler_queue(self, &_dealloc_handler_queue) if _event_queue_lock != NULL: pj_mutex_lock(_event_queue_lock) pj_mutex_destroy(_event_queue_lock) _event_queue_lock = NULL self._pjsip_endpoint = None self._pjmedia_endpoint = None self._caching_pool = None self._pjlib = None _ua = NULL self._poll_log() cdef int _poll_log(self) except -1: cdef object event_name cdef dict event_params cdef list events events = _get_clear_event_queue() for event_name, event_params in events: self._event_handler(event_name, **event_params) def poll(self): global _post_poll_handler_queue cdef int status cdef double now cdef object retval = None cdef float max_timeout cdef pj_time_val pj_max_timeout cdef list timers cdef Timer timer self._check_self() max_timeout = 0.100 while self._timers: if not (self._timers[0])._scheduled: # timer was cancelled heapq.heappop(self._timers) else: max_timeout = min(max((self._timers[0]).schedule_time - time.time(), 0.0), max_timeout) break pj_max_timeout.sec = int(max_timeout) pj_max_timeout.msec = int(max_timeout * 1000) % 1000 with nogil: status = pjsip_endpt_handle_events(self._pjsip_endpoint._obj, &pj_max_timeout) IF UNAME_SYSNAME == "Darwin": if status not in [0, PJ_ERRNO_START_SYS + errno.EBADF]: raise PJSIPError("Error while handling events", status) ELSE: if status != 0: raise PJSIPError("Error while handling events", status) _process_handler_queue(self, &_post_poll_handler_queue) timers = list() now = time.time() while self._timers: if not (self._timers[0])._scheduled: # timer was cancelled heapq.heappop(self._timers) elif (self._timers[0]).schedule_time <= now: # timer needs to be processed timer = heapq.heappop(self._timers) timers.append(timer) else: break for timer in timers: timer.call() self._poll_log() if self._fatal_error: return True else: return False cdef int _handle_exception(self, int is_fatal) except -1: cdef object exc_type cdef object exc_val cdef object exc_tb exc_type, exc_val, exc_tb = sys.exc_info() if is_fatal: self._fatal_error = is_fatal _add_event("SIPEngineGotException", dict(type=exc_type, value=exc_val, traceback="".join(traceback.format_exception(exc_type, exc_val, exc_tb)))) return 0 cdef int _check_self(self) except -1: global _ua if _ua == NULL: raise SIPCoreError("The PJSIPUA is no longer running") self._check_thread() cdef int _check_thread(self) except -1: if not pj_thread_is_registered(): self._threads.append(PJSIPThread()) return 0 cdef int _add_timer(self, Timer timer) except -1: heapq.heappush(self._timers, timer) return 0 cdef int _remove_timer(self, Timer timer) except -1: # Don't remove it from the heap, just mark it as not scheduled timer._scheduled = 0 return 0 cdef int _cb_rx_request(self, pjsip_rx_data *rdata) except 0: global _event_hdr_name cdef int status cdef int bad_request cdef pjsip_tx_data *tdata = NULL cdef pjsip_hdr_ptr_const hdr_add cdef IncomingRequest request cdef Invitation inv cdef IncomingSubscription sub cdef IncomingReferral ref cdef list extra_headers cdef dict event_dict cdef dict message_params cdef pj_str_t tsx_key cdef pjsip_via_hdr *top_via cdef pjsip_via_hdr *via cdef pjsip_transaction *tsx = NULL cdef unsigned int options = PJSIP_INV_SUPPORT_100REL cdef pjsip_event_hdr *event_hdr cdef object method_name = _pj_str_to_str(rdata.msg_info.msg.line.req.method.name) if method_name != "ACK": if self._detect_sip_loops: # Temporarily trick PJSIP into believing the last Via header is actually the first top_via = via = rdata.msg_info.via while True: rdata.msg_info.via = via via = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_VIA, ( via).next) if via == NULL: break status = pjsip_tsx_create_key(rdata.tp_info.pool, &tsx_key, PJSIP_ROLE_UAC, &rdata.msg_info.msg.line.req.method, rdata) rdata.msg_info.via = top_via if status != 0: raise PJSIPError("Could not generate transaction key for incoming request", status) tsx = pjsip_tsx_layer_find_tsx(&tsx_key, 0) if tsx != NULL: status = pjsip_endpt_create_response(self._pjsip_endpoint._obj, rdata, 482, NULL, &tdata) if status != 0: raise PJSIPError("Could not create response", status) elif method_name in self._incoming_requests: request = IncomingRequest() request.init(self, rdata) elif method_name == "OPTIONS": status = pjsip_endpt_create_response(self._pjsip_endpoint._obj, rdata, 200, NULL, &tdata) if status != 0: raise PJSIPError("Could not create response", status) for hdr_type in [PJSIP_H_ALLOW, PJSIP_H_ACCEPT, PJSIP_H_SUPPORTED]: hdr_add = pjsip_endpt_get_capability(self._pjsip_endpoint._obj, hdr_type, NULL) if hdr_add != NULL: pjsip_msg_add_hdr(tdata.msg, pjsip_hdr_clone(tdata.pool, hdr_add)) elif method_name == "INVITE": status = pjsip_inv_verify_request(rdata, &options, NULL, NULL, self._pjsip_endpoint._obj, &tdata) if status == 0: inv = Invitation() inv.init_incoming(self, rdata, options) elif method_name == "SUBSCRIBE": event_hdr = pjsip_msg_find_hdr_by_name(rdata.msg_info.msg, &_event_hdr_name.pj_str, NULL) if event_hdr == NULL or _pj_str_to_str(event_hdr.event_type) not in self._incoming_events: status = pjsip_endpt_create_response(self._pjsip_endpoint._obj, rdata, 489, NULL, &tdata) if status != 0: raise PJSIPError("Could not create response", status) else: sub = IncomingSubscription() sub.init(self, rdata, _pj_str_to_str(event_hdr.event_type)) elif method_name == "REFER": ref = IncomingReferral() ref.init(self, rdata) elif method_name == "MESSAGE": bad_request = 0 extra_headers = list() message_params = dict() event_dict = dict() _pjsip_msg_to_dict(rdata.msg_info.msg, event_dict) message_params["request_uri"] = event_dict["request_uri"] message_params["from_header"] = event_dict["headers"].get("From", None) message_params["to_header"] = event_dict["headers"].get("To", None) message_params["headers"] = event_dict["headers"] message_params["body"] = event_dict["body"] content_type = message_params["headers"].get("Content-Type", None) if content_type is not None: message_params["content_type"] = content_type.content_type if message_params["headers"].get("Content-Length", 0) > 0 and message_params["body"] is None: bad_request = 1 extra_headers.append(WarningHeader(399, "local", "Missing body")) else: message_params["content_type"] = None if message_params["headers"].get("Content-Length", 0) > 0 and message_params["body"] is None: bad_request = 1 extra_headers.append(WarningHeader(399, "local", "Missing Content-Type header")) if bad_request: status = pjsip_endpt_create_response(self._pjsip_endpoint._obj, rdata, 400, NULL, &tdata) if status != 0: raise PJSIPError("Could not create response", status) _add_headers_to_tdata(tdata, extra_headers) else: _add_event("SIPEngineGotMessage", message_params) status = pjsip_endpt_create_response(self._pjsip_endpoint._obj, rdata, 200, NULL, &tdata) if status != 0: raise PJSIPError("Could not create response", status) elif method_name != "ACK": status = pjsip_endpt_create_response(self._pjsip_endpoint._obj, rdata, 405, NULL, &tdata) if status != 0: raise PJSIPError("Could not create response", status) if tdata != NULL: status = pjsip_endpt_send_response2(self._pjsip_endpoint._obj, rdata, tdata, NULL, NULL) if status != 0: pjsip_tx_data_dec_ref(tdata) raise PJSIPError("Could not send response", status) return 1 cdef class PJSIPThread: def __cinit__(self): str_id = "python_%d" % id(self) cdef object thread_name = str_id.encode() cdef int status status = pj_thread_register(thread_name, self._thread_desc, &self._obj) if status != 0: raise PJSIPError("Error while registering thread", status) # callback functions cdef void _cb_audio_dev_process_event(pjmedia_aud_dev_event event) with gil: cdef PJSIPUA ua event_dict = dict() try: ua = _get_ua() except: return try: if event in (PJMEDIA_AUD_DEV_DEFAULT_INPUT_CHANGED, PJMEDIA_AUD_DEV_DEFAULT_OUTPUT_CHANGED): event_dict["changed_input"] = event == PJMEDIA_AUD_DEV_DEFAULT_INPUT_CHANGED event_dict["changed_output"] = event == PJMEDIA_AUD_DEV_DEFAULT_OUTPUT_CHANGED _add_event("DefaultAudioDeviceDidChange", event_dict) elif event == PJMEDIA_AUD_DEV_LIST_WILL_REFRESH: ua.old_devices = ua.sound_devices with nogil: status = pj_rwmutex_lock_write(ua.audio_change_rwlock) if status != 0: raise SIPCoreError('Could not acquire audio_change_rwlock for writing', status) elif event == PJMEDIA_AUD_DEV_LIST_DID_REFRESH: with nogil: status = pj_rwmutex_unlock_write(ua.audio_change_rwlock) if status != 0: raise SIPCoreError('Could not release the audio_change_rwlock', status) event_dict["old_devices"] = ua.old_devices event_dict["new_devices"] = ua.sound_devices _add_event("AudioDevicesDidChange", event_dict) except: ua._handle_exception(1) cdef void _cb_detect_nat_type(void *user_data, pj_stun_nat_detect_result_ptr_const res) with gil: cdef PJSIPUA ua cdef dict event_dict cdef object user_data_obj = user_data Py_DECREF(user_data_obj) try: ua = _get_ua() except: return try: event_dict = dict() event_dict["succeeded"] = res.status == 0 event_dict["user_data"] = user_data_obj if res.status == 0: event_dict["nat_type"] = res.nat_type_name else: event_dict["error"] = res.status_text _add_event("SIPEngineDetectedNATType", event_dict) except: ua._handle_exception(0) cdef int _PJSIPUA_cb_rx_request(pjsip_rx_data *rdata) with gil: cdef PJSIPUA ua try: ua = _get_ua() except: return 0 try: return ua._cb_rx_request(rdata) except: ua._handle_exception(0) cdef int _cb_opus_fix_tx(pjsip_tx_data *tdata) with gil: cdef PJSIPUA ua cdef pjsip_msg_body *body cdef pjsip_msg_body *new_body cdef pjmedia_sdp_session *sdp cdef pjmedia_sdp_media *media cdef pjmedia_sdp_attr *attr cdef int i cdef int j cdef pj_str_t new_value try: ua = _get_ua() except: return 0 try: if tdata != NULL and tdata.msg != NULL: body = tdata.msg.body if body != NULL and _pj_str_to_str(body.content_type.type).lower() == "application" and _pj_str_to_str(body.content_type.subtype).lower() == "sdp": new_body = pjsip_msg_body_clone(tdata.pool, body) sdp = new_body.data for i in range(sdp.media_count): media = sdp.media[i] if _pj_str_to_str(media.desc.media).lower() != "audio": continue for j in range(media.attr_count): attr = media.attr[j] if _pj_str_to_str(attr.name).lower() != "rtpmap": continue attr_value = _pj_str_to_str(attr.value).lower() pos = attr_value.find("opus") if pos == -1: continue # this is the opus rtpmap attribute opus_line = attr_value[:pos] + "opus/48000/2" new_value.slen = len(opus_line) new_value.ptr = pj_pool_alloc(tdata.pool, new_value.slen) memcpy(new_value.ptr, PyBytes_AsString(opus_line), new_value.slen) attr.value = new_value break tdata.msg.body = new_body except: ua._handle_exception(0) return 0 cdef int _cb_opus_fix_rx(pjsip_rx_data *rdata) with gil: cdef PJSIPUA ua cdef pjsip_msg_body *body cdef int pos1 cdef int pos2 cdef char *body_ptr try: ua = _get_ua() except: return 0 try: if rdata != NULL and rdata.msg_info.msg != NULL: body = rdata.msg_info.msg.body if body != NULL and _pj_str_to_str(body.content_type.type).lower() == "application" and _pj_str_to_str(body.content_type.subtype).lower() == "sdp": body_ptr = body.data body_str = _pj_buf_len_to_str(body_ptr, body.len).lower() pos1 = body_str.find("opus/48000") if pos1 != -1: pos2 = body_str.find("opus/48000/2") if pos2 != -1: memcpy(body_ptr + pos2 + 11, '1', 1) else: # old opus, we must make it fail memcpy(body_ptr + pos1 + 5, 'XXXXX', 5) except: ua._handle_exception(0) return 0 cdef int _cb_trace_rx(pjsip_rx_data *rdata) with gil: cdef PJSIPUA ua try: ua = _get_ua() except: return 0 try: if ua._trace_sip: _add_event("SIPEngineSIPTrace", dict(received=True, source_ip=rdata.pkt_info.src_name, source_port=rdata.pkt_info.src_port, destination_ip=_pj_str_to_str(rdata.tp_info.transport.local_name.host), destination_port=rdata.tp_info.transport.local_name.port, data=_pj_buf_len_to_str(rdata.pkt_info.packet, rdata.pkt_info.len), transport=rdata.tp_info.transport.type_name)) except: ua._handle_exception(0) return 0 cdef int _cb_trace_tx(pjsip_tx_data *tdata) with gil: cdef PJSIPUA ua try: ua = _get_ua() except: return 0 try: if ua._trace_sip: _add_event("SIPEngineSIPTrace", dict(received=False, source_ip=_pj_str_to_str(tdata.tp_info.transport.local_name.host), source_port=tdata.tp_info.transport.local_name.port, destination_ip=tdata.tp_info.dst_name, destination_port=tdata.tp_info.dst_port, data=_pj_buf_len_to_str(tdata.buf.start, tdata.buf.cur - tdata.buf.start), transport=tdata.tp_info.transport.type_name)) except: ua._handle_exception(0) return 0 cdef int _cb_add_user_agent_hdr(pjsip_tx_data *tdata) with gil: cdef PJSIPUA ua cdef pjsip_hdr *hdr cdef void *found_hdr try: ua = _get_ua() except: return 0 try: found_hdr = pjsip_msg_find_hdr_by_name(tdata.msg, &_user_agent_hdr_name.pj_str, NULL) if found_hdr == NULL: hdr = pjsip_generic_string_hdr_create(tdata.pool, &_user_agent_hdr_name.pj_str, &ua._user_agent.pj_str) if hdr == NULL: raise SIPCoreError('Could not add "User-Agent" header to outgoing request') pjsip_msg_add_hdr(tdata.msg, hdr) except: ua._handle_exception(0) return 0 cdef int _cb_add_server_hdr(pjsip_tx_data *tdata) with gil: cdef PJSIPUA ua cdef pjsip_hdr *hdr cdef void *found_hdr try: ua = _get_ua() except: return 0 try: found_hdr = pjsip_msg_find_hdr_by_name(tdata.msg, &_server_hdr_name.pj_str, NULL) if found_hdr == NULL: hdr = pjsip_generic_string_hdr_create(tdata.pool, &_server_hdr_name.pj_str, &ua._user_agent.pj_str) if hdr == NULL: raise SIPCoreError('Could not add "Server" header to outgoing response') pjsip_msg_add_hdr(tdata.msg, hdr) except: ua._handle_exception(0) return 0 # functions cdef PJSIPUA _get_ua(): global _ua cdef PJSIPUA ua if _ua == NULL: raise SIPCoreError("PJSIPUA is not instantiated") ua = _ua ua._check_thread() return ua cdef int deallocate_weakref(object weak_ref, object timer) except -1 with gil: Py_DECREF(weak_ref) # globals cdef void *_ua = NULL cdef PJSTR _user_agent_hdr_name = PJSTR("User-Agent") cdef PJSTR _server_hdr_name = PJSTR("Server") cdef PJSTR _event_hdr_name = PJSTR("Event") cdef object _re_ipv4 = re.compile(r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})$") diff --git a/sipsimple/core/_core.util.pxi b/sipsimple/core/_core.util.pxi index 31586f0a..c7be8ad1 100644 --- a/sipsimple/core/_core.util.pxi +++ b/sipsimple/core/_core.util.pxi @@ -1,422 +1,424 @@ import platform import re import sys from application.version import Version cdef class PJSTR: def __cinit__(self, str): self.str = str _str_to_pj_str(str, &self.pj_str) def __str__(self): return self.str cdef class SIPStatusMessages: cdef object _default_status def __cinit__(self, *args, **kwargs): self._default_status = _pj_str_to_str(pjsip_get_status_text(0)[0]) def __getitem__(self, int val): cdef object _status _status = _pj_str_to_str(pjsip_get_status_text(val)[0]) if _status == self._default_status: raise IndexError("Unknown SIP response code: %d" % val) return _status cdef class frozenlist: def __cinit__(self, *args, **kw): self.list = list() self.initialized = 0 self.hash = 0 def __init__(self, *args, **kw): if not self.initialized: self.list = list(*args, **kw) self.initialized = 1 self.hash = hash(tuple(self.list)) def __reduce__(self): return (self.__class__, (self.list,), None) def __repr__(self): return "frozenlist(%r)" % self.list def __len__(self): return self.list.__len__() def __hash__(self): return self.hash def __iter__(self): return self.list.__iter__() def __cmp__(self, frozenlist other): return self.list.__cmp__(other.list) def __richcmp__(frozenlist self, other, op): if isinstance(other, frozenlist): other = (other).list if op == 0: return self.list.__cmp__(other) < 0 elif op == 1: return self.list.__cmp__(other) <= 0 elif op == 2: return self.list.__eq__(other) elif op == 3: return self.list.__ne__(other) elif op == 4: return self.list.__cmp__(other) > 0 elif op == 5: return self.list.__cmp__(other) >= 0 else: return NotImplemented def __contains__(self, item): return self.list.__contains__(item) def __getitem__(self, key): return self.list.__getitem__(key) def __add__(first, second): if isinstance(first, frozenlist): first = (first).list if isinstance(second, frozenlist): second = (second).list return frozenlist(first+second) def __mul__(first, second): if isinstance(first, frozenlist): first = (first).list if isinstance(second, frozenlist): second = (second).list return frozenlist(first*second) def __reversed__(self): return self.list.__reversed__() def count(self, elem): return self.list.count(elem) def index(self, elem): return self.list.index(elem) cdef class frozendict: def __cinit__(self, *args, **kw): self.dict = dict() self.initialized = 0 def __init__(self, *args, **kw): if not self.initialized: self.dict = dict(*args, **kw) self.initialized = 1 self.hash = hash(tuple(self.dict.iteritems())) def __reduce__(self): return (self.__class__, (self.dict,), None) def __repr__(self): return "frozendict(%r)" % self.dict def __len__(self): return self.dict.__len__() def __hash__(self): return self.hash def __iter__(self): return self.dict.__iter__() def __cmp__(self, frozendict other): return self.dict.__cmp__(other.dict) def __richcmp__(frozendict self, other, op): if isinstance(other, frozendict): other = (other).dict if op == 0: return self.dict.__cmp__(other) < 0 elif op == 1: return self.dict.__cmp__(other) <= 0 elif op == 2: return self.dict.__eq__(other) elif op == 3: return self.dict.__ne__(other) elif op == 4: return self.dict.__cmp__(other) > 0 elif op == 5: return self.dict.__cmp__(other) >= 0 else: return NotImplemented def __contains__(self, item): return self.dict.__contains__(item) def __getitem__(self, key): return self.dict.__getitem__(key) def copy(self): return self def get(self, *args): return self.dict.get(*args) def has_key(self, key): return self.dict.has_key(key) def items(self): return list(self.dict.items()) def iteritems(self): return list(self.dict.items()) def iterkeys(self): return list(self.dict.keys()) def itervalues(self): return list(self.dict.values()) def keys(self): return list(self.dict.keys()) def values(self): return list(self.dict.values()) # functions cdef int _str_to_pj_str(object string, pj_str_t *pj_str) except -1: # Feed data from Python to PJSIP - # TODO: convert to Python3 - bytes_string = string.encode() + # Python 2 old version + # pj_str.ptr = PyString_AsString(string) + # pj_str.slen = len(string) + + # Python 3 new version + bytes_string = string.encode() if isinstance(string, str) else string pj_str.ptr = PyBytes_AsString(bytes_string) pj_str.slen = len(bytes_string) - print("Encoded STR %s to PJS %s" % (string, pj_str.ptr)) + # print("Encoded STR %s to PJS %s" % (string, pj_str.ptr)) cdef object _pj_str_to_str(pj_str_t pj_str): # Feed data from PJSIP to the Python + # Python 2 old version + # return PyString_FromStringAndSize(pj_str.ptr, pj_str.slen) + + # Python 3 new version bytes_string = PyBytes_FromStringAndSize(pj_str.ptr, pj_str.slen) - string = bytes_string.decode() - print("Decoded PJS %s to STR %s" % (pj_str.ptr, string)) - return string + return bytes_string.decode() cdef object _pj_buf_len_to_str(object buf, int buf_len): - # TODO: convert to Python3 return PyBytes_FromStringAndSize(buf, buf_len) cdef object _buf_to_str(object buf): - # TODO: convert to Python3 return PyBytes_FromString(buf) cdef object _str_as_str(object string): - # TODO: convert to Python3 return PyBytes_AsString(string) cdef object _str_as_size(object string): - # TODO: convert to Python3 return PyBytes_Size(string) cdef object _pj_status_to_str(int status): cdef char buf[PJ_ERR_MSG_SIZE] return _pj_str_to_str(pj_strerror(status, buf, PJ_ERR_MSG_SIZE)) cdef object _pj_status_to_def(int status): return _re_pj_status_str_def.match(_pj_status_to_str(status)).group(1) cdef dict _pjsip_param_to_dict(pjsip_param *param_list): cdef pjsip_param *param cdef dict retval = dict() param = ( param_list).next while param != param_list: if param.value.slen == 0: retval[_pj_str_to_str(param.name)] = None else: retval[_pj_str_to_str(param.name)] = _pj_str_to_str(param.value) param = ( param).next return retval cdef int _dict_to_pjsip_param(object params, pjsip_param *param_list, pj_pool_t *pool): cdef pjsip_param *param = NULL for name, value in params.iteritems(): param = pj_pool_alloc(pool, sizeof(pjsip_param)) if param == NULL: return -1 _str_to_pj_str(name, ¶m.name) if value is None: param.value.slen = 0 else: _str_to_pj_str(value, ¶m.value) pj_list_insert_after( param_list, param) return 0 cdef int _pjsip_msg_to_dict(pjsip_msg *msg, dict info_dict) except -1: cdef pjsip_msg_body *body cdef pjsip_hdr *header cdef pjsip_generic_array_hdr *array_header cdef pjsip_ctype_hdr *ctype_header cdef pjsip_cseq_hdr *cseq_header cdef char *buf cdef int buf_len, i, status headers = {} header = ( &msg.hdr).next while header != &msg.hdr: header_name = _pj_str_to_str(header.name) header_data = None multi_header = False if header_name in ("Accept", "Allow", "Require", "Supported", "Unsupported", "Allow-Events"): array_header = header header_data = [] for i from 0 <= i < array_header.count: header_data.append(_pj_str_to_str(array_header.values[i])) elif header_name == "Contact": multi_header = True header_data = FrozenContactHeader_create( header) elif header_name == "Content-Length": header_data = ( header).len elif header_name == "Content-Type": header_data = FrozenContentTypeHeader_create( header) elif header_name == "CSeq": cseq_header = header header_data = (cseq_header.cseq, _pj_str_to_str(cseq_header.method.name)) elif header_name in ("Expires", "Max-Forwards", "Min-Expires"): header_data = ( header).ivalue elif header_name == "From": header_data = FrozenFromHeader_create( header) elif header_name == "To": header_data = FrozenToHeader_create( header) elif header_name == "Route": multi_header = True header_data = FrozenRouteHeader_create( header) elif header_name == "Reason": value = _pj_str_to_str((header).hvalue) protocol, sep, params_str = value.partition(';') params = frozendict([(name, value or None) for name, sep, value in [param.partition('=') for param in params_str.split(';')]]) header_data = FrozenReasonHeader(protocol, params) elif header_name == "Record-Route": multi_header = True header_data = FrozenRecordRouteHeader_create( header) elif header_name == "Retry-After": header_data = FrozenRetryAfterHeader_create( header) elif header_name == "Via": multi_header = True header_data = FrozenViaHeader_create( header) elif header_name == "Warning": match = _re_warning_hdr.match(_pj_str_to_str((header).hvalue)) if match is not None: warning_params = match.groupdict() warning_params['code'] = int(warning_params['code']) header_data = FrozenWarningHeader(**warning_params) elif header_name == "Event": header_data = FrozenEventHeader_create( header) elif header_name == "Subscription-State": header_data = FrozenSubscriptionStateHeader_create( header) elif header_name == "Refer-To": header_data = FrozenReferToHeader_create( header) elif header_name == "Subject": header_data = FrozenSubjectHeader_create( header) elif header_name == "Replaces": header_data = FrozenReplacesHeader_create( header) # skip the following headers: elif header_name not in ("Authorization", "Proxy-Authenticate", "Proxy-Authorization", "WWW-Authenticate"): header_data = FrozenHeader(header_name, _pj_str_to_str(( header).hvalue)) if header_data is not None: if multi_header: headers.setdefault(header_name, []).append(header_data) else: if header_name not in headers: headers[header_name] = header_data header = ( header).next info_dict["headers"] = headers body = msg.body if body == NULL: info_dict["body"] = None else: status = pjsip_print_body(body, &buf, &buf_len) if status != 0: info_dict["body"] = None else: info_dict["body"] = _pj_buf_len_to_str(buf, buf_len) if msg.type == PJSIP_REQUEST_MSG: info_dict["method"] = _pj_str_to_str(msg.line.req.method.name) # You need to call pjsip_uri_get_uri on the request URI if the message is for transmitting, # but it isn't required if message is one received. Otherwise, a seg fault occurs. Don't ask. info_dict["request_uri"] = FrozenSIPURI_create(pjsip_uri_get_uri(msg.line.req.uri)) else: info_dict["code"] = msg.line.status.code info_dict["reason"] = _pj_str_to_str(msg.line.status.reason) return 0 cdef int _is_valid_ip(int af, object ip) except -1: cdef char buf[16] cdef pj_str_t src cdef int status _str_to_pj_str(ip, &src) status = pj_inet_pton(af, &src, buf) if status == 0: return 1 else: return 0 cdef int _get_ip_version(object ip) except -1: if _is_valid_ip(pj_AF_INET(), ip): return pj_AF_INET() elif _is_valid_ip(pj_AF_INET6(), ip): return pj_AF_INET() else: return 0 cdef int _add_headers_to_tdata(pjsip_tx_data *tdata, object headers) except -1: cdef pj_str_t name_pj, value_pj cdef pjsip_hdr *hdr for header in headers: _str_to_pj_str(header.name, &name_pj) _str_to_pj_str(header.body, &value_pj) hdr = pjsip_generic_string_hdr_create(tdata.pool, &name_pj, &value_pj) pjsip_msg_add_hdr(tdata.msg, hdr) cdef int _remove_headers_from_tdata(pjsip_tx_data *tdata, object headers) except -1: cdef pj_str_t header_name_pj cdef pjsip_hdr *hdr for header in headers: _str_to_pj_str(header, &header_name_pj) hdr = pjsip_msg_find_remove_hdr_by_name(tdata.msg, &header_name_pj, NULL) cdef int _BaseSIPURI_to_pjsip_sip_uri(BaseSIPURI uri, pjsip_sip_uri *pj_uri, pj_pool_t *pool) except -1: cdef pjsip_param *param pjsip_sip_uri_init(pj_uri, uri.secure) if uri.user: _str_to_pj_str(uri.user, &pj_uri.user) if uri.password: _str_to_pj_str(uri.password, &pj_uri.passwd) if uri.host: _str_to_pj_str(uri.host, &pj_uri.host) if uri.port: pj_uri.port = uri.port for name, value in uri.parameters.iteritems(): if name == "lr": pj_uri.lr_param = 1 elif name == "maddr": _str_to_pj_str(value, &pj_uri.maddr_param) elif name == "method": _str_to_pj_str(value, &pj_uri.method_param) elif name == "transport": _str_to_pj_str(value, &pj_uri.transport_param) elif name == "ttl": pj_uri.ttl_param = int(value) elif name == "user": _str_to_pj_str(value, &pj_uri.user_param) else: param = pj_pool_alloc(pool, sizeof(pjsip_param)) _str_to_pj_str(name, ¶m.name) if value is None: param.value.slen = 0 else: _str_to_pj_str(value, ¶m.value) pj_list_insert_after( &pj_uri.other_param, param) _dict_to_pjsip_param(uri.headers, &pj_uri.header_param, pool) return 0 cdef int _BaseRouteHeader_to_pjsip_route_hdr(BaseIdentityHeader header, pjsip_route_hdr *pj_header, pj_pool_t *pool) except -1: cdef pjsip_param *param cdef pjsip_sip_uri *sip_uri pjsip_route_hdr_init(NULL, pj_header) sip_uri = pj_pool_alloc(pool, sizeof(pjsip_sip_uri)) _BaseSIPURI_to_pjsip_sip_uri(header.uri, sip_uri, pool) pj_header.name_addr.uri = sip_uri if header.display_name: _str_to_pj_str(header.display_name.encode('utf-8'), &pj_header.name_addr.display) _dict_to_pjsip_param(header.parameters, &pj_header.other_param, pool) return 0 def _get_device_name_encoding(): if sys.platform == 'win32': encoding = 'mbcs' elif sys.platform.startswith('linux2') and Version.parse(platform.release()) < Version(2,6,31): encoding = 'latin1' else: encoding = 'utf-8' return encoding _device_name_encoding = _get_device_name_encoding() def decode_device_name(device_name): # ignore decoding errors, some systems (I'm looking at you, OSX), seem to misbehave return device_name.decode(_device_name_encoding, 'ignore') # globals cdef object _re_pj_status_str_def = re.compile("^.*\((.*)\)$") cdef object _re_warning_hdr = re.compile('(?P[0-9]{3}) (?P.*?) "(?P.*?)"') sip_status_messages = SIPStatusMessages() diff --git a/sipsimple/core/_primitives.py b/sipsimple/core/_primitives.py index 5f5eeeda..35c3baae 100644 --- a/sipsimple/core/_primitives.py +++ b/sipsimple/core/_primitives.py @@ -1,314 +1,316 @@ """ Implements a high-level mechanism for SIP methods that can be used for non-session based operations like REGISTER, SUBSCRIBE, PUBLISH and MESSAGE. """ __all__ = ["Message", "Registration", "Publication", "PublicationError", "PublicationETagError"] from threading import RLock from application.notification import IObserver, NotificationCenter, NotificationData from application.python import Null from zope.interface import implementer from sipsimple.core._core import ContactHeader, Header, Request, RouteHeader, SIPCoreError, SIPURI, ToHeader @implementer(IObserver) class Registration(object): def __init__(self, from_header, credentials=None, duration=300, extra_headers=None): self.from_header = from_header self.credentials = credentials self.duration = duration self.extra_headers = extra_headers or [] self._current_request = None self._last_request = None self._unregistering = False self._lock = RLock() is_registered = property(lambda self: self._last_request is not None) contact_uri = property(lambda self: None if self._last_request is None else self._last_request.contact_uri) expires_in = property(lambda self: 0 if self._last_request is None else self._last_request.expires_in) peer_address = property(lambda self: None if self._last_request is None else self._last_request.peer_address) def register(self, contact_header, route_header, timeout=None): with self._lock: try: self._make_and_send_request(contact_header, route_header, timeout, True) except SIPCoreError as e: notification_center = NotificationCenter() notification_center.post_notification('SIPRegistrationDidFail', sender=self, data=NotificationData(code=0, reason=e.args[0], route_header=route_header)) def end(self, timeout=None): with self._lock: if self._last_request is None: return notification_center = NotificationCenter() notification_center.post_notification('SIPRegistrationWillEnd', sender=self) try: self._make_and_send_request(ContactHeader.new(self._last_request.contact_header), RouteHeader.new(self._last_request.route_header), timeout, False) except SIPCoreError as e: notification_center.post_notification('SIPRegistrationDidNotEnd', sender=self, data=NotificationData(code=0, reason=e.args[0])) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPRequestDidSucceed(self, notification): request = notification.sender with self._lock: if request is not self._current_request: return self._current_request = None if self._unregistering: if self._last_request is not None: self._last_request.end() self._last_request = None notification.center.post_notification('SIPRegistrationDidEnd', sender=self, data=NotificationData(expired=False)) else: self._last_request = request try: contact_header_list = notification.data.headers["Contact"] except KeyError: contact_header_list = [] notification.center.post_notification('SIPRegistrationDidSucceed', sender=self, data=NotificationData(code=notification.data.code, reason=notification.data.reason, contact_header=request.contact_header, contact_header_list=contact_header_list, expires_in=notification.data.expires, route_header=request.route_header)) def _NH_SIPRequestDidFail(self, notification): request = notification.sender with self._lock: if request is not self._current_request: return self._current_request = None if self._unregistering: notification.center.post_notification('SIPRegistrationDidNotEnd', sender=self, data=NotificationData(code=notification.data.code, reason=notification.data.reason)) else: if hasattr(notification.data, 'headers'): min_expires = notification.data.headers.get('Min-Expires', None) else: min_expires = None notification.center.post_notification('SIPRegistrationDidFail', sender=self, data=NotificationData(code=notification.data.code, reason=notification.data.reason, route_header=request.route_header, min_expires=min_expires)) def _NH_SIPRequestWillExpire(self, notification): with self._lock: if notification.sender is not self._last_request: return notification.center.post_notification('SIPRegistrationWillExpire', sender=self, data=NotificationData(expires=notification.data.expires)) def _NH_SIPRequestDidEnd(self, notification): request = notification.sender with self._lock: notification.center.remove_observer(self, sender=request) if request is not self._last_request: return self._last_request = None if self._current_request is not None: self._current_request.end() self._current_request = None notification.center.post_notification('SIPRegistrationDidEnd', sender=self, data=NotificationData(expired=True)) def _make_and_send_request(self, contact_header, route_header, timeout, do_register): notification_center = NotificationCenter() prev_request = self._current_request or self._last_request if prev_request is not None: call_id = prev_request.call_id cseq = prev_request.cseq + 1 else: call_id = None cseq = 1 extra_headers = [] extra_headers.append(Header("Expires", str(int(self.duration) if do_register else 0))) extra_headers.extend(self.extra_headers) - request = Request("REGISTER", SIPURI(self.from_header.uri.host), self.from_header, ToHeader.new(self.from_header), route_header, + uri = SIPURI(self.from_header.uri.host) + to_header = ToHeader.new(self.from_header) + request = Request("REGISTER", uri, self.from_header, to_header, route_header, credentials=self.credentials, contact_header=contact_header, call_id=call_id, cseq=cseq, extra_headers=extra_headers) notification_center.add_observer(self, sender=request) if self._current_request is not None: # we are trying to send something already, cancel whatever it is self._current_request.end() self._current_request = None try: request.send(timeout=timeout) except: notification_center.remove_observer(self, sender=request) raise self._unregistering = not do_register self._current_request = request @implementer(IObserver) class Message(object): def __init__(self, from_header, to_header, route_header, content_type, body, credentials=None, extra_headers=None): self._request = Request("MESSAGE", to_header.uri, from_header, to_header, route_header, credentials=credentials, extra_headers=extra_headers, content_type=content_type, body=body) self._lock = RLock() from_header = property(lambda self: self._request.from_header) to_header = property(lambda self: self._request.to_header) route_header = property(lambda self: self._request.route_header) content_type = property(lambda self: self._request.content_type) body = property(lambda self: self._request.body) credentials = property(lambda self: self._request.credentials) is_sent = property(lambda self: self._request.state != "INIT") in_progress = property(lambda self: self._request.state == "IN_PROGRESS") peer_address = property(lambda self: self._request.peer_address) def send(self, timeout=None): notification_center = NotificationCenter() with self._lock: if self.is_sent: raise RuntimeError("This MESSAGE was already sent") notification_center.add_observer(self, sender=self._request) try: self._request.send(timeout) except: notification_center.remove_observer(self, sender=self._request) raise def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPRequestDidSucceed(self, notification): if notification.data.expires: # this shouldn't happen really notification.sender.end() notification.center.post_notification('SIPMessageDidSucceed', sender=self, data=notification.data) def _NH_SIPRequestDidFail(self, notification): notification.center.post_notification('SIPMessageDidFail', sender=self, data=notification.data) def _NH_SIPRequestDidEnd(self, notification): notification.center.remove_observer(self, sender=notification.sender) class PublicationError(Exception): pass class PublicationETagError(PublicationError): pass @implementer(IObserver) class Publication(object): def __init__(self, from_header, event, content_type, credentials=None, duration=300, extra_headers=None): self.from_header = from_header self.event = event self.content_type = content_type self.credentials = credentials self.duration = duration self.extra_headers = extra_headers or [] self._last_etag = None self._current_request = None self._last_request = None self._unpublishing = False self._lock = RLock() is_published = property(lambda self: self._last_request is not None) expires_in = property(lambda self: 0 if self._last_request is None else self._last_request.expires_in) peer_address = property(lambda self: None if self._last_request is None else self._last_request.peer_address) def publish(self, body, route_header, timeout=None): with self._lock: if body is None: if self._last_request is None: raise ValueError("Need body for initial PUBLISH") elif self._last_etag is None: raise PublicationETagError("Cannot refresh, last ETag was invalid") self._make_and_send_request(body, route_header, timeout, True) def end(self, timeout=None): with self._lock: if self._last_request is None: return notification_center = NotificationCenter() notification_center.post_notification('SIPPublicationWillEnd', sender=self) try: self._make_and_send_request(None, RouteHeader.new(self._last_request.route_header), timeout, False) except SIPCoreError as e: notification_center.post_notification('SIPPublicationDidNotEnd', sender=self, data=NotificationData(code=0, reason=e.args[0])) def handle_notification(self, notification): handler = getattr(self, '_NH_%s' % notification.name, Null) handler(notification) def _NH_SIPRequestDidSucceed(self, notification): request = notification.sender with self._lock: if request is not self._current_request: return self._current_request = None if self._unpublishing: if self._last_request is not None: self._last_request.end() self._last_request = None self._last_etag = None notification.center.post_notification('SIPPublicationDidEnd', sender=self, data=NotificationData(expired=False)) else: self._last_request = request self._last_etag = notification.data.headers["SIP-ETag"].body if "SIP-ETag" in notification.data.headers else None # TODO: add more data? notification.center.post_notification('SIPPublicationDidSucceed', sender=self, data=NotificationData(code=notification.data.code, reason=notification.data.reason, expires_in=notification.data.expires, route_header=request.route_header)) def _NH_SIPRequestDidFail(self, notification): request = notification.sender with self._lock: if request is not self._current_request: return self._current_request = None if notification.data.code == 412: self._last_etag = None if self._unpublishing: notification.center.post_notification('SIPPublicationDidNotEnd', sender=self, data=NotificationData(code=notification.data.code, reason=notification.data.reason)) else: notification.center.post_notification('SIPPublicationDidFail', sender=self, data=NotificationData(code=notification.data.code, reason=notification.data.reason, route_header=request.route_header)) def _NH_SIPRequestWillExpire(self, notification): with self._lock: if notification.sender is not self._last_request: return notification.center.post_notification('SIPPublicationWillExpire', sender=self, data=NotificationData(expires=notification.data.expires)) def _NH_SIPRequestDidEnd(self, notification): with self._lock: notification.center.remove_observer(self, sender=notification.sender) if notification.sender is not self._last_request: return self._last_request = None if self._current_request is not None: self._current_request.end() self._current_request = None self._last_etag = None notification.center.post_notification('SIPPublicationDidEnd', sender=self, data=NotificationData(expired=True)) def _make_and_send_request(self, body, route_header, timeout, do_publish): notification_center = NotificationCenter() extra_headers = [] extra_headers.append(Header("Event", self.event)) extra_headers.append(Header("Expires", str(int(self.duration) if do_publish else 0))) if self._last_etag is not None: extra_headers.append(Header("SIP-If-Match", self._last_etag)) extra_headers.extend(self.extra_headers) content_type = (self.content_type if body is not None else None) request = Request("PUBLISH", self.from_header.uri, self.from_header, ToHeader.new(self.from_header), route_header, credentials=self.credentials, cseq=1, extra_headers=extra_headers, content_type=content_type, body=body) notification_center.add_observer(self, sender=request) if self._current_request is not None: # we are trying to send something already, cancel whatever it is self._current_request.end() self._current_request = None try: request.send(timeout=timeout) except: notification_center.remove_observer(self, sender=request) raise self._unpublishing = not do_publish self._current_request = request