API reference¶
Synchronization primitives¶
- class tricycle.RWLock(*, read_biased: bool = False)¶
-
Each acquisition of the lock specifies whether it is a “reader” or a “writer”. At any given time, the lock may be held by one writer and no readers, by many readers and no writer, or by no one.
This implementation is fair by default: if task A tried to acquire the lock before task B did, task B won’t get it first. This implies that new readers can’t acquire a reader-held lock after a writer has started waiting to acquire it, which helps avoid starvation of writers by readers. (The Wikipedia article linked above calls this “write-preferring”.) If you want different behavior, see the
read_biased
attribute.- read_biased¶
Whether new readers should be able to immediately acquire a readers-held lock even after some writers have started waiting for it. (The Wikipedia article linked above calls this “weakly read-preferring”.) Note that setting
read_biased
toTrue
can result in indefinite starvation of writers if the read workload is busy enough. Changing this attribute toTrue
will immediately wake up all waiting readers to grant them the lock if it is currently readers-held with writers waiting.- Type:
- await acquire(*, for_write: bool) None ¶
Acquire the lock, blocking if necessary.
- Parameters:
for_write – If True, acquire the lock in write mode, which provides exclusive access. If False, acquire the lock in read mode, which permits other readers to also hold it.
- Raises:
RuntimeError – if the current task already holds the lock (in either read or write mode)
- acquire_nowait(*, for_write: bool) None ¶
Attempt to acquire the lock, without blocking.
- Parameters:
for_write – If True, attempt to acquire the lock in write mode, which provides exclusive access. If False, attempt to acquire the lock in read mode, which permits other readers to also hold it.
- Raises:
trio.WouldBlock – if the lock cannot be acquired without blocking
RuntimeError – if the current task already holds the lock (in either read or write mode)
- release() None ¶
Release the lock.
- Raises:
RuntimeError – if the current task does not hold the lock (in either read or write mode)
- async with read_locked() AsyncIterator[None] ¶
Returns an async context manager whose
__aenter__
blocks to acquire the lock in read mode, and whose__aexit__
synchronously releases it.
- async with write_locked() AsyncIterator[None] ¶
Returns an async context manager whose
__aenter__
blocks to acquire the lock in write mode, and whose__aexit__
synchronously releases it.
- locked() str ¶
Check whether the lock is currently held.
- Returns:
"read"
if the lock is held by reader(s),"write"
if the lock is held by a writer, or""
(which tests as false) if the lock is not held.
- statistics() _RWLockStatistics ¶
Return an object containing debugging information.
Currently the following fields are defined:
locked
: boolean indicating whether the lock is held by anyonestate
: string with one of the values"read"
(held by one or more readers),"write"
(held by one writer), or"unlocked"
(held by no one)readers
: a frozenset of theTask
s currently holding the lock in read mode (may be empty)writer
: thetrio.lowlevel.Task
currently holding the lock in write mode, or None if the lock is not held in write modereaders_waiting
: the number of tasks blocked waiting to acquire the lock in read modewriters_waiting
: the number of tasks blocked waiting to acquire the lock in write mode
Stream helpers¶
tricycle comes with two wrappers around Trio receive streams:
BufferedReceiveStream
, which helps in parsing binary protocols that
use fixed-length fields, and TextReceiveStream
, which helps in
parsing line-oriented textual data.
- class tricycle.BufferedReceiveStream(transport_stream: ReceiveStream, chunk_size: int = 4096)¶
Bases:
AsyncResource
Wraps a
ReceiveStream
with buffering capabilities, so you can receive known amounts of data at a time.- await receive(num_bytes: int) bytes ¶
Receive and return
num_bytes
bytes, or fewer if EOF is encountered.- Parameters:
num_bytes (int) – The number of bytes to return. Must be greater than zero.
- Returns:
bytes or bytearray – The data received, exactly
num_bytes
bytes unless EOF is encountered. If there is no data left to return before EOF, returns an empty bytestring (b""
).- Raises:
Exception – Anything raised by the
receive_some()
method of the underlying transport stream.
- await receive_all_or_none(num_bytes: int) bytes | None ¶
Receive and return exactly
num_bytes
bytes, orNone
if EOF is encountered before receiving any bytes.- Parameters:
num_bytes (int) – The number of bytes to return. Must be greater than zero.
- Returns:
bytes or None – The data received, exactly
num_bytes
bytes; unless EOF is encountered before reading any data, in which case we returnNone
.- Raises:
ValueError – if EOF is encountered after reading at least one byte but before reading
num_bytes
bytes.
- await receive_exactly(num_bytes: int) bytes ¶
Receive and return exactly
num_bytes
bytes, throwing an exception if EOF is encountered before then.- Parameters:
num_bytes (int) – The number of bytes to return. Must be greater than zero.
- Returns:
bytes – The data received, exactly
num_bytes
bytes.- Raises:
ValueError – if EOF is encountered before reading
num_bytes
bytes.
- class tricycle.TextReceiveStream(transport_stream: ReceiveStream, encoding: str | None = None, *, errors: str | None = None, newline: str | None = '', chunk_size: int = 8192)¶
Bases:
AsyncResource
Wraps a
ReceiveStream
with buffering and decoding capabilities for receiving line-oriented text.See
io.TextIOWrapper
for more documentation on theencoding
,errors
, andnewline
arguments.- Parameters:
transport_stream (ReceiveStream) – The stream to receive data on.
encoding (str) – The encoding with which to decode received data. If none is specified, we use the value returned by
locale.getpreferredencoding()
.errors (str) – Controls how to respond to decoding errors; common values include
"strict"
(throw an exception),"ignore"
(drop the bad character), or"replace"
(replace the bad character with a replacement marker). The default ofNone
is equivalent to"strict"
.newline (str) – Controls how line endings are handled. Use
None
to convert any newline format to"\n"
,""
to accept any newline format and pass it through unchanged, or"\r"
,"\n"
, or"\r\n"
to only accept that sequence as a newline.chunk_size (int) – The number of bytes to request in each call to the underlying transport stream’s
receive_some()
method.
- transport_stream¶
- encoding¶
- errors¶
- chunk_size¶
The values passed as constructor parameters are also available as attributes on the resulting
TextReceiveStream
object.errors
andchunk_size
are writable; the others are read-only. (For example, if a read fails with aUnicodeDecodeError
, it is safe to setstream.errors = "replace"
and retry the read.)
- property newlines: str | Tuple[str, ...] | None¶
The newline sequences that have actually been observed in the input.
If no newline sequences have been observed, or if you specified a particular
newline
type when constructing this stream, this attribute isNone
. Otherwise, it is a single string or a tuple of strings drawn from the set{"\r", "\n", "\r\n"}
.
- await receive_line(max_chars: int = -1) str ¶
Receive and decode data on this stream until
max_chars
have been received or a newline or end-of-file is encountered. The meaning of “newline” depends on thenewline
argument passed at construction time.- Parameters:
max_chars (int) – The maximum number of characters to return if no newline sequence is received. If negative, read until newline or EOF.
- Returns:
str – The line received. It always ends with a newline unless we reached
max_chars
or EOF. If there is no data left to return before EOF, returns an empty string (""
).- Raises:
UnicodeDecodeError – if the received data can’t be decoded
Anything else – that was raised by the underlying transport stream’s
receive_some()
method.
Cancellation helpers¶
Gracefully shutting down a complex task tree can sometimes require tasks to be cancelled in a particular order. As a motivating example, we’ll consider a simple protocol implementation where the client and server exchange newline-terminated textual messages, and the client is supposed to send a message containing the text “goodbye” before it disconnects:
async def receive_messages(
source: trio.abc.ReceiveStream, sink: trio.abc.SendChannel[str]
) -> None:
async for line in TextReceiveStream(source, newline="\r\n"):
await sink.send(line.rstrip("\r\n"))
await sink.aclose()
async def send_messages(
source: trio.abc.ReceiveChannel[str], sink: trio.abc.HalfCloseableStream
) -> None:
async with source:
async for msg in source:
await sink.send_all(msg.encode("utf-8") + b"\r\n")
await sink.send_eof()
@asynccontextmanager
async def wrap_stream(
stream: trio.abc.HalfCloseableStream
) -> AsyncIterator[trio.abc.ReceiveChannel[str], trio.abc.SendChannel[str]]:
async with trio.open_nursery() as nursery:
incoming_w, incoming_r = trio.open_memory_channel[str](0)
outgoing_w, outgoing_r = trio.open_memory_channel[str](0)
nursery.start_soon(receive_messages, stream, incoming_w)
nursery.start_soon(send_messages, outgoing_r, stream)
try:
yield (incoming_r, outgoing_w)
finally:
with trio.move_on_after(1) as scope:
scope.shield = True
await outgoing_w.send("goodbye")
async def example() -> None:
with trio.move_on_after(5):
async with trio.open_tcp_stream("example.com", 1234) as stream, \
wrap_stream(stream) as (incoming, outgoing):
async for line in incoming:
await outgoing.send("you said: " + line)
if line == "quit":
break
The intent is that example()
will echo back each message it receives,
until either it receives a “quit” message or five seconds have elapsed.
wrap_stream()
has carefully set up a shielded cancel scope around
the place where it sends the goodbye message, so that the message can
still be sent if the async with wrap_stream(...)
block is
cancelled. (Without this shield, the call to send()
would
immediately raise Cancelled
without sending anything.)
If you run this, though, you’ll find that it doesn’t quite work.
Exiting due to a “quit” will send the goodbye, but exiting on a
cancellation won’t. In fact, the cancellation case will probably
crash with a BrokenResourceError
when it tries to send
the goodbye. Why is this?
The problem is that the call to send()
isn’t sufficient on its own to
cause the message to be transmitted. It only places the message into a
channel; nothing will actually be sent until the send_messages()
task
reads from that channel and passes some bytes to send_all()
.
Before that can happen, send_messages()
will itself have been cancelled.
The pattern in this example is a common one: some work running in the body
of a nursery is reliant on services provided by background tasks in that
nursery. A normal Trio nursery doesn’t draw any distinctions between the
body of the async with
and the background tasks; if the nursery is
cancelled, everything in it will receive that cancellation immediately.
In this case, though, it seems that all of our troubles would be resolved
if only we could somehow ensure that those background tasks stay running
until the body of the async with
has completed.
tricycle’s service nursery does exactly this.
- async with tricycle.open_service_nursery() AsyncIterator[Nursery] ¶
Provides a nursery augmented with a cancellation ordering constraint.
If an entire service nursery becomes cancelled, either due to an exception raised by some task in the nursery or due to the cancellation of a scope that surrounds the nursery, the body of the nursery
async with
block will receive the cancellation first, and no other tasks in the nursery will be cancelled until the body of theasync with
block has been exited.This is intended to support the common pattern where the body of the
async with
block uses some service that the other task(s) in the nursery provide. For example, if you have:async with open_websocket(host, port) as conn: await communicate_with_websocket(conn)
where
open_websocket()
enters a nursery and spawns some tasks into that nursery to manage the connection, you probably wantconn
to remain usable in anyfinally
or__aexit__
blocks incommunicate_with_websocket()
. With a regular nursery, this is not guaranteed; with a service nursery, it is. An example hinting at general usage:@asynccontextmanager async def open_websocket(host, port): async with open_service_nursery() as nursery: try: # ... make some child tasks ... yield connection finally: # The yield body is already cancelled, and # child tasks are still available here for cleanup... pass
Now, anything in the body of the
open_websocket()
context, includingcommunicate_with_websocket()
, will be given first opportunity to cancel gracefully. Subsequently, thefinally
block in theopen_websocket()
implementation runs, and tasks spawned within thetry
body are still available during cleanup.Note that child tasks spawned using
start()
gain their protection from premature cancellation only at the point of their call totask_status.started()
.
If you need to do manipulations of this sort yourself, it can be helpful to be able to treat multiple cancel scopes as a single unit.
- class tricycle.MultiCancelScope(*, shield: bool = False, cancel_called: bool = False)¶
Manages a dynamic set of
trio.CancelScope
s that can be shielded and cancelled as a unit.New cancel scopes are added to the managed set using
open_child()
, which returns the child scope so you can enter it with awith
statement. Calls tocancel()
and changes toshield
apply to all existing children and set the initial state for future children. Each child scope has its owndeadline
andshield
attributes; changes to these do not modify the parent.There is no
cancelled_caught
attribute onMultiCancelScope
because it would be ambiguous; some of the child scopes might exit via atrio.Cancelled
exception and others not. Look at the childtrio.CancelScope
if you want to see whether it was cancelled or not.- open_child(*, shield: bool | None = None) CancelScope ¶
Return a new child cancel scope.
The child will start out cancelled if the parent
cancel()
method has been called. Its initial shield state is given by theshield
argument, or by the parent’sshield
attribute if theshield
argument is not specified.
- cancel() None ¶
Cancel all child cancel scopes.
Additional children created after a call to
cancel()
will start out in the cancelled state.
- shield¶
The overall shielding state for this
MultiCancelScope
.Setting this attribute sets the
shield
attribute of all children, as well as the default initial shielding for future children. Individual children may modify their shield state to be different from the parent value, but further changes to the parentMultiCancelScope.shield
will override their local choice.
Scoped objects¶
Trio follows the principles of structured concurrency:
its general-purpose APIs for spawning background tasks all require that
the lifetime of each task be bounded by an async with
block
in its parent (represented by the nursery
object).
Sometimes this can seem rather inconvenient; for example, what if you want
to create a class whose instances spawn tasks that live for the lifetime of
the instance? The traditional approach goes something like this:
class WebsocketConnection:
def __init__(self, nursery: trio.Nursery, **etc):
self._nursery = nursery
# initialize other members from **etc
async def connect(self):
await foo() # can't be in __init__ because __init__ is synchronous
self._nursery.start_soon(self._manage_connection)
@asynccontextmanager
async def open_websocket_connection(**etc) -> AsyncIterator[WebsocketConnection]:
async with open_service_nursery() as nursery:
conn = WebsocketConnection(nursery, **etc)
await conn.connect()
yield conn
nursery.cancel_scope.cancel()
async def use_websocket():
async with open_websocket_connection(**etc) as conn:
await conn.send("Hi!")
tricycle improves on this by providing the ability to define scoped objects,
which can only be instantiated as part of an async with
block.
In addition to the usual synchronous __init__
method, their class can
define async methods called __open__
and/or __close__
which run at the
start and end (respectively) of the async with
block. For greater expressive
power, it is also possible to define a __wrap__
method which returns the
entire async context manager to use.
- class tricycle.ScopedObject(*args: Any, **kwds: Any)¶
An object whose lifetime must be bound to an
async with
block.Suppose that
Foo
is aScopedObject
subclass. Then if you sayFoo(*args)
, you won’t actually get aFoo
object; instead, you’ll get an async context manager that evaluates to aFoo
object. So you would need to say:async with Foo(*args) as my_foo: # do stuff with my_foo
This allows
Foo
to have reliable control of its lifetime, so it can spawn background tasks, deterministically execute cleanup code, and so on.If you want to implement such an object, inherit from
ScopedObject
and indicate what should happen on entry and exit of the context. This should be done in one of the following two ways:Define async
__open__
and/or__close__
methods, which will be called from the context__aenter__
and__aexit__
respectively, taking no arguments and returningNone
.__close__
will be called no matter whether the context exits normally or due to an exception. (It can tell whether there is an active exception by usingsys.exc_info()
, but cannot suppress it.) If you use this approach,ScopedObject
takes care of invoking any initialization and finalization logic supplied by your base classes.Define a
__wrap__
method that returns an async context manager. This gives you more flexibility than implementing__open__
and__close__
, because you can run some code outside of your base classes’ scope and can swallow exceptions, but means you have to enter the base classes’ scope yourself.
It is an error to define both
__wrap__
and (__open__
or__close__
). If you don’t define__wrap__
,ScopedObject
generates it for you in terms of__open__
and__close__
, with semantics equivalent to the following:@asynccontextmanager async def __wrap__(self): async with super().__wrap__(): if hasattr(self, "__open__"): await self.__open__() try: yield finally: if hasattr(self, "__close__"): await self.__close__()
A subclass is provided to handle the common case where a nursery should be created and remain open for the lifetime of the object:
- class tricycle.BackgroundObject(*args: Any, **kwds: Any)¶
Bases:
ScopedObject
A
ScopedObject
that automatically creates aservice nursery
for running background tasks.If you pass
daemon=True
when inheriting fromBackgroundObject
, like so:class MyObject(BackgroundObject, daemon=True): ...
then the tasks spawned in the nursery will automatically be cancelled when the
async with MyObject(...) as obj:
block exits. Otherwise, the parent waits for the children to exit normally, like the default Trio nursery behavior.- nursery¶
The nursery that was created for this object. This attribute only exists within the scope of the object’s
async with
block, so it cannot be used from__init__
, nor after the block has been exited.
If made to use BackgroundObject
, the websocket example above
from above would reduce to:
class WebsocketConnection(BackgroundObject, daemon=True):
def __init__(self, **etc):
# initialize other members from **etc
async def __open__(self) -> None:
await foo()
self.nursery.start_soon(self._manage_connection)
async def use_websocket():
async with WebsocketConnection(**etc) as conn:
await conn.send("Hi!")
Tree variables¶
When you start a new Trio task, the initial values of its context variables
(contextvars.ContextVar
) are inherited from the environment of the
start_soon
or start
call that
started the new task. For example, this code:
some_cvar = contextvars.ContextVar()
async def print_in_child(tag):
print("In child", tag, "some_cvar has value", some_cvar.get())
some_cvar.set(1)
async with trio.open_nursery() as nursery:
nursery.start_soon(print_in_child, 1)
some_cvar.set(2)
nursery.start_soon(print_in_child, 2)
some_cvar.set(3)
print("In parent some_cvar has value", some_cvar.get())
will produce output like:
In parent some_cvar has value 3
In child 1 some_cvar has value 1
In child 2 some_cvar has value 2
(If you run it yourself, you might find that the “child 2” line comes before “child 1”, but it will still be the case that child 1 sees value 1 while child 2 sees value 2.)
You might wonder why this differs from the behavior of cancel scopes,
which only apply to a new task if they surround the new task’s entire
nursery (as explained in the Trio documentation about
child tasks and cancellation). The difference is that a cancel
scope has a limited lifetime (it can’t cancel anything once you exit
its with
block), while a context variable’s value is just a value
(request #42 can keep being request #42 for as long as it likes,
without any cooperation from the task that created it).
In specialized cases, you might want to provide a task-local value
that’s inherited only from the parent nursery, like cancel scopes are.
For example, maybe you’re trying to provide child tasks with access to
a limited-lifetime resource such as a nursery or network connection,
and you only want a task to be able to use the resource if it’s going
to remain available for the task’s entire lifetime. You can support
this use case using TreeVar
, which is like contextvars.ContextVar
except for the way that it’s inherited by new tasks. (It’s a “tree”
variable because it’s inherited along the parent-child links that form
the Trio task tree.)
If the above example used TreeVar
, then its output would be:
In parent some_cvar has value 3
In child 1 some_cvar has value 1
In child 2 some_cvar has value 1
because child 2 would inherit the value from its parent nursery, rather than
from the environment of the start_soon()
call that creates it.
- class tricycle.TreeVar(name[, *, default])¶
A “tree variable”: like a context variable except that its value in a new task is inherited from the new task’s parent nursery rather than from the new task’s spawner.
TreeVar
objects support all the same methods and attributes asContextVar
objects (get()
,set()
,reset()
, andname
), and they are constructed the same way. They also provide the additional methodsbeing()
andget_in()
, documented below.Accessing or changing the value of a
TreeVar
outside of a Trio task will raiseRuntimeError
. (Exception:get_in()
still works outside of a task, as long as you have a reference to the task or nursery of interest.)Note
TreeVar
values are not directly stored in thecontextvars.Context
, so you can’t useContext.get()
to access them. If you need the value in a context other than your own, useget_in()
.- with being(value: T) Iterator[None] ¶
Returns a context manager which sets the value of this
TreeVar
to value upon entry and restores its previous value upon exit.
- get_in(task_or_nursery: Task | Nursery) T ¶
- get_in(task_or_nursery: Task | Nursery, default: U) T | U
Gets the value of this
TreeVar
in the givenTask
orNursery
.The value in a task is the value that would be returned by a call to
get()
in that task. The value in a nursery is the value that would be returned byget()
at the beginning of a new child task started in that nursery. The default argument has the same semantics as it does forget()
.