Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F7159617
coros.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
18 KB
Referenced Files
None
Subscribers
None
coros.py
View Options
# @author Donovan Preston
#
# Copyright (c) 2007, Linden Research, Inc.
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import
collections
import
time
from
eventlib
import
api
class Cancelled(RuntimeError):
    """RuntimeError subclass exported by this module.

    It is not raised by any code visible in this file; presumably users of
    the module raise it to signal a cancelled operation -- confirm against
    the callers.
    """
class NOT_USED:
    """Type of the module-level sentinel meaning "nothing has been sent yet".

    A dedicated object is used instead of None because None is the default
    (and a perfectly valid) value for event.send().
    """

    def __repr__(self):
        # Fixed text so the sentinel is recognisable in event.__str__ output.
        return 'NOT_USED'


# Rebind the name to the one and only instance.  From here on the class is
# no longer reachable under this name, and all checks are identity checks
# (``x is NOT_USED``) against this instance.
NOT_USED = NOT_USED()
class event(object):
    """An abstraction where an arbitrary number of coroutines
    can wait for one event from another.

    Events differ from channels in two ways:
      1) calling send() does not unschedule the current coroutine
      2) send() can only be called once; use reset() to prepare the event for
         another send()
    They are ideal for communicating return values between coroutines.

    >>> from eventlib import coros, api
    >>> evt = coros.event()
    >>> def baz(b):
    ...     evt.send(b + 1)
    ...
    >>> _ = api.spawn(baz, 3)
    >>> evt.wait()
    4
    """

    # Class-level default.  It is deliberately *not* NOT_USED so that the
    # very first reset(), called from __init__, passes its assertion.
    _result = None

    def __init__(self):
        # Coroutines currently blocked in wait(); only the keys matter.
        self._waiters = {}
        self.reset()

    def __str__(self):
        params = (self.__class__.__name__, hex(id(self)), self._result,
                  self._exc, len(self._waiters))
        return '<%s at %s result=%r _exc=%r _waiters[%d]>' % params

    def reset(self):
        """ Reset this event so it can be used to send again.
        Can only be called after send has been called.

        >>> from eventlib import coros
        >>> evt = coros.event()
        >>> evt.send(1)
        >>> evt.reset()
        >>> evt.send(2)
        >>> evt.wait()
        2

        Calling reset multiple times in a row is an error.

        >>> evt.reset()
        >>> evt.reset()
        Traceback (most recent call last):
        ...
        AssertionError: Trying to re-reset() a fresh event.
        """
        assert self._result is not NOT_USED, 'Trying to re-reset() a fresh event.'
        # Wall-clock time of the most recent reset.
        self.epoch = time.time()
        self._result = NOT_USED
        self._exc = None

    def ready(self):
        """ Return true if the wait() call will return immediately.
        Used to avoid waiting for things that might take a while to time out.
        For example, you can put a bunch of events into a list, and then visit
        them all repeatedly, calling ready() until one returns True, and then
        you can wait() on that one."""
        return self._result is not NOT_USED

    def has_exception(self):
        """Return True if an exception has been sent to this event."""
        return self._exc is not None

    def has_result(self):
        """Return True if a plain result (and no exception) has been sent."""
        return self._result is not NOT_USED and self._exc is None

    def poll(self, notready=None):
        """Return wait() if the event is ready, otherwise `notready`.

        Because it delegates to wait(), this raises if an exception was sent.
        """
        if self.ready():
            return self.wait()
        return notready

    # QQQ make it return tuple (type, value, tb) instead of raising
    # because
    # 1) "poll" does not imply raising
    # 2) it's better not to screw up caller's sys.exc_info() by default
    #    (e.g. if caller wants to calls the function in except or finally)
    def poll_exception(self, notready=None):
        """Raise the sent exception (via wait()) if there is one.

        Returns `notready` when no exception has been sent.
        """
        if self.has_exception():
            return self.wait()
        return notready

    def poll_result(self, notready=None):
        """Return the sent result if there is one, otherwise `notready`.

        An event that carries an exception counts as "no result".
        """
        if self.has_result():
            return self.wait()
        return notready

    def wait(self):
        """Wait until another coroutine calls send.
        Returns the value the other coroutine passed to
        send.

        >>> from eventlib import coros, api
        >>> evt = coros.event()
        >>> def wait_on():
        ...    retval = evt.wait()
        ...    print "waited for", retval
        >>> _ = api.spawn(wait_on)
        >>> evt.send('result')
        >>> api.sleep(0)
        waited for result

        Returns immediately if the event has already
        occurred.

        >>> evt.wait()
        'result'
        """
        if self._result is NOT_USED:
            # Not triggered yet: register ourselves and hand control to the
            # hub.  _do_send() later resumes us with waiter.switch(result)
            # or waiter.throw(*exc), which becomes the outcome of switch().
            self._waiters[api.getcurrent()] = True
            try:
                return api.get_hub().switch()
            finally:
                # Always deregister, however we were resumed, so that
                # _do_send() skips coroutines that are no longer waiting.
                self._waiters.pop(api.getcurrent(), None)
        if self._exc is not None:
            # Already triggered with an exception: raise it in the caller.
            api.getcurrent().throw(*self._exc)
        return self._result

    def send(self, result=None, exc=None):
        """Makes arrangements for the waiters to be woken with the
        result and then returns immediately to the parent.

        `exc`, when given, is either a single object or a tuple of arguments
        as accepted by greenlet.throw; a non-tuple is wrapped in a 1-tuple.

        >>> from eventlib import coros, api
        >>> evt = coros.event()
        >>> def waiter():
        ...     print 'about to wait'
        ...     result = evt.wait()
        ...     print 'waited for', result
        >>> _ = api.spawn(waiter)
        >>> api.sleep(0)
        about to wait
        >>> evt.send('a')
        >>> api.sleep(0)
        waited for a

        It is an error to call send() multiple times on the same event.

        >>> evt.send('whoops')
        Traceback (most recent call last):
        ...
        AssertionError: Trying to re-send() an already-triggered event.

        Use reset() between send()s to reuse an event object.
        """
        assert self._result is NOT_USED, 'Trying to re-send() an already-triggered event.'
        self._result = result
        if exc is not None and not isinstance(exc, tuple):
            exc = (exc, )
        self._exc = exc
        hub = api.get_hub()
        if self._waiters:
            # Hand _do_send() a real list snapshot of the current waiters.
            # It consumes the sequence with list.pop(); a Python 3 dict-keys
            # view has no pop(), while on Python 2 list(d) equals d.keys().
            hub.schedule_call_global(0, self._do_send, self._result,
                                     self._exc, list(self._waiters))

    def _do_send(self, result, exc, waiters):
        """Resume every coroutine in `waiters` that is still waiting.

        `waiters` is a list snapshot taken by send() and is emptied here.
        Coroutines that have left wait() in the meantime are no longer keys
        of self._waiters and are skipped.
        """
        while waiters:
            waiter = waiters.pop()
            if waiter in self._waiters:
                if exc is None:
                    waiter.switch(result)
                else:
                    waiter.throw(*exc)

    def send_exception(self, *args):
        """Trigger the event with an exception instead of a result."""
        # the arguments are the same as for greenlet.throw
        return self.send(None, args)
