diff --git a/xcap/xcapdiff.py b/xcap/xcapdiff.py index 3fe8134..ddd058d 100644 --- a/xcap/xcapdiff.py +++ b/xcap/xcapdiff.py @@ -1,194 +1,194 @@ """Track changes of the documents and notify subscribers Create a Notifier object: >>> n = Notifier(xcap_root, publish_xcapdiff_func) When a change occurs, call on_change >>> n.on_change(xcap_uri_updated, old_etag, new_etag) (old_etag being None means the document was just created, new_etag being None means the document was deleted) Notifier will call publish_xcapdiff_func with 2 args: user's uri and xcap-diff document. Number of calls is limited to no more than 1 call per MIN_WAIT seconds for a given user uri. """ from time import time from functools import wraps from twisted.internet import reactor def xml_xcapdiff(xcap_root, content): return """ %s """ % (xcap_root, content) def xml_document(sel, old_etag, new_etag): if old_etag: old_etag = ( ' previous-etag="%s"' % old_etag ) else: - old_etag = '' + old_etag = '' if new_etag: new_etag = ( ' new-etag="%s"' % new_etag ) else: - new_etag = '' + new_etag = '' return '' % (new_etag, sel, old_etag) class UserChanges(object): MIN_WAIT = 5 def __init__(self, publish_xcapdiff): self.changes = {} self.rate_limit = RateLimit(self.MIN_WAIT) self.publish_xcapdiff = publish_xcapdiff def add_change(self, uri, old_etag, etag, xcap_root): self.changes.setdefault(uri, [old_etag, etag])[1] = etag self.rate_limit.callAtLimitedRate(self.publish, uri.user.uri, xcap_root) def publish(self, user_uri, xcap_root): if self.changes: self.publish_xcapdiff(user_uri, self.unload_changes(xcap_root)) def unload_changes(self, xcap_root): docs = [] for uri, (old_etag, etag) in self.changes.iteritems(): docs.append(xml_document(uri, old_etag, etag)) result = xml_xcapdiff(xcap_root, '\n'.join(docs)) self.changes.clear() return result def __nonzero__(self): return self.changes.__nonzero__() class Notifier(object): def __init__(self, xcap_root, publish_xcapdiff_func): self.publish_xcapdiff = publish_xcapdiff_func self.xcap_root = xcap_root # maps user_uri to UserChanges self.users_changes = {} def on_change(self, uri, old_etag, new_etag): changes = self.users_changes.setdefault(uri.user, UserChanges(self.publish_xcapdiff)) changes.add_change(uri, old_etag, new_etag, self.xcap_root) class RateLimit(object): def __init__(self, min_wait): # minimum number of seconds between calls self.min_wait = min_wait # time() of the last call self.last_call = 0 # DelayedCall object of scheduled call self.delayed_call = None def callAtLimitedRate(self, f, *args, **kwargs): """Call f(*args, **kw) if it wasn't called in the last self.min_wait seconds. If it was, schedule it for later. Don't do anything if it's already scheduled. >>> rate = RateLimit(1) >>> def f(a, start = time()): ... print "%d %s" % (time()-start, a) ... return 'return value is lost!' >>> rate.callAtLimitedRate(f, 'a') 0 a >>> rate.callAtLimitedRate(f, 'b') # scheduled for 1 second later >>> rate.callAtLimitedRate(f, 'c') # ignored as there's already call in progress >>> _ = reactor.callLater(1.5, rate.callAtLimitedRate, f, 'd') >>> _ = reactor.callLater(2.1, reactor_stop) >>> reactor_run() 1 b 2 d """ current = time() delta = current - self.last_call if not self.delayed_call or \ self.delayed_call.called or \ self.delayed_call.cancelled: @wraps(f) def wrapped_f(): try: return f(*args, **kwargs) finally: self.last_call = time() self.delayed_call = callMaybeLater(self.min_wait - delta, wrapped_f) class RateLimitedFun(RateLimit): def __init__(self, min_wait, function): RateLimit.__init__(self, min_wait) self.function = function def __call__(self, *args, **kwargs): return self.callAtLimitedRate(self.function, *args, **kwargs) def limit_rate(min_wait): """Decorator for limiting rate of the function. The resulting value of the new function will be None regardless of what the wrapped function returned. >>> @limit_rate(1) ... def f(a, start = time()): ... print "%d %s" % (time()-start, a) ... return 'return value is lost!' >>> f('a') 0 a >>> f('b') # scheduled for 1 second later >>> f('c') # ignored as there's already call in progress >>> _ = reactor.callLater(1.5, f, 'd') >>> _ = reactor.callLater(2.1, reactor_stop) >>> reactor_run() 1 b 2 d """ rate = RateLimit(min_wait) def decorate(f): @wraps(f) def wrapped(*args, **kwargs): rate.callAtLimitedRate(f, *args, **kwargs) return wrapped return decorate def callMaybeLater(seconds, f, *args, **kw): "execute f and return None if seconds is zero, callLater otherwise" if seconds <= 0: f(*args, **kw) else: return reactor.callLater(seconds, f, *args, **kw) if __name__=='__main__': def reactor_run(first_time = [True]): if first_time[0]: reactor.run() first_time[0] = False else: reactor.running = True reactor.mainLoop() def reactor_stop(): reactor.running = False import doctest doctest.testmod()