Page MenuHomePhabricator

No OneTemporary

diff --git a/sipsimple/core/_engine.py b/sipsimple/core/_engine.py
index 03acecb9..79c4ded9 100644
--- a/sipsimple/core/_engine.py
+++ b/sipsimple/core/_engine.py
@@ -1,139 +1,141 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
Implements a mechanism for starting the SIP core engine based on PJSIP
(http://pjsip.org) stack.
"""
from __future__ import with_statement
+__all__ = ["Engine"]
+
import sys
import traceback
import atexit
from datetime import datetime
from threading import Thread, RLock
from application.notification import NotificationCenter, NotificationData
from application.python.types import Singleton
from sipsimple.core._core import PJSIPUA, PJ_VERSION, PJ_SVN_REVISION, SIPCoreError
from sipsimple import __version__
class Engine(Thread):
__metaclass__ = Singleton
default_start_options = {"ip_address": None,
"udp_port": 0,
"tcp_port": None,
"tls_port": None,
"tls_protocol": "TLSv1",
"tls_verify_server": False,
"tls_ca_file": None,
"tls_cert_file": None,
"tls_privkey_file": None,
"tls_timeout": 3000,
"user_agent": "sipsimple-%s-pjsip-%s-r%s" % (__version__, PJ_VERSION, PJ_SVN_REVISION),
"log_level": 5,
"trace_sip": False,
"detect_sip_loops": True,
"ignore_missing_ack": False,
"rtp_port_range": (50000, 50500),
"codecs": ["G722", "speex", "PCMU", "PCMA"],
"events": {"conference": ["application/conference-info+xml"],
"message-summary": ["application/simple-message-summary"],
"presence": ["application/pidf+xml"],
"presence.winfo": ["application/watcherinfo+xml"],
"refer": ["message/sipfrag;version=2.0"],
"xcap-diff": ["application/xcap-diff+xml"]},
"incoming_events": set(),
"incoming_requests": set()}
def __init__(self):
self.notification_center = NotificationCenter()
self._thread_started = False
self._thread_stopping = False
atexit.register(self.stop)
self._lock = RLock()
Thread.__init__(self)
self.setDaemon(True)
@property
def is_running(self):
return (hasattr(self, "_ua") and hasattr(self, "_thread_started")
and self._thread_started and not self._thread_stopping)
def __getattr__(self, attr):
if attr not in ["_ua", "poll"] and hasattr(self, "_ua") and attr in dir(self._ua):
return getattr(self._ua, attr)
raise AttributeError("'%s' object has no attribute '%s'" % (self.__class__.__name__, attr))
def __setattr__(self, attr, value):
if attr not in ["_ua", "poll"] and hasattr(self, "_ua") and attr in dir(self._ua):
setattr(self._ua, attr, value)
return
object.__setattr__(self, attr, value)
def start(self, **kwargs):
if self._thread_started:
raise SIPCoreError("Worker thread was already started once")
init_options = Engine.default_start_options.copy()
init_options.update(kwargs)
self._post_notification('SIPEngineWillStart')
with self._lock:
try:
self._thread_started = True
self._ua = PJSIPUA(self._handle_event, **init_options)
Thread.start(self)
except:
self._thread_started = False
if hasattr(self, "_ua"):
self._ua.dealloc()
del self._ua
self._post_notification('SIPEngineDidFail')
raise
else:
self._post_notification('SIPEngineDidStart')
def stop(self):
if self._thread_stopping:
return
with self._lock:
if self._thread_started:
self._thread_stopping = True
# worker thread
def run(self):
failed = False
while not self._thread_stopping:
try:
failed = self._ua.poll()
except:
exc_type, exc_val, exc_tb = sys.exc_info()
self._post_notification('SIPEngineGotException', type=exc_type, value=exc_val, traceback="".join(traceback.format_exception(exc_type, exc_val, exc_tb)))
failed = True
if failed:
self._post_notification('SIPEngineDidFail')
break
if not failed:
self._post_notification('SIPEngineWillEnd')
self._ua.dealloc()
del self._ua
self._post_notification('SIPEngineDidEnd')
def _handle_event(self, event_name, **kwargs):
sender = kwargs.pop("obj", None)
if sender is None:
sender = self
if self.notification_center is not None:
self.notification_center.post_notification(event_name, sender, NotificationData(**kwargs))
def _post_notification(self, name, **kwargs):
if self.notification_center is not None:
self.notification_center.post_notification(name, self, NotificationData(timestamp=datetime.now(), **kwargs))
def setdefault(where, **what):
for k, x in what.iteritems():
where.setdefault(k, x)
-__all__ = ["Engine"]
+
diff --git a/sipsimple/core/_primitives.py b/sipsimple/core/_primitives.py
index 67bd96c2..d59f133d 100644
--- a/sipsimple/core/_primitives.py
+++ b/sipsimple/core/_primitives.py
@@ -1,350 +1,350 @@
# Copyright (C) 2008-2011 AG Projects. See LICENSE for details.
#
"""
Implements a high-level mechanism for SIP methods that can be used for
non-session based operations like REGISTER, SUBSCRIBE, PUBLISH and
MESSAGE.
"""
from __future__ import with_statement
+__all__ = ["Message", "Registration", "Publication", "PublicationError"]
+
from threading import RLock
from application.notification import IObserver, NotificationCenter
from application.python import Null
-
from zope.interface import implements
from sipsimple.core._core import ContactHeader, Header, Request, RouteHeader, SIPCoreError, SIPURI, ToHeader
from sipsimple.util import TimestampedNotificationData
class Registration(object):
implements(IObserver)
def __init__(self, from_header, credentials=None, duration=300):
self.from_header = from_header
self.credentials = credentials
self.duration = duration
self._current_request = None
self._last_request = None
self._unregistering = False
self._lock = RLock()
is_registered = property(lambda self: self._last_request is not None)
contact_uri = property(lambda self: None if self._last_request is None else self._last_request.contact_uri)
expires_in = property(lambda self: 0 if self._last_request is None else self._last_request.expires_in)
peer_address = property(lambda self: None if self._last_request is None else self._last_request.peer_address)
def register(self, contact_header, route_header, timeout=None):
with self._lock:
try:
self._make_and_send_request(contact_header, route_header, timeout, True)
except SIPCoreError, e:
NotificationCenter().post_notification('SIPRegistrationDidFail', sender=self,
data=TimestampedNotificationData(code=0, reason=e.args[0],
route_header=route_header))
def end(self, timeout=None):
with self._lock:
if self._last_request is None:
return
NotificationCenter().post_notification('SIPRegistrationWillEnd', sender=self, data=TimestampedNotificationData())
try:
self._make_and_send_request(ContactHeader.new(self._last_request.contact_header), RouteHeader.new(self._last_request.route_header), timeout, False)
except SIPCoreError, e:
NotificationCenter().post_notification('SIPRegistrationDidNotEnd', sender=self, data=TimestampedNotificationData(code=0, reason=e.args[0]))
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPRequestDidSucceed(self, notification):
request = notification.sender
notification_center = NotificationCenter()
with self._lock:
if request is not self._current_request:
return
self._current_request = None
if self._unregistering:
if self._last_request is not None:
self._last_request.end()
self._last_request = None
notification_center.post_notification('SIPRegistrationDidEnd', sender=self,
data=TimestampedNotificationData(expired=False))
else:
self._last_request = request
try:
contact_header_list = notification.data.headers["Contact"]
except KeyError:
contact_header_list = []
notification_center.post_notification('SIPRegistrationDidSucceed', sender=self,
data=TimestampedNotificationData(code=notification.data.code,
reason=notification.data.reason,
contact_header=request.contact_header,
contact_header_list=contact_header_list,
expires_in=notification.data.expires,
route_header=request.route_header))
def _NH_SIPRequestDidFail(self, notification):
request = notification.sender
notification_center = NotificationCenter()
with self._lock:
if request is not self._current_request:
return
self._current_request = None
if self._unregistering:
notification_center.post_notification('SIPRegistrationDidNotEnd', sender=self,
data=TimestampedNotificationData(code=notification.data.code,
reason=notification.data.reason))
else:
if hasattr(notification.data, 'headers'):
min_expires = notification.data.headers.get('Min-Expires', None)
else:
min_expires = None
notification_center.post_notification('SIPRegistrationDidFail', sender=self,
data=TimestampedNotificationData(code=notification.data.code,
reason=notification.data.reason,
route_header=request.route_header,
min_expires=min_expires))
def _NH_SIPRequestWillExpire(self, notification):
with self._lock:
if notification.sender is not self._last_request:
return
NotificationCenter().post_notification('SIPRegistrationWillExpire', sender=self,
data=TimestampedNotificationData(expires=notification.data.expires))
def _NH_SIPRequestDidEnd(self, notification):
request = notification.sender
notification_center = NotificationCenter()
with self._lock:
notification_center.remove_observer(self, sender=request)
if request is not self._last_request:
return
self._last_request = None
if self._current_request is not None:
self._current_request.end()
self._current_request = None
notification_center.post_notification('SIPRegistrationDidEnd', sender=self,
data=TimestampedNotificationData(expired=True))
def _make_and_send_request(self, contact_header, route_header, timeout, do_register):
notification_center = NotificationCenter()
prev_request = self._current_request or self._last_request
if prev_request is not None:
call_id = prev_request.call_id
cseq = prev_request.cseq + 1
else:
call_id = None
cseq = 1
request = Request("REGISTER", SIPURI(self.from_header.uri.host), self.from_header, ToHeader.new(self.from_header), route_header,
credentials=self.credentials, contact_header=contact_header, call_id=call_id,
cseq=cseq, extra_headers=[Header("Expires", str(int(self.duration) if do_register else 0))])
notification_center.add_observer(self, sender=request)
if self._current_request is not None:
# we are trying to send something already, cancel whatever it is
self._current_request.end()
self._current_request = None
try:
request.send(timeout=timeout)
except:
notification_center.remove_observer(self, sender=request)
raise
self._unregistering = not do_register
self._current_request = request
class Message(object):
implements(IObserver)
def __init__(self, from_header, to_header, route_header, content_type, body, credentials=None, extra_headers=[]):
self._request = Request("MESSAGE", to_header.uri, from_header, to_header, route_header,
credentials=credentials, extra_headers=extra_headers, content_type=content_type, body=body)
self._lock = RLock()
from_header = property(lambda self: self._request.from_header)
to_header = property(lambda self: self._request.to_header)
route_header = property(lambda self: self._request.route_header)
content_type = property(lambda self: self._request.content_type)
body = property(lambda self: self._request.body)
credentials = property(lambda self: self._request.credentials)
is_sent = property(lambda self: self._request.state != "INIT")
in_progress = property(lambda self: self._request.state == "IN_PROGRESS")
peer_address = property(lambda self: self._request.peer_address)
def send(self, timeout=None):
notification_center = NotificationCenter()
with self._lock:
if self.is_sent:
raise RuntimeError("This MESSAGE was already sent")
notification_center.add_observer(self, sender=self._request)
try:
self._request.send(timeout)
except:
notification_center.remove_observer(self, sender=self._request)
def end(self):
with self._lock:
self._request.end()
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPRequestDidSucceed(self, notification):
with self._lock:
if notification.data.expires:
# this shouldn't happen really
notification.sender.end()
NotificationCenter().post_notification('SIPMessageDidSucceed', sender=self,
data=TimestampedNotificationData(code=notification.data.code,
reason=notification.data.reason))
def _NH_SIPRequestDidFail(self, notification):
with self._lock:
NotificationCenter().post_notification('SIPMessageDidFail', sender=self,
data=TimestampedNotificationData(code=notification.data.code,
reason=notification.data.reason))
def _NH_SIPRequestDidEnd(self, notification):
with self._lock:
NotificationCenter().remove_observer(self, sender=notification.sender)
class PublicationError(Exception):
pass
class Publication(object):
implements(IObserver)
def __init__(self, from_header, event, content_type, credentials=None, duration=300, extra_headers=[]):
self.from_header = from_header
self.event = event
self.content_type = content_type
self.credentials = credentials
self.duration = duration
self.extra_headers = extra_headers
self._last_etag = None
self._current_request = None
self._last_request = None
self._unpublishing = False
self._lock = RLock()
is_published = property(lambda self: self._last_request is not None)
expires_in = property(lambda self: 0 if self._last_request is None else self._last_request.expires_in)
peer_address = property(lambda self: None if self._last_request is None else self._last_request.peer_address)
def publish(self, body, route_header, timeout=None):
with self._lock:
if body is None:
if self._last_request is None:
raise ValueError("Need body for initial PUBLISH")
elif self._last_etag is None:
raise PublicationError("Cannot refresh, last ETag was invalid")
self._make_and_send_request(body, route_header, timeout, True)
def end(self, timeout=None):
with self._lock:
if self._last_request is None:
raise PublicationError("Nothing is currently published")
self._make_and_send_request(None, RouteHeader.new(self._last_request.route_header), timeout, False)
NotificationCenter().post_notification('SIPPublicationWillEnd', sender=self, data=TimestampedNotificationData())
def handle_notification(self, notification):
handler = getattr(self, '_NH_%s' % notification.name, Null)
handler(notification)
def _NH_SIPRequestDidSucceed(self, notification):
request = notification.sender
notification_center = NotificationCenter()
with self._lock:
if request is not self._current_request:
return
self._current_request = None
if self._unpublishing:
if self._last_request is not None:
self._last_request.end()
self._last_request = None
self._last_etag = None
notification_center.post_notification('SIPPublicationDidEnd', sender=self,
data=TimestampedNotificationData(expired=False))
else:
self._last_request = request
self._last_etag = notification.data.headers["SIP-ETag"].body if "SIP-ETag" in notification.data.headers else None
# TODO: add more data?
notification_center.post_notification('SIPPublicationDidSucceed', sender=self,
data=TimestampedNotificationData(code=notification.data.code,
reason=notification.data.reason,
expires_in=notification.data.expires,
route_header=request.route_header))
def _NH_SIPRequestDidFail(self, notification):
request = notification.sender
notification_center = NotificationCenter()
with self._lock:
if request is not self._current_request:
return
self._current_request = None
if notification.data.code == 412:
self._last_etag = None
if self._unpublishing:
notification_center.post_notification('SIPPublicationDidNotEnd', sender=self,
data=TimestampedNotificationData(code=notification.data.code,
reason=notification.data.reason))
else:
notification_center.post_notification('SIPPublicationDidFail', sender=self,
data=TimestampedNotificationData(code=notification.data.code,
reason=notification.data.reason,
route_header=request.route_header))
def _NH_SIPRequestWillExpire(self, notification):
with self._lock:
if notification.sender is not self._last_request:
return
NotificationCenter().post_notification('SIPPublicationWillExpire', sender=self,
data=TimestampedNotificationData(expires=notification.data.expires))
def _NH_SIPRequestDidEnd(self, notification):
notification_center = NotificationCenter()
with self._lock:
notification_center.remove_observer(self, sender=notification.sender)
if notification.sender is not self._last_request:
return
self._last_request = None
if self._current_request is not None:
self._current_request.end()
self._current_request = None
self._last_etag = None
notification_center.post_notification('SIPPublicationDidEnd', sender=self,
data=TimestampedNotificationData(expired=True))
def _make_and_send_request(self, body, route_header, timeout, do_publish):
notification_center = NotificationCenter()
extra_headers = []
extra_headers.append(Header("Event", self.event))
extra_headers.append(Header("Expires", str(int(self.duration) if do_publish else 0)))
if self._last_etag is not None:
extra_headers.append(Header("SIP-If-Match", self._last_etag))
extra_headers.extend(self.extra_headers)
content_type = (self.content_type if body is not None else None)
request = Request("PUBLISH", self.from_header.uri, self.from_header, ToHeader.new(self.from_header), route_header,
credentials=self.credentials, cseq=1, extra_headers=extra_headers,
content_type=content_type, body=body)
notification_center.add_observer(self, sender=request)
if self._current_request is not None:
# we are trying to send something already, cancel whatever it is
self._current_request.end()
self._current_request = None
try:
request.send(timeout=timeout)
except:
notification_center.remove_observer(self, sender=request)
raise
self._unpublishing = not do_publish
self._current_request = request
-__all__ = ["Registration", "Message", "PublicationError", "Publication"]

File Metadata

Mime Type
text/x-diff
Expires
Sat, Nov 23, 4:50 AM (1 d, 3 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3408812
Default Alt Text
(24 KB)

Event Timeline