class Semaphore(object):
    """An unbounded semaphore.
    Optionally initialize with a resource count, then acquire() and release()
    resources as needed. Attempting to acquire() when count is zero suspends
    the calling coroutine until count becomes nonzero again.
    """

    def __init__(self, count=0):
        # Number of resources currently available.
        self.counter = count
        # Coroutines suspended in acquire(); only the keys are used.
        self._waiters = {}

    def __str__(self):
        return '<%s at %s counter=%r>' % (
            self.__class__.__name__, hex(id(self)), self.counter)

    def locked(self):
        """Return True when no resource is available right now."""
        return self.counter <= 0

    def bounded(self):
        """Always False: an unbounded semaphore never blocks release()."""
        # for consistency with BoundedSemaphore
        return False

    def acquire(self, blocking=True):
        """Take one resource and return True.

        With blocking=False, return False at once if nothing is available.
        Otherwise suspend the calling coroutine until a resource is free.
        """
        if not blocking:
            if self.locked():
                return False
        while self.counter <= 0:
            me = api.getcurrent()
            self._waiters[me] = None
            try:
                api.get_hub().switch()
            finally:
                # Deregister however we were resumed (wake-up or exception).
                self._waiters.pop(me, None)
        self.counter -= 1
        return True

    def __enter__(self):
        # Note: returns None, so ``with sem as x`` binds x to None.
        self.acquire()

    def release(self, blocking=True):
        """Give one resource back and return True.

        If coroutines are waiting, one wake-up is scheduled on the hub.
        """
        # `blocking' parameter is for consistency with BoundedSemaphore and is ignored
        self.counter += 1
        if self._waiters:
            api.get_hub().schedule_call_global(0, self._do_acquire)
        return True

    def _do_acquire(self):
        """Hub callback: resume one waiter if a resource is still available."""
        if not (self._waiters and self.counter > 0):
            return
        # popitem() picks an arbitrary waiter; the stored value is unused.
        woken = self._waiters.popitem()[0]
        woken.switch()

    def __exit__(self, typ, val, tb):
        self.release()
