Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F7159351
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
24 KB
Referenced Files
None
Subscribers
None
View Options
diff --git a/sipsimple/core/_engine.py b/sipsimple/core/_engine.py
index 03acecb9..79c4ded9 100644
--- a/sipsimple/core/_engine.py
+++ b/sipsimple/core/_engine.py
@@ -1,139 +1,141 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
Implements a mechanism for starting the SIP core engine based on PJSIP
(http://pjsip.org) stack.
"""
from __future__ import with_statement
+__all__ = ["Engine"]
+
import sys
import traceback
import atexit
from datetime import datetime
from threading import Thread, RLock
from application.notification import NotificationCenter, NotificationData
from application.python.types import Singleton
from sipsimple.core._core import PJSIPUA, PJ_VERSION, PJ_SVN_REVISION, SIPCoreError
from sipsimple import __version__
class Engine(Thread):
    """
    Worker thread that creates a PJSIPUA instance and polls it in a loop.

    Lifecycle changes are announced through the notification center as
    SIPEngine* notifications. Attribute reads and writes for names that
    appear in dir(self._ua) are forwarded to the PJSIPUA instance (see
    __getattr__ and __setattr__).
    """
    __metaclass__ = Singleton
    # Keyword arguments handed to PJSIPUA by start(); individual entries can
    # be overridden through start(**kwargs).
    default_start_options = {"ip_address": None,
                             "udp_port": 0,
                             "tcp_port": None,
                             "tls_port": None,
                             "tls_protocol": "TLSv1",
                             "tls_verify_server": False,
                             "tls_ca_file": None,
                             "tls_cert_file": None,
                             "tls_privkey_file": None,
                             "tls_timeout": 3000,
                             "user_agent": "sipsimple-%s-pjsip-%s-r%s" % (__version__, PJ_VERSION, PJ_SVN_REVISION),
                             "log_level": 5,
                             "trace_sip": False,
                             "detect_sip_loops": True,
                             "ignore_missing_ack": False,
                             "rtp_port_range": (50000, 50500),
                             "codecs": ["G722", "speex", "PCMU", "PCMA"],
                             "events": {"conference": ["application/conference-info+xml"],
                                        "message-summary": ["application/simple-message-summary"],
                                        "presence": ["application/pidf+xml"],
                                        "presence.winfo": ["application/watcherinfo+xml"],
                                        "refer": ["message/sipfrag;version=2.0"],
                                        "xcap-diff": ["application/xcap-diff+xml"]},
                             "incoming_events": set(),
                             "incoming_requests": set()}

    def __init__(self):
        """Set up the state flags and lock, then initialise the daemon thread."""
        self.notification_center = NotificationCenter()
        self._thread_started = False
        self._thread_stopping = False
        # Request a stop of the worker loop when the interpreter exits.
        atexit.register(self.stop)
        self._lock = RLock()
        Thread.__init__(self)
        self.setDaemon(True)

    @property
    def is_running(self):
        """True while a PJSIPUA exists, the thread was started and no stop was requested."""
        return (hasattr(self, "_ua") and hasattr(self, "_thread_started")
                and self._thread_started and not self._thread_stopping)

    def __getattr__(self, attr):
        """
        Forward lookups that failed on the Engine itself to the PJSIPUA instance.

        "_ua" and "poll" are never forwarded; excluding "_ua" also keeps the
        hasattr(self, "_ua") probe below from recursing into this method.
        Raises AttributeError when the name cannot be forwarded.
        """
        if attr not in ["_ua", "poll"] and hasattr(self, "_ua") and attr in dir(self._ua):
            return getattr(self._ua, attr)
        raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, attr))

    def __setattr__(self, attr, value):
        """Set attr on the PJSIPUA instance when it exposes that name, otherwise on the Engine."""
        if attr not in ["_ua", "poll"] and hasattr(self, "_ua") and attr in dir(self._ua):
            setattr(self._ua, attr, value)
            return
        object.__setattr__(self, attr, value)

    def start(self, **kwargs):
        """
        Create the PJSIPUA instance and start the worker thread.

        kwargs override entries of default_start_options. Posts
        SIPEngineWillStart first, then SIPEngineDidStart on success or
        SIPEngineDidFail (and re-raises) on failure.

        Raises SIPCoreError if the thread is already marked as started.
        """
        if self._thread_started:
            raise SIPCoreError("Worker thread was already started once")
        # Shallow copy: nested containers are still shared with the class-level dict.
        init_options = Engine.default_start_options.copy()
        init_options.update(kwargs)
        self._post_notification('SIPEngineWillStart')
        with self._lock:
            try:
                self._thread_started = True
                self._ua = PJSIPUA(self._handle_event, **init_options)
                Thread.start(self)
            except:
                # Roll back so that is_running stays False, release the
                # PJSIPUA if it got created, then propagate the error.
                self._thread_started = False
                if hasattr(self, "_ua"):
                    self._ua.dealloc()
                    del self._ua
                self._post_notification('SIPEngineDidFail')
                raise
            else:
                self._post_notification('SIPEngineDidStart')

    def stop(self):
        """Ask the worker loop to exit; this only sets a flag and does not wait for the thread."""
        if self._thread_stopping:
            return
        with self._lock:
            if self._thread_started:
                self._thread_stopping = True

    # worker thread
    def run(self):
        """
        Poll the PJSIPUA instance until a stop is requested or polling fails.

        A true return value from poll() or any exception raised by it is
        treated as failure and ends the loop with SIPEngineDidFail; a
        requested stop posts SIPEngineWillEnd instead. In both cases the
        PJSIPUA is deallocated and SIPEngineDidEnd is posted.
        """
        failed = False
        while not self._thread_stopping:
            try:
                failed = self._ua.poll()
            except:
                exc_type, exc_val, exc_tb = sys.exc_info()
                self._post_notification('SIPEngineGotException', type=exc_type, value=exc_val, traceback="".join(traceback.format_exception(exc_type, exc_val, exc_tb)))
                failed = True
            if failed:
                self._post_notification('SIPEngineDidFail')
                break
        if not failed:
            self._post_notification('SIPEngineWillEnd')
        self._ua.dealloc()
        del self._ua
        self._post_notification('SIPEngineDidEnd')

    def _handle_event(self, event_name, **kwargs):
        """
        Callback given to PJSIPUA; re-posts an event as a notification.

        The sender is the "obj" keyword argument when present and not None,
        otherwise the Engine itself; the remaining keyword arguments become
        the NotificationData.
        """
        sender = kwargs.pop("obj", None)
        if sender is None:
            sender = self
        if self.notification_center is not None:
            self.notification_center.post_notification(event_name, sender, NotificationData(**kwargs))

    def _post_notification(self, name, **kwargs):
        """Post a notification from the Engine itself, adding a timestamp of the current local time."""
        if self.notification_center is not None:
            self.notification_center.post_notification(name, self, NotificationData(timestamp=datetime.now(), **kwargs))
def setdefault(where, **what):
    """
    Fill in missing keys of the mapping *where* from keyword arguments.

    Every keyword whose name is not already a key of *where* is added with
    the given value; existing keys are left untouched. *where* is modified
    in place and nothing is returned.
    """
    # items() instead of iteritems(): identical result here, and it also
    # exists on Python 3 dictionaries.
    for key, value in what.items():
        where.setdefault(key, value)
-__all__ = ["Engine"]
+
diff --git a/sipsimple/core/_primitives.py b/sipsimple/core/_primitives.py
index 67bd96c2..d59f133d 100644
--- a/sipsimple/core/_primitives.py
+++ b/sipsimple/core/_primitives.py
@@ -1,350 +1,350 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
Implements a high-level mechanism for SIP methods that can be used for
non-session based operations like REGISTER, SUBSCRIBE, PUBLISH and
MESSAGE.
"""
from __future__ import with_statement
+__all__ = ["Message", "Registration", "Publication", "PublicationError"]
+
from threading import RLock
from application.notification import IObserver, NotificationCenter
from application.python import Null
-
from zope.interface import implements
from sipsimple.core._core import ContactHeader, Header, Request, RouteHeader, SIPCoreError, SIPURI, ToHeader
from sipsimple.util import TimestampedNotificationData
class Registration(object):
    """
    Sends REGISTER requests for from_header and reports their outcome.

    Each operation is carried out by a Request object which this class
    observes; the request notifications are translated into
    SIPRegistration* notifications posted with this object as sender.
    _current_request is the request in flight, _last_request the most
    recent request that succeeded as a registration.
    """
    implements(IObserver)

    def __init__(self, from_header, credentials=None, duration=300):
        """
        Store the registration parameters; nothing is sent yet.

        duration is the value placed in the Expires header by register().
        """
        self.from_header = from_header
        self.credentials = credentials
        self.duration = duration
        self._current_request = None
        self._last_request = None
        self._unregistering = False
        self._lock = RLock()

    # Read-only views derived from the last successful request, if any.
    is_registered = property(lambda self: self._last_request is not None)
    contact_uri = property(lambda self: None if self._last_request is None else self._last_request.contact_uri)
    expires_in = property(lambda self: 0 if self._last_request is None else self._last_request.expires_in)
    peer_address = property(lambda self: None if self._last_request is None else self._last_request.peer_address)

    def register(self, contact_header, route_header, timeout=None):
        """
        Send a REGISTER with the configured duration.

        A SIPCoreError raised while sending is not propagated; it is
        reported as SIPRegistrationDidFail with code 0 instead.
        """
        with self._lock:
            try:
                self._make_and_send_request(contact_header, route_header, timeout, True)
            except SIPCoreError, e:
                NotificationCenter().post_notification('SIPRegistrationDidFail', sender=self,
                                                       data=TimestampedNotificationData(code=0, reason=e.args[0],
                                                                                        route_header=route_header))

    def end(self, timeout=None):
        """
        Send a REGISTER with "Expires: 0", reusing the contact and route of the last request.

        Does nothing when there is no successful registration. Posts
        SIPRegistrationWillEnd before sending; a SIPCoreError raised while
        sending is reported as SIPRegistrationDidNotEnd with code 0.
        """
        with self._lock:
            if self._last_request is None:
                return
            NotificationCenter().post_notification('SIPRegistrationWillEnd', sender=self, data=TimestampedNotificationData())
            try:
                self._make_and_send_request(ContactHeader.new(self._last_request.contact_header), RouteHeader.new(self._last_request.route_header), timeout, False)
            except SIPCoreError, e:
                NotificationCenter().post_notification('SIPRegistrationDidNotEnd', sender=self, data=TimestampedNotificationData(code=0, reason=e.args[0]))

    def handle_notification(self, notification):
        """Dispatch to the _NH_<notification name> method; unknown names fall through to Null."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPRequestDidSucceed(self, notification):
        """Handle success of the in-flight request; notifications from other requests are ignored."""
        request = notification.sender
        notification_center = NotificationCenter()
        with self._lock:
            if request is not self._current_request:
                return
            self._current_request = None
            if self._unregistering:
                # The unregister went through: drop the previous registration.
                if self._last_request is not None:
                    self._last_request.end()
                    self._last_request = None
                notification_center.post_notification('SIPRegistrationDidEnd', sender=self,
                                                      data=TimestampedNotificationData(expired=False))
            else:
                self._last_request = request
                try:
                    contact_header_list = notification.data.headers["Contact"]
                except KeyError:
                    # The response carried no Contact header.
                    contact_header_list = []
                notification_center.post_notification('SIPRegistrationDidSucceed', sender=self,
                                                      data=TimestampedNotificationData(code=notification.data.code,
                                                                                       reason=notification.data.reason,
                                                                                       contact_header=request.contact_header,
                                                                                       contact_header_list=contact_header_list,
                                                                                       expires_in=notification.data.expires,
                                                                                       route_header=request.route_header))

    def _NH_SIPRequestDidFail(self, notification):
        """Handle failure of the in-flight request; notifications from other requests are ignored."""
        request = notification.sender
        notification_center = NotificationCenter()
        with self._lock:
            if request is not self._current_request:
                return
            self._current_request = None
            if self._unregistering:
                notification_center.post_notification('SIPRegistrationDidNotEnd', sender=self,
                                                      data=TimestampedNotificationData(code=notification.data.code,
                                                                                       reason=notification.data.reason))
            else:
                # The failure data does not always carry headers; pass the
                # Min-Expires header along when it is available.
                if hasattr(notification.data, 'headers'):
                    min_expires = notification.data.headers.get('Min-Expires', None)
                else:
                    min_expires = None
                notification_center.post_notification('SIPRegistrationDidFail', sender=self,
                                                      data=TimestampedNotificationData(code=notification.data.code,
                                                                                       reason=notification.data.reason,
                                                                                       route_header=request.route_header,
                                                                                       min_expires=min_expires))

    def _NH_SIPRequestWillExpire(self, notification):
        """Re-post the expiry warning of the last successful request as SIPRegistrationWillExpire."""
        with self._lock:
            if notification.sender is not self._last_request:
                return
            NotificationCenter().post_notification('SIPRegistrationWillExpire', sender=self,
                                                   data=TimestampedNotificationData(expires=notification.data.expires))

    def _NH_SIPRequestDidEnd(self, notification):
        """
        Stop observing a request that ended.

        When it is the last successful request, also cancel any request in
        flight and post SIPRegistrationDidEnd with expired=True.
        """
        request = notification.sender
        notification_center = NotificationCenter()
        with self._lock:
            notification_center.remove_observer(self, sender=request)
            if request is not self._last_request:
                return
            self._last_request = None
            if self._current_request is not None:
                self._current_request.end()
                self._current_request = None
            notification_center.post_notification('SIPRegistrationDidEnd', sender=self,
                                                  data=TimestampedNotificationData(expired=True))

    def _make_and_send_request(self, contact_header, route_header, timeout, do_register):
        """
        Build and send a REGISTER request and make it the request in flight.

        do_register selects between "Expires: <duration>" (True) and
        "Expires: 0" (False). Exceptions raised by Request.send() propagate
        after the observer has been removed again. Callers in this class
        invoke it with self._lock held.
        """
        notification_center = NotificationCenter()
        # Continue the Call-ID / CSeq sequence of the previous request, if any.
        prev_request = self._current_request or self._last_request
        if prev_request is not None:
            call_id = prev_request.call_id
            cseq = prev_request.cseq + 1
        else:
            call_id = None
            cseq = 1
        request = Request("REGISTER", SIPURI(self.from_header.uri.host), self.from_header, ToHeader.new(self.from_header), route_header,
                          credentials=self.credentials, contact_header=contact_header, call_id=call_id,
                          cseq=cseq, extra_headers=[Header("Expires", str(int(self.duration) if do_register else 0))])
        notification_center.add_observer(self, sender=request)
        if self._current_request is not None:
            # we are trying to send something already, cancel whatever it is
            self._current_request.end()
            self._current_request = None
        try:
            request.send(timeout=timeout)
        except:
            notification_center.remove_observer(self, sender=request)
            raise
        # State is only updated once the request was handed over successfully.
        self._unregistering = not do_register
        self._current_request = request
class Message(object):
    """
    Sends a single SIP MESSAGE request and reports its outcome.

    The underlying Request object is created in the constructor and sent by
    send(); its notifications are translated into SIPMessageDidSucceed and
    SIPMessageDidFail notifications posted with this object as sender.
    """
    implements(IObserver)

    def __init__(self, from_header, to_header, route_header, content_type, body, credentials=None, extra_headers=[]):
        """Build the MESSAGE request addressed to to_header.uri; nothing is sent yet."""
        self._request = Request("MESSAGE", to_header.uri, from_header, to_header, route_header,
                                credentials=credentials, extra_headers=extra_headers, content_type=content_type, body=body)
        self._lock = RLock()

    # Read-only views onto the underlying request.
    from_header = property(lambda self: self._request.from_header)
    to_header = property(lambda self: self._request.to_header)
    route_header = property(lambda self: self._request.route_header)
    content_type = property(lambda self: self._request.content_type)
    body = property(lambda self: self._request.body)
    credentials = property(lambda self: self._request.credentials)
    is_sent = property(lambda self: self._request.state != "INIT")
    in_progress = property(lambda self: self._request.state == "IN_PROGRESS")
    peer_address = property(lambda self: self._request.peer_address)

    def send(self, timeout=None):
        """
        Send the MESSAGE request.

        Raises RuntimeError if the request has already left the "INIT"
        state. Any exception raised by Request.send() is propagated to the
        caller after the observer registration has been undone.
        """
        notification_center = NotificationCenter()
        with self._lock:
            if self.is_sent:
                raise RuntimeError("This MESSAGE was already sent")
            notification_center.add_observer(self, sender=self._request)
            try:
                self._request.send(timeout)
            except:
                notification_center.remove_observer(self, sender=self._request)
                # Re-raise: without this a failed send was silently dropped,
                # leaving the caller with neither an exception nor a
                # notification. Registration and Publication re-raise in the
                # same situation.
                raise

    def end(self):
        """End the underlying request."""
        with self._lock:
            self._request.end()

    def handle_notification(self, notification):
        """Dispatch to the _NH_<notification name> method; unknown names fall through to Null."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPRequestDidSucceed(self, notification):
        """Report success as SIPMessageDidSucceed, ending the request first if it reports an expiry."""
        with self._lock:
            if notification.data.expires:
                # this shouldn't happen really
                notification.sender.end()
            NotificationCenter().post_notification('SIPMessageDidSucceed', sender=self,
                                                   data=TimestampedNotificationData(code=notification.data.code,
                                                                                    reason=notification.data.reason))

    def _NH_SIPRequestDidFail(self, notification):
        """Report failure as SIPMessageDidFail with the code and reason of the request failure."""
        with self._lock:
            NotificationCenter().post_notification('SIPMessageDidFail', sender=self,
                                                   data=TimestampedNotificationData(code=notification.data.code,
                                                                                    reason=notification.data.reason))

    def _NH_SIPRequestDidEnd(self, notification):
        """Stop observing the request once it has ended."""
        with self._lock:
            NotificationCenter().remove_observer(self, sender=notification.sender)
class PublicationError(Exception):
    """Raised when a PUBLISH refresh or removal cannot be attempted."""
class Publication(object):
    """
    Sends PUBLISH requests for an event package and reports their outcome.

    Each operation is carried out by a Request object which this class
    observes; the request notifications are translated into
    SIPPublication* notifications posted with this object as sender.
    _current_request is the request in flight, _last_request the most
    recent request that succeeded as a publication and _last_etag the
    SIP-ETag returned for it (None when unknown or invalidated).
    """
    implements(IObserver)

    def __init__(self, from_header, event, content_type, credentials=None, duration=300, extra_headers=None):
        """
        Store the publication parameters; nothing is sent yet.

        duration is the value placed in the Expires header by publish().
        extra_headers is an optional list of headers appended to every
        request; when omitted each instance gets its own empty list.
        """
        self.from_header = from_header
        self.event = event
        self.content_type = content_type
        self.credentials = credentials
        self.duration = duration
        # A literal [] default would be one list object shared by every
        # instance created without the argument, so mutating
        # publication.extra_headers would leak into the others.
        self.extra_headers = [] if extra_headers is None else extra_headers
        self._last_etag = None
        self._current_request = None
        self._last_request = None
        self._unpublishing = False
        self._lock = RLock()

    # Read-only views derived from the last successful request, if any.
    is_published = property(lambda self: self._last_request is not None)
    expires_in = property(lambda self: 0 if self._last_request is None else self._last_request.expires_in)
    peer_address = property(lambda self: None if self._last_request is None else self._last_request.peer_address)

    def publish(self, body, route_header, timeout=None):
        """
        Send a PUBLISH with the configured duration.

        A body of None means "refresh the current publication".

        Raises ValueError when body is None and nothing was published yet,
        and PublicationError when body is None and no valid ETag is known.
        Exceptions raised while sending the request propagate.
        """
        with self._lock:
            if body is None:
                if self._last_request is None:
                    raise ValueError("Need body for initial PUBLISH")
                elif self._last_etag is None:
                    raise PublicationError("Cannot refresh, last ETag was invalid")
            self._make_and_send_request(body, route_header, timeout, True)

    def end(self, timeout=None):
        """
        Send a body-less PUBLISH with "Expires: 0", reusing the route of the last request.

        Raises PublicationError when nothing is currently published.
        Exceptions raised while sending the request propagate, in which
        case SIPPublicationWillEnd is not posted.
        """
        with self._lock:
            if self._last_request is None:
                raise PublicationError("Nothing is currently published")
            self._make_and_send_request(None, RouteHeader.new(self._last_request.route_header), timeout, False)
            # NOTE(review): Registration.end() posts its WillEnd notification
            # before sending; here it is posted after a successful send.
            # Confirm the difference is intended.
            NotificationCenter().post_notification('SIPPublicationWillEnd', sender=self, data=TimestampedNotificationData())

    def handle_notification(self, notification):
        """Dispatch to the _NH_<notification name> method; unknown names fall through to Null."""
        handler = getattr(self, '_NH_%s' % notification.name, Null)
        handler(notification)

    def _NH_SIPRequestDidSucceed(self, notification):
        """Handle success of the in-flight request; notifications from other requests are ignored."""
        request = notification.sender
        notification_center = NotificationCenter()
        with self._lock:
            if request is not self._current_request:
                return
            self._current_request = None
            if self._unpublishing:
                # The removal went through: drop the previous publication.
                if self._last_request is not None:
                    self._last_request.end()
                    self._last_request = None
                self._last_etag = None
                notification_center.post_notification('SIPPublicationDidEnd', sender=self,
                                                      data=TimestampedNotificationData(expired=False))
            else:
                self._last_request = request
                # Remember the entity tag so the next PUBLISH can send SIP-If-Match.
                self._last_etag = notification.data.headers["SIP-ETag"].body if "SIP-ETag" in notification.data.headers else None
                # TODO: add more data?
                notification_center.post_notification('SIPPublicationDidSucceed', sender=self,
                                                      data=TimestampedNotificationData(code=notification.data.code,
                                                                                       reason=notification.data.reason,
                                                                                       expires_in=notification.data.expires,
                                                                                       route_header=request.route_header))

    def _NH_SIPRequestDidFail(self, notification):
        """Handle failure of the in-flight request; notifications from other requests are ignored."""
        request = notification.sender
        notification_center = NotificationCenter()
        with self._lock:
            if request is not self._current_request:
                return
            self._current_request = None
            if notification.data.code == 412:
                # A 412 response invalidates the stored ETag.
                self._last_etag = None
            if self._unpublishing:
                notification_center.post_notification('SIPPublicationDidNotEnd', sender=self,
                                                      data=TimestampedNotificationData(code=notification.data.code,
                                                                                       reason=notification.data.reason))
            else:
                notification_center.post_notification('SIPPublicationDidFail', sender=self,
                                                      data=TimestampedNotificationData(code=notification.data.code,
                                                                                       reason=notification.data.reason,
                                                                                       route_header=request.route_header))

    def _NH_SIPRequestWillExpire(self, notification):
        """Re-post the expiry warning of the last successful request as SIPPublicationWillExpire."""
        with self._lock:
            if notification.sender is not self._last_request:
                return
            NotificationCenter().post_notification('SIPPublicationWillExpire', sender=self,
                                                   data=TimestampedNotificationData(expires=notification.data.expires))

    def _NH_SIPRequestDidEnd(self, notification):
        """
        Stop observing a request that ended.

        When it is the last successful request, also cancel any request in
        flight, forget the ETag and post SIPPublicationDidEnd with
        expired=True.
        """
        notification_center = NotificationCenter()
        with self._lock:
            notification_center.remove_observer(self, sender=notification.sender)
            if notification.sender is not self._last_request:
                return
            self._last_request = None
            if self._current_request is not None:
                self._current_request.end()
                self._current_request = None
            self._last_etag = None
            notification_center.post_notification('SIPPublicationDidEnd', sender=self,
                                                  data=TimestampedNotificationData(expired=True))

    def _make_and_send_request(self, body, route_header, timeout, do_publish):
        """
        Build and send a PUBLISH request and make it the request in flight.

        do_publish selects between "Expires: <duration>" (True) and
        "Expires: 0" (False). A SIP-If-Match header is added when an ETag is
        known, and the content type is only sent together with a body.
        Exceptions raised by Request.send() propagate after the observer has
        been removed again. Callers in this class invoke it with self._lock
        held.
        """
        notification_center = NotificationCenter()
        extra_headers = []
        extra_headers.append(Header("Event", self.event))
        extra_headers.append(Header("Expires", str(int(self.duration) if do_publish else 0)))
        if self._last_etag is not None:
            extra_headers.append(Header("SIP-If-Match", self._last_etag))
        extra_headers.extend(self.extra_headers)
        content_type = (self.content_type if body is not None else None)
        request = Request("PUBLISH", self.from_header.uri, self.from_header, ToHeader.new(self.from_header), route_header,
                          credentials=self.credentials, cseq=1, extra_headers=extra_headers,
                          content_type=content_type, body=body)
        notification_center.add_observer(self, sender=request)
        if self._current_request is not None:
            # we are trying to send something already, cancel whatever it is
            self._current_request.end()
            self._current_request = None
        try:
            request.send(timeout=timeout)
        except:
            notification_center.remove_observer(self, sender=request)
            raise
        # State is only updated once the request was handed over successfully.
        self._unpublishing = not do_publish
        self._current_request = request
-__all__ = ["Registration", "Message", "PublicationError", "Publication"]
File Metadata
Details
Attached
Mime Type
text/x-diff
Expires
Sat, Nov 23, 4:50 AM (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408812
Default Alt Text
(24 KB)
Attached To
Mode
rPYNSIPSIMPLE python3-sipsimple
Attached
Detach File
Event Timeline
Log In to Comment