API reference

Synchronization primitives

class tricycle.RWLock(*, read_biased: bool = False)

A readers-writer lock.

Each acquisition of the lock specifies whether it is a “reader” or a “writer”. At any given time, the lock may be held by one writer and no readers, by many readers and no writer, or by no one.

This implementation is fair by default: if task A tried to acquire the lock before task B did, task B won’t get it first. This implies that new readers can’t acquire a reader-held lock after a writer has started waiting to acquire it, which helps avoid starvation of writers by readers. (The Wikipedia article on readers-writer locks calls this “write-preferring”.) If you want different behavior, see the read_biased attribute.

read_biased

Whether new readers should be able to immediately acquire a readers-held lock even after some writers have started waiting for it. (The Wikipedia article on readers-writer locks calls this “weakly read-preferring”.) Note that setting read_biased to True can result in indefinite starvation of writers if the read workload is busy enough. Changing this attribute to True will immediately wake up all waiting readers to grant them the lock if it is currently readers-held with writers waiting.

Type:

bool

await acquire(*, for_write: bool) None

Acquire the lock, blocking if necessary.

Parameters:

for_write – If True, acquire the lock in write mode, which provides exclusive access. If False, acquire the lock in read mode, which permits other readers to also hold it.

Raises:

RuntimeError – if the current task already holds the lock (in either read or write mode)

await acquire_read() None

Equivalent to acquire(for_write=False).

await acquire_write() None

Equivalent to acquire(for_write=True).

acquire_nowait(*, for_write: bool) None

Attempt to acquire the lock, without blocking.

Parameters:

for_write – If True, attempt to acquire the lock in write mode, which provides exclusive access. If False, attempt to acquire the lock in read mode, which permits other readers to also hold it.

Raises:
  • trio.WouldBlock – if the lock cannot be acquired without blocking

  • RuntimeError – if the current task already holds the lock (in either read or write mode)

acquire_read_nowait() None

Equivalent to acquire_nowait(for_write=False).

acquire_write_nowait() None

Equivalent to acquire_nowait(for_write=True).

release() None

Release the lock.

Raises:

RuntimeError – if the current task does not hold the lock (in either read or write mode)

async with read_locked() AsyncIterator[None]

Returns an async context manager whose __aenter__ blocks to acquire the lock in read mode, and whose __aexit__ synchronously releases it.

async with write_locked() AsyncIterator[None]

Returns an async context manager whose __aenter__ blocks to acquire the lock in write mode, and whose __aexit__ synchronously releases it.

locked() str

Check whether the lock is currently held.

Returns:

"read" if the lock is held by reader(s), "write" if the lock is held by a writer, or "" (which tests as false) if the lock is not held.

statistics() _RWLockStatistics

Return an object containing debugging information.

Currently the following fields are defined:

  • locked: boolean indicating whether the lock is held by anyone

  • state: string with one of the values "read" (held by one or more readers), "write" (held by one writer), or "unlocked" (held by no one)

  • readers: a frozenset of the Tasks currently holding the lock in read mode (may be empty)

  • writer: the trio.lowlevel.Task currently holding the lock in write mode, or None if the lock is not held in write mode

  • readers_waiting: the number of tasks blocked waiting to acquire the lock in read mode

  • writers_waiting: the number of tasks blocked waiting to acquire the lock in write mode

Stream helpers

tricycle comes with two wrappers around Trio receive streams: BufferedReceiveStream, which helps in parsing binary protocols that use fixed-length fields, and TextReceiveStream, which helps in parsing line-oriented textual data.

class tricycle.BufferedReceiveStream(transport_stream: ReceiveStream, chunk_size: int = 4096)

Bases: AsyncResource

Wraps a ReceiveStream with buffering capabilities, so you can receive known amounts of data at a time.

await aclose() None

Discard all buffered data and close the underlying stream.

await receive(num_bytes: int) bytes

Receive and return num_bytes bytes, or fewer if EOF is encountered.

Parameters:

num_bytes (int) – The number of bytes to return. Must be greater than zero.

Returns:

bytes or bytearray – The data received, exactly num_bytes bytes unless EOF is encountered. If there is no data left to return before EOF, returns an empty bytestring (b"").

Raises:

Exception – Anything raised by the receive_some() method of the underlying transport stream.

await receive_all_or_none(num_bytes: int) bytes | None

Receive and return exactly num_bytes bytes, or None if EOF is encountered before receiving any bytes.

Parameters:

num_bytes (int) – The number of bytes to return. Must be greater than zero.

Returns:

bytes or None – The data received, exactly num_bytes bytes, or None if EOF is encountered before reading any data.

Raises:

ValueError – if EOF is encountered after reading at least one byte but before reading num_bytes bytes.

await receive_exactly(num_bytes: int) bytes

Receive and return exactly num_bytes bytes, throwing an exception if EOF is encountered before then.

Parameters:

num_bytes (int) – The number of bytes to return. Must be greater than zero.

Returns:

bytes – The data received, exactly num_bytes bytes.

Raises:

ValueError – if EOF is encountered before reading num_bytes bytes.

unget(data: bytes) None

Put the bytes in data back into the buffer, so they will be the next thing received by a call to one of the receive methods.

class tricycle.TextReceiveStream(transport_stream: ReceiveStream, encoding: str | None = None, *, errors: str | None = None, newline: str | None = '', chunk_size: int = 8192)

Bases: AsyncResource

Wraps a ReceiveStream with buffering and decoding capabilities for receiving line-oriented text.

See io.TextIOWrapper for more documentation on the encoding, errors, and newline arguments.

Parameters:
  • transport_stream (ReceiveStream) – The stream to receive data on.

  • encoding (str) – The encoding with which to decode received data. If none is specified, we use the value returned by locale.getpreferredencoding().

  • errors (str) – Controls how to respond to decoding errors; common values include "strict" (throw an exception), "ignore" (drop the bad character), or "replace" (replace the bad character with a replacement marker). The default of None is equivalent to "strict".

  • newline (str) – Controls how line endings are handled. Use None to convert any newline format to "\n", "" to accept any newline format and pass it through unchanged, or "\r", "\n", or "\r\n" to only accept that sequence as a newline.

  • chunk_size (int) – The number of bytes to request in each call to the underlying transport stream’s receive_some() method.

transport_stream
encoding
errors
chunk_size

The values passed as constructor parameters are also available as attributes on the resulting TextReceiveStream object. errors and chunk_size are writable; the others are read-only. (For example, if a read fails with a UnicodeDecodeError, it is safe to set stream.errors = "replace" and retry the read.)

await aclose() None

Discard all buffered data and close the underlying stream.

property newlines: str | Tuple[str, ...] | None

The newline sequences that have actually been observed in the input.

If no newline sequences have been observed, or if you specified a particular newline type when constructing this stream, this attribute is None. Otherwise, it is a single string or a tuple of strings drawn from the set {"\r", "\n", "\r\n"}.

await receive_line(max_chars: int = -1) str

Receive and decode data on this stream until max_chars characters have been received or a newline or end-of-file is encountered. The meaning of “newline” depends on the newline argument passed at construction time.

Parameters:

max_chars (int) – The maximum number of characters to return if no newline sequence is received. If negative, read until newline or EOF.

Returns:

str – The line received. It always ends with a newline unless we reached max_chars or EOF. If there is no data left to return before EOF, returns an empty string ("").

Raises:
  • UnicodeDecodeError – if the received data can’t be decoded

  • Anything else – that was raised by the underlying transport stream’s receive_some() method.

Cancellation helpers

Gracefully shutting down a complex task tree can sometimes require tasks to be cancelled in a particular order. As a motivating example, we’ll consider a simple protocol implementation where the client and server exchange newline-terminated textual messages, and the client is supposed to send a message containing the text “goodbye” before it disconnects:

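from contextlib import asynccontextmanager
from typing import AsyncIterator, Tuple

import trio
from tricycle import TextReceiveStream

async def receive_messages(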
    source: trio.abc.ReceiveStream, sink: trio.abc.SendChannel[str]
) -> None:
    async for line in TextReceiveStream(source, newline="\r\n"):
        await sink.send(line.rstrip("\r\n"))
    await sink.aclose()

async def send_messages(
    source: trio.abc.ReceiveChannel[str], sink: trio.abc.HalfCloseableStream
) -> None:
    async with source:
        async for msg in source:
            await sink.send_all(msg.encode("utf-8") + b"\r\n")
        await sink.send_eof()

@asynccontextmanager
async def wrap_stream(
    stream: trio.abc.HalfCloseableStream
) -> AsyncIterator[Tuple[trio.abc.ReceiveChannel[str], trio.abc.SendChannel[str]]]:
    async with trio.open_nursery() as nursery:
        incoming_w, incoming_r = trio.open_memory_channel[str](0)
        outgoing_w, outgoing_r = trio.open_memory_channel[str](0)
        nursery.start_soon(receive_messages, stream, incoming_w)
        nursery.start_soon(send_messages, outgoing_r, stream)
        try:
            yield (incoming_r, outgoing_w)
        finally:
            with trio.move_on_after(1) as scope:
                scope.shield = True
                await outgoing_w.send("goodbye")

async def example() -> None:
    with trio.move_on_after(5):
        async with trio.open_tcp_stream("example.com", 1234) as stream, \
                   wrap_stream(stream) as (incoming, outgoing):
            async for line in incoming:
                await outgoing.send("you said: " + line)
                if line == "quit":
                    break

The intent is that example() will echo back each message it receives, until either it receives a “quit” message or five seconds have elapsed. wrap_stream() has carefully set up a shielded cancel scope around the place where it sends the goodbye message, so that the message can still be sent if the async with wrap_stream(...) block is cancelled. (Without this shield, the call to send() would immediately raise Cancelled without sending anything.)

If you run this, though, you’ll find that it doesn’t quite work. Exiting due to a “quit” will send the goodbye, but exiting on a cancellation won’t. In fact, the cancellation case will probably crash with a BrokenResourceError when it tries to send the goodbye. Why is this?

The problem is that the call to send() isn’t sufficient on its own to cause the message to be transmitted. It only places the message into a channel; nothing will actually be sent until the send_messages() task reads from that channel and passes some bytes to send_all(). Before that can happen, send_messages() will itself have been cancelled.

The pattern in this example is a common one: some work running in the body of a nursery is reliant on services provided by background tasks in that nursery. A normal Trio nursery doesn’t draw any distinctions between the body of the async with and the background tasks; if the nursery is cancelled, everything in it will receive that cancellation immediately. In this case, though, it seems that all of our troubles would be resolved if only we could somehow ensure that those background tasks stay running until the body of the async with has completed.

tricycle’s service nursery does exactly this.

async with tricycle.open_service_nursery() AsyncIterator[Nursery]

Provides a nursery augmented with a cancellation ordering constraint.

If an entire service nursery becomes cancelled, either due to an exception raised by some task in the nursery or due to the cancellation of a scope that surrounds the nursery, the body of the nursery async with block will receive the cancellation first, and no other tasks in the nursery will be cancelled until the body of the async with block has been exited.

This is intended to support the common pattern where the body of the async with block uses some service that the other task(s) in the nursery provide. For example, if you have:

async with open_websocket(host, port) as conn:
    await communicate_with_websocket(conn)

where open_websocket() enters a nursery and spawns some tasks into that nursery to manage the connection, you probably want conn to remain usable in any finally or __aexit__ blocks in communicate_with_websocket(). With a regular nursery, this is not guaranteed; with a service nursery, it is. An example hinting at general usage:

@asynccontextmanager
async def open_websocket(host, port):
    async with open_service_nursery() as nursery:
        try:
            # ... make some child tasks ...
            yield connection
        finally:
            # The yield body is already cancelled, and
            # child tasks are still available here for cleanup...
            pass

Now, anything in the body of the open_websocket() context, including communicate_with_websocket(), will be given first opportunity to cancel gracefully. Subsequently, the finally block in the open_websocket() implementation runs, and tasks spawned within the try body are still available during cleanup.

Note that child tasks spawned using start() gain their protection from premature cancellation only at the point of their call to task_status.started().

If you need to do manipulations of this sort yourself, it can be helpful to be able to treat multiple cancel scopes as a single unit.

class tricycle.MultiCancelScope(*, shield: bool = False, cancel_called: bool = False)

Manages a dynamic set of trio.CancelScopes that can be shielded and cancelled as a unit.

New cancel scopes are added to the managed set using open_child(), which returns the child scope so you can enter it with a with statement. Calls to cancel() and changes to shield apply to all existing children and set the initial state for future children. Each child scope has its own deadline and shield attributes; changes to these do not modify the parent.

There is no cancelled_caught attribute on MultiCancelScope because it would be ambiguous; some of the child scopes might exit via a trio.Cancelled exception and others not. Look at the child trio.CancelScope if you want to see whether it was cancelled or not.

open_child(*, shield: bool | None = None) CancelScope

Return a new child cancel scope.

The child will start out cancelled if the parent cancel() method has been called. Its initial shield state is given by the shield argument, or by the parent’s shield attribute if the shield argument is not specified.

cancel() None

Cancel all child cancel scopes.

Additional children created after a call to cancel() will start out in the cancelled state.

shield

The overall shielding state for this MultiCancelScope.

Setting this attribute sets the shield attribute of all children, as well as the default initial shielding for future children. Individual children may modify their shield state to be different from the parent value, but further changes to the parent MultiCancelScope.shield will override their local choice.

cancel_called

Returns true if cancel() has been called.

Scoped objects

Trio follows the principles of structured concurrency: its general-purpose APIs for spawning background tasks all require that the lifetime of each task be bounded by an async with block in its parent (represented by the nursery object). Sometimes this can seem rather inconvenient; for example, what if you want to create a class whose instances spawn tasks that live for the lifetime of the instance? The traditional approach goes something like this:

class WebsocketConnection:
    def __init__(self, nursery: trio.Nursery, **etc):
        self._nursery = nursery
        # initialize other members from **etc

    async def connect(self):
        await foo()  # can't be in __init__ because __init__ is synchronous
        self._nursery.start_soon(self._manage_connection)

@asynccontextmanager
async def open_websocket_connection(**etc) -> AsyncIterator[WebsocketConnection]:
    async with open_service_nursery() as nursery:
        conn = WebsocketConnection(nursery, **etc)
        await conn.connect()
        yield conn
        nursery.cancel_scope.cancel()

async def use_websocket():
    async with open_websocket_connection(**etc) as conn:
        await conn.send("Hi!")

tricycle improves on this by providing the ability to define scoped objects, which can only be instantiated as part of an async with block. In addition to the usual synchronous __init__ method, their class can define async methods called __open__ and/or __close__ which run at the start and end (respectively) of the async with block. For greater expressive power, it is also possible to define a __wrap__ method which returns the entire async context manager to use.

class tricycle.ScopedObject(*args: Any, **kwds: Any)

An object whose lifetime must be bound to an async with block.

Suppose that Foo is a ScopedObject subclass. Then if you say Foo(*args), you won’t actually get a Foo object; instead, you’ll get an async context manager that evaluates to a Foo object. So you would need to say:

async with Foo(*args) as my_foo:
    # do stuff with my_foo

This allows Foo to have reliable control of its lifetime, so it can spawn background tasks, deterministically execute cleanup code, and so on.

If you want to implement such an object, inherit from ScopedObject and indicate what should happen on entry and exit of the context. This should be done in one of the following two ways:

  • Define async __open__ and/or __close__ methods, which will be called from the context __aenter__ and __aexit__ respectively, taking no arguments and returning None. __close__ will be called no matter whether the context exits normally or due to an exception. (It can tell whether there is an active exception by using sys.exc_info(), but cannot suppress it.) If you use this approach, ScopedObject takes care of invoking any initialization and finalization logic supplied by your base classes.

  • Define a __wrap__ method that returns an async context manager. This gives you more flexibility than implementing __open__ and __close__, because you can run some code outside of your base classes’ scope and can swallow exceptions, but means you have to enter the base classes’ scope yourself.

It is an error to define both __wrap__ and (__open__ or __close__). If you don’t define __wrap__, ScopedObject generates it for you in terms of __open__ and __close__, with semantics equivalent to the following:

@asynccontextmanager
async def __wrap__(self):
    async with super().__wrap__():
        if hasattr(self, "__open__"):
            await self.__open__()
        try:
            yield
        finally:
            if hasattr(self, "__close__"):
                await self.__close__()

A subclass is provided to handle the common case where a nursery should be created and remain open for the lifetime of the object:

class tricycle.BackgroundObject(*args: Any, **kwds: Any)

Bases: ScopedObject

A ScopedObject that automatically creates a service nursery for running background tasks.

If you pass daemon=True when inheriting from BackgroundObject, like so:

class MyObject(BackgroundObject, daemon=True):
    ...

then the tasks spawned in the nursery will automatically be cancelled when the async with MyObject(...) as obj: block exits. Otherwise, the parent waits for the children to exit normally, like the default Trio nursery behavior.

nursery

The nursery that was created for this object. This attribute only exists within the scope of the object’s async with block, so it cannot be used from __init__, nor after the block has been exited.

If made to use BackgroundObject, the websocket example from above would reduce to:

class WebsocketConnection(BackgroundObject, daemon=True):
    def __init__(self, **etc):
        # initialize other members from **etc
        ...

    async def __open__(self) -> None:
        await foo()
        self.nursery.start_soon(self._manage_connection)

async def use_websocket():
    async with WebsocketConnection(**etc) as conn:
        await conn.send("Hi!")

Tree variables

When you start a new Trio task, the initial values of its context variables (contextvars.ContextVar) are inherited from the environment of the start_soon or start call that started the new task. For example, this code:

import contextvars
import trio

some_cvar = contextvars.ContextVar("some_cvar")

async def print_in_child(tag):
    print("In child", tag, "some_cvar has value", some_cvar.get())

async def main():
    some_cvar.set(1)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(print_in_child, 1)
        some_cvar.set(2)
        nursery.start_soon(print_in_child, 2)
        some_cvar.set(3)
        print("In parent some_cvar has value", some_cvar.get())

trio.run(main)

will produce output like:

In parent some_cvar has value 3
In child 1 some_cvar has value 1
In child 2 some_cvar has value 2

(If you run it yourself, you might find that the “child 2” line comes before “child 1”, but it will still be the case that child 1 sees value 1 while child 2 sees value 2.)

You might wonder why this differs from the behavior of cancel scopes, which only apply to a new task if they surround the new task’s entire nursery (as explained in the Trio documentation about child tasks and cancellation). The difference is that a cancel scope has a limited lifetime (it can’t cancel anything once you exit its with block), while a context variable’s value is just a value (request #42 can keep being request #42 for as long as it likes, without any cooperation from the task that created it).

In specialized cases, you might want to provide a task-local value that’s inherited only from the parent nursery, like cancel scopes are. For example, maybe you’re trying to provide child tasks with access to a limited-lifetime resource such as a nursery or network connection, and you only want a task to be able to use the resource if it’s going to remain available for the task’s entire lifetime. You can support this use case using TreeVar, which is like contextvars.ContextVar except for the way that it’s inherited by new tasks. (It’s a “tree” variable because it’s inherited along the parent-child links that form the Trio task tree.)

If the above example used TreeVar, then its output would be:

In parent some_cvar has value 3
In child 1 some_cvar has value 1
In child 2 some_cvar has value 1

because child 2 would inherit the value from its parent nursery, rather than from the environment of the start_soon() call that creates it.

class tricycle.TreeVar(name[, *, default])

A “tree variable”: like a context variable except that its value in a new task is inherited from the new task’s parent nursery rather than from the new task’s spawner.

TreeVar objects support all the same methods and attributes as ContextVar objects (get(), set(), reset(), and name), and they are constructed the same way. They also provide the additional methods being() and get_in(), documented below.

Accessing or changing the value of a TreeVar outside of a Trio task will raise RuntimeError. (Exception: get_in() still works outside of a task, as long as you have a reference to the task or nursery of interest.)

Note

TreeVar values are not directly stored in the contextvars.Context, so you can’t use Context.get() to access them. If you need the value in a context other than your own, use get_in().

with being(value: T) Iterator[None]

Returns a context manager which sets the value of this TreeVar to value upon entry and restores its previous value upon exit.

get_in(task_or_nursery: Task | Nursery) T
get_in(task_or_nursery: Task | Nursery, default: U) T | U

Gets the value of this TreeVar in the given Task or Nursery.

The value in a task is the value that would be returned by a call to get() in that task. The value in a nursery is the value that would be returned by get() at the beginning of a new child task started in that nursery. The default argument has the same semantics as it does for get().