class BoundedSemaphore(object):
    """A bounded semaphore.
    Optionally initialize with a resource count, then acquire() and release()
    resources as needed. Attempting to acquire() when count is zero suspends
    the calling coroutine until count becomes nonzero again.  Attempting to
    release() after count has reached limit suspends the calling coroutine until
    count becomes less than limit again.
    """

    def __init__(self, count, limit):
        if count > limit:
            # accidentally, this also catches the case when limit is None
            # (true on Python 2, where an int compares greater than None)
            raise ValueError("'count' cannot be more than 'limit'")
        # lower_bound counts available resources; upper_bound counts the
        # free room left below `limit`.  Each operation releases one side
        # and acquires the other.
        self.lower_bound = Semaphore(count)
        self.upper_bound = Semaphore(limit - count)

    def __str__(self):
        return '<%s at %s %r/%r>' % (
            self.__class__.__name__,
            hex(id(self)),
            self.lower_bound.counter,
            self.upper_bound.counter,
        )

    def locked(self):
        """Return True when acquire() would have to wait."""
        return self.lower_bound.locked()

    def bounded(self):
        """Return True when release() would have to wait."""
        return self.upper_bound.locked()

    def acquire(self, blocking=True):
        """Take one resource, suspending the caller while none is available.

        With blocking=False, return False at once if the semaphore is locked.
        """
        if not blocking:
            if self.locked():
                return False
        # Free one slot of room first, then take the resource.
        self.upper_bound.release()
        try:
            got = self.lower_bound.acquire()
        except:
            # Undo the release above on any failure, then re-raise.
            self.upper_bound.counter -= 1
            # using counter directly means that it can be less than zero.
            # however I certainly don't need to wait here and I don't seem to have
            # a need to care about such inconsistency
            raise
        return got

    def __enter__(self):
        # Note: returns None, so ``with sem as x`` binds x to None.
        self.acquire()

    def release(self, blocking=True):
        """Give one resource back, suspending the caller while at the limit.

        With blocking=False, return False at once if the limit is reached.
        """
        if not blocking:
            if self.bounded():
                return False
        # Mirror image of acquire(): publish the resource, then take room.
        self.lower_bound.release()
        try:
            got = self.upper_bound.acquire()
        except:
            # Undo the release above on any failure, then re-raise.
            self.lower_bound.counter -= 1
            raise
        return got

    def __exit__(self, typ, val, tb):
        self.release()

    @property
    def balance(self):
        """Available resources minus remaining room below the limit."""
        available = self.lower_bound.counter
        room = self.upper_bound.counter
        return available - room
def semaphore(count=0, limit=None):
    """Factory: return a Semaphore, or a BoundedSemaphore if `limit` is given.

    `count` is the initial resource count; `limit` (when not None) is the
    upper bound passed to BoundedSemaphore.
    """
    if limit is not None:
        return BoundedSemaphore(count, limit)
    return Semaphore(count)
class metaphore(object):
    """This is sort of an inverse semaphore: a counter that starts at 0 and
    waits only if nonzero. It's used to implement a "wait for all" scenario.

    >>> from eventlib import api, coros
    >>> count = coros.metaphore()
    >>> count.wait()
    >>> def decrementer(count, id):
    ...     print "%s decrementing" % id
    ...     count.dec()
    ...
    >>> _ = api.spawn(decrementer, count, 'A')
    >>> _ = api.spawn(decrementer, count, 'B')
    >>> count.inc(2)
    >>> count.wait()
    A decrementing
    B decrementing
    """

    def __init__(self):
        self.counter = 0
        # Invariant kept by inc()/dec(): the event is triggered exactly
        # while counter == 0.
        self.event = event()
        # send() right away, else we'd wait on the default 0 count!
        self.event.send()

    def inc(self, by=1):
        """Increment our counter. If this transitions the counter from zero to
        nonzero, make any subsequent wait() call wait.
        """
        assert by > 0
        self.counter += by
        if self.counter == by:
            # If we just incremented self.counter by 'by', and the new count
            # equals 'by', then the old value of self.counter was 0.
            # Transitioning from 0 to a nonzero value means wait() must
            # actually wait.
            self.event.reset()

    def dec(self, by=1):
        """Decrement our counter. If this transitions the counter from nonzero
        to zero, a current or subsequent wait() call need no longer wait.

        Decrementing below zero clamps the counter at zero.  Calling dec()
        while the counter is already zero is a no-op.
        """
        assert by > 0
        self.counter -= by
        if self.counter <= 0:
            # Don't leave self.counter < 0, that will screw things up in
            # future calls.
            self.counter = 0
            # Transitioning from nonzero to 0 means wait() need no longer wait.
            # If the counter was already 0 the event is already triggered,
            # and a second send() would trip its re-send assertion.
            if not self.event.ready():
                self.event.send()

    def wait(self):
        """Suspend the caller only if our count is nonzero. In that case,
        resume the caller once the count decrements to zero again.
        """
        self.event.wait()
def execute(func, *args, **kw):
    """ Executes an operation asynchronously in a new coroutine, returning
    an event to retrieve the return value.

    This has the same api as the CoroutinePool.execute method; the only
    difference is that this one creates a new coroutine instead of drawing
    from a pool.

    >>> from eventlib import coros
    >>> evt = coros.execute(lambda a: ('foo', a), 1)
    >>> evt.wait()
    ('foo', 1)
    """
    evt = event()

    def _really_execute():
        # NOTE(review): if func raises, send() is never reached, so anyone
        # blocked in evt.wait() is not woken by this function.
        outcome = func(*args, **kw)
        evt.send(outcome)

    api.spawn(_really_execute)
    return evt
def CoroutinePool(*args, **kwargs):
    """Create and return an eventlib.pool.Pool, forwarding all arguments.

    The import happens at call time rather than at module import time
    (presumably to avoid a circular import with eventlib.pool -- confirm).
    """
    from eventlib.pool import Pool as pool_class
    return pool_class(*args, **kwargs)
class queue(object):
    """Cross-coroutine queue, using semaphore to synchronize.
    The API is like a generalization of event to be able to hold more than one
    item at a time (without reset() or cancel()).

    >>> from eventlib import coros
    >>> q = coros.queue(max_size=2)
    >>> def putter(q):
    ...     q.send("first")
    ...
    >>> _ = api.spawn(putter, q)
    >>> q.ready()
    False
    >>> q.wait()
    'first'
    >>> q.ready()
    False
    >>> q.send("second")
    >>> q.ready()
    True
    >>> q.send("third")
    >>> def getter(q):
    ...     print q.wait()
    ...
    >>> _ = api.spawn(getter, q)
    >>> q.send("fourth")
    second
    """

    def __init__(self, max_size=None):
        """If you omit max_size, the queue will attempt to store an unlimited
        number of items.
        Specifying max_size means that when the queue already contains
        max_size items, an attempt to send() one more item will suspend the
        calling coroutine until someone else retrieves one.
        """
        # Pending (result, exc) pairs, oldest first.
        self.items = collections.deque()
        # Counts queued items; bounded by max_size when one is given.
        self.sem = semaphore(count=0, limit=max_size)

    def __nonzero__(self):
        return bool(self.items)

    def __len__(self):
        return len(self.items)

    def __str__(self):
        return '<%s at %s sem=%s items[%d]>' % (
            self.__class__.__name__, hex(id(self)), self.sem, len(self.items))

    def send(self, result=None, exc=None):
        """If you send(exc=SomeExceptionClass), the corresponding wait() call
        will raise that exception.
        Otherwise, the corresponding wait() will return result (default None).
        """
        # Normalise a single exception object into a 1-tuple of throw() args.
        if not (exc is None or isinstance(exc, tuple)):
            exc = (exc, )
        entry = (result, exc)
        self.items.append(entry)
        self.sem.release()

    def send_exception(self, *args):
        """Queue an exception to be raised by the corresponding wait()."""
        # the arguments are the same as for greenlet.throw
        return self.send(exc=args)

    def wait(self):
        """Wait for an item sent by a send() call, in FIFO order.
        If the corresponding send() specifies exc=SomeExceptionClass, this
        wait() will raise that exception.
        Otherwise, this wait() will return the corresponding send() call's
        result= parameter.
        """
        self.sem.acquire()
        value, throw_args = self.items.popleft()
        if throw_args is not None:
            api.getcurrent().throw(*throw_args)
        return value

    def ready(self):
        """Return True when at least one item is queued."""
        # could also base this on self.sem.counter...
        return bool(self.items)
class Actor(object):
    """ A free-running coroutine that accepts and processes messages.

    Kind of the equivalent of an Erlang process, really.  It processes
    a queue of messages in the order that they were sent.  You must
    subclass this and implement your own version of received().

    The actor's reference count will never drop to zero while the
    coroutine exists; if you lose all references to the actor object
    it will never be freed.
    """
    def __init__(self, concurrency = 1):
        """ Constructs an Actor, kicking off a new coroutine to process the messages.

        The concurrency argument specifies how many messages the actor will try
        to process concurrently.  If it is 1, the actor will process messages
        serially.
        """
        # Messages not yet handed to the pool, oldest first.
        self._mailbox = collections.deque()
        # Triggered by cast() to wake run_forever() when the mailbox was empty.
        self._event = event()
        # The coroutine running run_forever(); the doctest in received()
        # passes it to api.kill() for cleanup.
        # NOTE(review): _pool is assigned after this spawn, so this relies on
        # api.spawn() not running run_forever() up to its first use of
        # self._pool before the next line executes -- confirm.
        self._killer = api.spawn(self.run_forever)
        # Pool that runs received(); at most `concurrency` at a time.
        self._pool = CoroutinePool(min_size=0, max_size=concurrency)

    def run_forever(self):
        """ Loops forever, continually checking the mailbox. """
        while True:
            if not self._mailbox:
                # Nothing to do: block until cast() triggers the event, then
                # install a fresh event for the next idle period.
                self._event.wait()
                self._event = event()
            else:
                # leave the message in the mailbox until after it's
                # been processed so the event doesn't get triggered
                # while in the received method
                self._pool.execute_async(self.received, self._mailbox[0])
                self._mailbox.popleft()

    def cast(self, message):
        """ Send a message to the actor.

        If the actor is busy, the message will be enqueued for later
        consumption.  There is no return value.

        >>> a = Actor()
        >>> a.received = lambda msg: msg
        >>> a.cast("hello")
        """
        self._mailbox.append(message)
        # if this is the only message, the coro could be waiting
        if len(self._mailbox) == 1:
            self._event.send()

    def received(self, message):
        """ Called to process each incoming message.

        The default implementation just raises an exception, so
        replace it with something useful!

        >>> class Greeter(Actor):
        ...     def received(self, (message, evt) ):
        ...         print "received", message
        ...         if evt: evt.send()
        ...
        >>> a = Greeter()

        This example uses events to synchronize between the actor and the main
        coroutine in a predictable manner, but this kinda defeats the point of
        the Actor, so don't do it in a real application.

        >>> evt = event()
        >>> a.cast( ("message 1", evt) )
        >>> evt.wait()  # force it to run at this exact moment
        received message 1
        >>> evt.reset()
        >>> a.cast( ("message 2", None) )
        >>> a.cast( ("message 3", evt) )
        >>> evt.wait()
        received message 2
        received message 3

        >>> api.kill(a._killer)   # test cleanup
        """
        raise NotImplementedError()
def _test():
    """Run this module's doctests via doctest.testmod().

    NOTE: several doctests in this module use Python 2 print statements, so
    the doctests themselves are expected to pass only on Python 2.
    """
    # Parenthesised single-argument form: prints the same text as a Python 2
    # print statement and is also valid as a Python 3 print() call, so the
    # module no longer fails to compile on Python 3.
    print("Running doctests. There will be no further output if they succeed.")
    import doctest
    doctest.testmod()
# Run the doctests when this file is executed directly as a script.
if __name__ == "__main__":
    _test()
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Sat, Nov 23, 7:32 AM (1 d, 18 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3409023
Default Alt Text
coros.py (18 KB)
Attached To
Mode
rPYEVENTLIB python3-eventlib
Attached
Detach File
Event Timeline
Log In to Comment