asynkit


Nameasynkit JSON
Version 0.12.0 PyPI version JSON
download
home_pagehttps://github.com/kristjanvalur/py-asynkit
SummaryA toolkit for Python coroutines
upload_time2023-12-27 13:02:14
maintainer
docs_urlNone
authorKristján Valur Jónsson
requires_python>=3.8,<4.0
licenseMIT
keywords asyncio eventloop
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # asynkit: A toolkit for Python coroutines

[![CI](https://github.com/kristjanvalur/py-asynkit/actions/workflows/ci.yml/badge.svg)](https://github.com/kristjanvalur/py-asynkit/actions/workflows/ci.yml)

This module provides some handy tools for those wishing to have better control over the
way Python's `asyncio` module does things.

- Helper tools for controlling coroutine execution, such as [`CoroStart`](#corostart) and [`Monitor`](#monitor)
- Utility classes such as [`GeneratorObject`](#generatorobject)
- Coroutine helpers such [`coro_iter()`](#coro_iter) and the [`awaitmethod()`](#awaitmethod) decorator
- Helpers to run _async_ code from _non-async_ code, such as `await_sync()` and `aiter_sync()`
- Scheduling helpers for `asyncio`, and extended event-loop implementations
- _eager_ execution of Tasks
- Exprerimental support for [Priority Scheduling](#priority-scheduling) of Tasks
- Other experimental features such as [`task_interrupt()`](#task_interrupt)
- Limited support for `anyio` and `trio`.

## Installation

```bash
pip install asynkit
```

## Coroutine Tools

### `eager()` - lower latency IO

Did you ever wish that your _coroutines_ started right away, and only returned control to
the caller once they become blocked?  Like the way the `async` and `await` keywords work in the __C#__ language?

Now they can. Just decorate or convert them with `acynkit.eager`:

```python
@asynkit.eager
async def get_slow_remote_data():
    result = await execute_remote_request()
    return result.important_data


async def my_complex_thing():
    # kick off the request as soon as possible
    future = get_slow_remote_data()
    # The remote execution may now already be in flight. Do some work taking time
    intermediate_result = await some_local_computation()
    # wait for the result of the request
    return compute_result(intermediate_result, await future)
```

By decorating your function with `eager`, the coroutine will start executing __right away__ and
control will return to the calling function as soon as it _suspends_, _returns_, or _raises_
an exception. In case it is suspended, a _Task_ is created and returned, ready to resume
execution from that point.

Notice how, in either case, control is returned __directly__ back to the
calling function, maintaining synchronous execution. In effect, conventional code
calling order is maintained as much as possible. We call this _depth-first-execution_.

This allows you to prepare and dispatch long running operations __as soon as possible__ while
still being able to asynchronously wait for the result.

`asynkit.eager` can also be used directly on the returned coroutine:

```python
log = []


async def test():
    log.append(1)
    await asyncio.sleep(0.2)  # some long IO
    log.append(2)


async def caller(convert):
    del log[:]
    log.append("a")
    future = convert(test())
    log.append("b")
    await asyncio.sleep(0.1)  # some other IO
    log.append("c")
    await future


# do nothing
asyncio.run(caller(lambda c: c))
assert log == ["a", "b", "c", 1, 2]

# Create a Task
asyncio.run(caller(asyncio.create_task))
assert log == ["a", "b", 1, "c", 2]

# eager
asyncio.run(caller(asynkit.eager))
assert log == ["a", 1, "b", "c", 2]
```

`eager()` is actually a convenience function, invoking either `coro_eager()` or `func_eager()` (see below) depending on context.
Decorating your function makes sense if you __always__ intend
To _await_ its result at some later point. Otherwise, just apply it at the point
of invocation in each such case.

### `coro_eager()`, `func_eager()`

`coro_eager()` is the magic coroutine wrapper providing the __eager__ behaviour:

1. It copies the current _context_
2. It initializes a `CoroStart()` object for the coroutine, starting it in the copied context.
3. If it subsequently is `done()` It returns `CoroStart.as_future()`, otherwise
   it creates and returns a `Task` (using `asyncio.create_task` by default.)

The result is an _awaitable_ which can be either directly awaited or passed
to `asyncio.gather()`. The coroutine is executed in its own copy of the current context,
just as would happen if it were directly turned into a `Task`.

`func_eager()` is a decorator which automatically applies `coro_eager()` to the coroutine returned by an async function.

### `await_sync(), aiter_sync()` - Running coroutines synchronously

If you are writing code which should work both synchronously and asynchronously,
you can now write the code fully _async_ and then run it _synchronously_ in the absence
of an event loop.  As long as the code doesn't _block_ (await unfinished _futures_) and doesn't try to access the event loop, it can successfully be executed.  This helps avoid writing duplicate code.

```python
async def async_get_processed_data(datagetter):
    data = datagetter()  # an optionally async callback
    data = await data if isawaitable(data) else data
    return process_data(data)


# raises SynchronousError if datagetter blocks
def sync_get_processed_data(datagetter):
    return asynkit.await_sync(async_get_processed_data(datagetter))
```

This sort of code might previously have been written thus:

```python
# A hybrid function, _may_ return an _awaitable_
def hybrid_get_processed_data(datagetter):
    data = datagetter()
    if isawaitable(data):
        # return an awaitable helper closure
        async def helper():
            data = await data
            return process_data(data)

        return helper
    return process_data(data)  # duplication


async def async_get_processed_data(datagetter):
    r = hybrid_get_processed_data(datagetter)
    return await r if isawaitable(r) else r


def sync_get_processed_data(datagetter):
    r = hybrid_get_processed_data(datagetter)
    if isawaitable(r):
        raise RuntimeError("callbacks failed to run synchronously")
    return r
```

The above pattern, writing async methods as sync and returning async helpers,
is common in library code which needs to work both in synchronous and asynchronous
context.  Needless to say, it is very convoluted, hard to debug and contains a lot
of code duplication where the same logic is repeated inside async helper closures.

Using `await_sync()` it is possible to write the entire logic as `async` methods and
then simply fail if the code tries to invoke any truly async operations.
If the invoked coroutine blocks, a `SynchronousError` is raised _from_ a `SynchronousAbort` exception which
contains a traceback.  This makes it easy to pinpoint the location in the code where the
async code blocked.  If the code tries to access the event loop, e.g. by creating a `Task`, a `RuntimeError` will be raised.  

The `syncfunction()` decorator can be used to automatically wrap an async function
so that it is executed using `await_sync()`:

```pycon
>>> @asynkit.syncfunction
... async def sync_function():
...     async def async_function():
...         return "look, no async!"
...     return await async_function()
...
>>> sync_function()
'look, no async!'
>>>
```

the `asyncfunction()` utility can be used when passing synchronous callbacks to async
code, to make them async.  This, along with `syncfunction()` and `await_sync()`,
can be used to integrate synchronous code with async middleware:

```python
@asynkit.syncfunction
async def sync_client(sync_callback):
    middleware = AsyncMiddleware(asynkit.asyncfunction(sync_callback))
    return await middleware.run()
```

Using this pattern, one can write the middleware completely async, make it also work
for synchronous code, while avoiding the hybrid function _antipattern._

#### `aiter_sync()`

A helper function is provided, which turns an `AsyncIterable` into
a generator, leveraging the `await_sync()` method:

```python
async def agen():
    for v in range(3):
        yield v


assert list(aiter_sync(agen())) == [1, 2, 3]
```

This is useful if using patterns such as `GeneratorObject` in a synchronous
application.

### `CoroStart`

This class manages the state of a partially run coroutine and is what what powers the `coro_eager()` and `await_sync()` functions.
When initialized, it will _start_ the coroutine, running it until it either suspends, returns, or raises
an exception.  It can subsequently be _awaited_ to retrieve the result.

Similarly to a `Future`, it has these methods:

- `done()` - returns `True` if the coroutine finished without blocking. In this case, the following two methods may be called to get the result.
- `result()` - Returns the _return value_ of the coroutine or __raises__ any _exception_ that it produced.
- `exception()` - Returns any _exception_ raised, or `None` otherwise.

 But more importantly it has these:

- `__await__()` - A magic method making it directly _awaitable_. If it has already finished, awaiting this coroutine is the same as calling `result()`, otherwise it awaits the original coroutine's continued execution
- `as_coroutine()` - A helper which returns a proper _coroutine_ object to await the `CoroStart`
- `as_future()` - If `done()`, returns a `Future` holding its result, otherwise, a `RuntimeError`
  is raised.
- `as_awaitable()` - If `done()`, returns `as_future()`, else returns `self`.
  This is a convenience method for use with functions such as `asyncio.gather()`, which would otherwise wrap a completed coroutine in a `Task`.

In addition it has:

- `aclose()` - If `not done()`, will throw a `GeneratorError` into the coroutine and wait for it to finish.  Otherwise does nothing.
- `athrow(exc)` - If `not done()`, will throw the given error into the coroutine and wait for it to raise or return a value.
- `close()` and `throw(exc)` - Synchronous versions of the above, will raise `RuntimeError` if the coroutine does not immediately exit.

This means that a context manager such as `aclosing()` can be used to ensure
that the coroutine is cleaned up in case of errors before it is awaited:

```python
# start foo() and run until it blocks
async with aclosing(CoroStart(foo())) as coro:
    ...  # do things, which may result in an error
    return await coro
```

CoroStart can be provided with a `contextvars.Context` object, in which case the coroutine will be run using that
context.

### Context helper

`coro_await()` is a helper function to await a coroutine, optionally with a `contextvars.Context`
object to activate:

```python
var1 = contextvars.ContextVar("myvar")


async def my_method():
    var1.set("foo")


async def main():
    context = contextvars.copy_context()
    var1.set("bar")
    await asynkit.coro_await(my_method(), context=context)
    # the coroutine didn't modify _our_ context
    assert var1.get() == "bar"
    # ... but it did modify the copied context
    assert context.get(var1) == "foo"
```

This is similar to `contextvars.Context.run()` but works for async functions.  This function is
implemented using [`CoroStart`](#corostart)

### `awaitmethod()`

This decorator turns the decorated method into a `Generator` as required for
`__await__` methods, which must only return `Iterator` objects.
It does so by invoking the `coro_iter()` helper.

This makes it simple to make a class instance _awaitable_ by decorating an `async`
`__await__()` method.

```python
class Awaitable:
    def __init__(self, cofunc):
        self.cofunc = cofunc
        self.count = 0

    @asynkit.awaitmethod
    async def __await__(self):
        await self.cofunc()
        return self.count
        self.count += 1


async def main():
    async def sleeper():
        await asyncio.sleep(1)

    a = Awaitable(sleeper)
    assert (await a) == 0  # sleep once
    assert (await a) == 1  # sleep again


asyncio.run(main())
```

Unlike a regular _coroutine_ (the result of calling a _coroutine function_), an object with an `__await__` method can potentially be awaited multiple times.

The method can also be a `classmethod` or `staticmethod:`

```python
class Constructor:
    @staticmethod
    @asynkit.awaitmethod
    async def __await__():
        await asyncio.sleep(0)
        return Constructor()


async def construct():
    return await Constructor
```

### `awaitmethod_iter()`

An alternative way of creating an __await__ method, it uses
the `coro_iter()` method to to create a coroutine iterator.  It
is provided for completeness.

### `coro_iter()`

This helper function turns a coroutine function into an iterator.  It is primarily
intended to be used by the [`awaitmethod_iter()`](#awaitmethod_iter) function decorator.

### Monitors and Generators

#### Monitor

A `Monitor` object can be used to await a coroutine, while listening for _out of band_ messages
from the coroutine.  As the coroutine sends messages, it is suspended, until the caller resumes
awaiting for it.

```python
async def coro(monitor):
    await monitor.oob("hello")
    await asyncio.sleep(0)
    await monitor.oob("dolly")
    return "done"


async def runner():
    m = Monitor()
    c = coro(m)
    while True:
        try:
            print(await m.aawait(c))
            break
        except OOBData as oob:
            print(oob.data)
```

which will result in the output

```bash
hello
dolly
done
```

For convenience, the `Monitor` can be _bound_ so that the caller does not have
to keep the coroutine around.  Calling the monitor with the coroutine returns a `BoundMonitor`:

```python
async def coro(m):
    await m.oob("foo")
    return "bar"


m = Monitor()
b = m(coro(m))
try:
    await b
except OOBData as oob:
    assert oob.data == "foo"
assert await b == "bar"
```

Notice how the `BoundMonitor` can be _awaited_ directly, which is the same as awaiting
`b.aawait(None)`.

The caller can pass in _data_ to the coroutine via the `aawait(data=None)` method and
it will become the _return value_ of the `Monitor.oob()` call in the coroutine.
`Monitor.athrow()` can similarly be used to raise an exception out of the `Montitor.oob()` call.
Neither data nor an exception can be sent the first time the coroutine is awaited,
only as a response to a previous `OOBData` exception.

A `Monitor` can be used when a coroutine wants to suspend itself, maybe waiting for some external
condition, without resorting to the relatively heavy mechanism of creating, managing and synchronizing
`Task` objects.  This can be useful if the coroutine needs to maintain state.  Additionally,
this kind of messaging does not require an _event loop_ to be present and can can be driven
using `await_sync()` (see below.)

Consider the following scenario. A _parser_ wants to read a line from a buffer, but fails, signalling
this to the monitor:

```python
async def readline(m, buffer):
    l = buffer.readline()
    while not l.endswith("\n"):
        await m.oob(None)  # ask for more data in the buffer
        l += buffer.readline()
    return l


async def manager(buffer, io):
    m = Monitor()
    a = m(readline(m, buffer))
    while True:
        try:
            return await a
        except OOBData:
            try:
                buffer.fill(await io.read())
            except Exception as exc:
                await a.athrow(exc)
```

In this example, `readline()` is trivial, but if it were a stateful parser with hierarchical
invocation structure, then this pattern allows the decoupling of IO and the parsing of buffered data, maintaining the state of the parser while _the caller_ fills up the buffer.

Any IO exception is sent to the coroutine in this example.  This ensures that it cleans
up properly.  Alternatively, `aclose()` could have been used:

```python
m = Monitor()
with aclosing(m(readline(m, buffer))) as a:
    # the aclosing context manager ensures that the coroutine is closed
    # with `await a.aclose()`
    # even if we don't finish running it.
    ...
```

A standalone parser can also be simply implemented by two helper methods, `start()` and
`try_await()`.

```python
async def stateful_parser(monitor, input_data):
    while input_short(input_data):
        input_data += await monitor.oob()  # request more
    # continue parsing, maybe requesting more data
    return await parsed_data(monitor, input_data)


m: Monitor[Tuple[Any, bytes]] = Monitor()
initial_data = b""
p = m(stateful_parser(m, b""))
await p.start()  # set the parser running, calling oob()

# feed data until a value is returned
while True:
    parsed = await p.try_await(await get_more_data())
    if parsed is not None:
        break
```

This pattern can even be employed in non-async applications, by using
the `await_sync()` method instead of the `await` keyword to drive the `Monitor`.

For a more complete example, have a look at [example_resp.py](examples/example_resp.py)

#### GeneratorObject

A `GeneratorObject` builds on top of the `Monitor` to create an `AsyncGenerator`.  It is in many ways
similar to an _asynchronous generator_ constructed using the _generator function_ syntax.
But whereas those return values using the `yield` _keyword_,
a GeneratorObject has an `ayield()` _method_, which means that data can be sent to the generator
by anyone, and not just by using `yield`, which makes composing such generators much simpler.

The `GeneratorObject` leverages the `Monitor.oob()` method to deliver the _ayielded_ data to whomever is iterating over it:

```python
async def generator(gen_obj):
    # yield directly to the generator
    await gen_obj.ayield(1)
    # have someone else yield to it
    async def helper():
        await gen_obj.ayield(2)

    await asyncio.create_task(helper())


async def runner():
    gen_obj = GeneratorObject()
    values = [val async for val in gen_obj(generator(gen_obj))]
    assert values == [1, 2]
```

The `GeneratorObject`, when called, returns a `GeneratorObjectIterator` which behaves in
the same way as an `AsyncGenerator` object.  It can be iterated over and supports the
`asend()`, `athrow()` and `aclose()` methods.

A `GeneratorObject` is a flexible way to asynchronously generate results without
resorting to `Task` and `Queue` objects.  What is more, it allows this sort
of generating pattern to be used in non-async programs, via `aiter_sync()`:

```python
def sync_runner():
    gen_obj = GeneratorObject()
    values = [val for val in aiter_sync(gen_obj(generator(gen_obj)))]
    assert values == [1, 2]
```

## Scheduling tools

A set of functions are provided to perform advanced scheduling of `Task` objects
with `asyncio`.  They work with the built-in event loop, and also with any event loop
implementing the `AbstractSchedulingLoop` abstract base class, such as the `SchedulingMixin`
class which can be used to extend the built-in event loops.

### Scheduling functions

#### `sleep_insert(pos)`

Similar to `asyncio.sleep()` but sleeps only for `pos` places in the runnable queue.
Whereas `asyncio.sleep(0)` will place the executing task at the end of the queue, which is
appropriate for fair scheduling, in some advanced cases you want to wake up sooner than that, perhaps
after a specific task.

#### `task_reinsert(task, pos)`

Takes a _runnable_ task (for example just created with `asyncio.create_task()` or similar) and
reinserts it at a given position in the queue.
Similarly as for `sleep_insert()`, this can be useful to achieve
certain scheduling goals.

#### `task_switch(task, *, insert_pos=None)`

Immediately moves the given task to the head of the ready queue and switches to it, assuming it is runnable.
If `insert_pos is not None`, the current task will be
put to sleep at that position, using `sleep_insert()`. Otherwise the current task is put at the end
of the ready queue.  If `insert_pos == 1` the current task will be inserted directly after the target
task, making it the next to be run.  If `insert_pos == 0`, the current task will execute _before_ the target.

#### `task_is_blocked(task)`

Returns True if the task is waiting for some awaitable, such as a Future or another Task, and is thus not
on the ready queue.

#### `task_is_runnable(task)`

Roughly the opposite of `task_is_blocked()`, returns True if the task is neither `done()` nor __blocked__ and
awaits execution.

#### `create_task_descend(coro)`

Implements depth-first task scheduling.

Similar to `asyncio.create_task()` this creates a task but starts it running right away, and positions the caller to be woken
up right after it blocks. The effect is similar to using `asynkit.eager()` but
it achieves its goals solely by modifying the runnable queue. A `Task` is always
created, unlike `eager`, which only creates a task if the target blocks.

### Runnable task helpers

A few functions are added to help working with tasks.

The following identity applies:

```python
asyncio.all_tasks() = (
    asynkit.runnable_tasks() | asynkit.blocked_tasks() | asyncio.current_task()
)
```

#### `runnable_tasks(loop=None)`

Returns a set of the tasks that are currently runnable in the given loop

#### `blocked_tasks(loop=None)`

Returns a set of the tasks that are currently blocked on some future in the given loop.

### Event Loop tools

Also provided is a mixin for the built-in event loop implementations in python, providing some primitives for advanced scheduling of tasks.  These primitives are what is used by the
scheduling functions above, and so custom event loop implementations can provide custom
implementations of these methods.

#### `SchedulingMixin` mixin class

This class adds some handy scheduling functions to the event loop. The are intended
to facilitate some scheduling tricks, particularly switching to tasks, which require
finding items in the queue and re-inserting them at an _early_ position.  Nothing
is assumed about the underlying implementation of the queue.

- `queue_len()` - returns the length of the ready queue
- `queue_find(self, key, remove)` - finds and optionally removes an element in the queue
- `queue_insert_pos(self, pos, element)` - inserts an element at position `pos` the queue
- `call_pos(self, pos, ...)` - schedules a callback at position `pos` in the queue

#### Concrete event loop classes

Concrete subclasses of Python's built-in event loop classes are provided.

- `SchedulingSelectorEventLoop` is a subclass of `asyncio.SelectorEventLoop` with the `SchedulingMixin`
- `SchedulingProactorEventLoop` is a subclass of `asyncio.ProactorEventLoop` with the `SchedulingMixin` on those platforms that support it.

#### Event Loop Policy

A policy class is provided to automatically create the appropriate event loops.

- `SchedulingEventLoopPolicy` is a subclass of `asyncio.DefaultEventLoopPolicy` which instantiates either of the above event loop classes as appropriate.

Use this either directly:

```python
asyncio.set_event_loop_policy(asynkit.SchedulingEventLoopPolicy())
asyncio.run(myprogram())
```

or with a context manager:

```python
with asynkit.event_loop_policy():
    asyncio.run(myprogram())
```

## Priority Scheduling

### FIFO scheduling

Since the beginning, _scheduling_ of Tasks in `asyncio` has always been _FIFO_, meaning "first-in, first-out".  This is a design principle which provides a certain _fairness_ to tasks, ensuring that all tasks run and a certain predictability is achieved with execution.  FIFO is maintained in the following places:

- In the _Event Loop_, where tasks are executed in the order in which they become _runnable_
- In locking primitives (such as `asyncio.Lock` or `asyncio.Condition`) where tasks are able to _acquire_ the lock or get notified in the order
  in which they arrive.

All tasks are treated equally.

### The `asynkit.experimental.priority` module

- __Note__:  This is currently an __experimental__ feature.

In pre-emptive system, such as scheduling of `threads` or `processes` there is usually some sort of `priority` involved too,
to allow designating some tasks as more important than others, thus requiring more rapid servicing, and others as having
lower priority and thus be relegated to background tasks where other more important work is not pending.

The `asynkit.experimental.priority` module now allows us to do something similar.

You can define the `priority` of Task objects.  A task defining the `effective_priority()` method returning
a `float` will get priority treatment in the following areas:

- When awaiting a `PriorityLock` or `PriorityCondition`
- When waiting in to be executed by a `PrioritySelectorEventLoop` or a `PriorityProactorEventLoop`.

The floating point _priority value_ returned by `effective_priority()` is used to determine the task's priority, with _lower
values_ giving _higher priority_ (in the same way that low values are _sorted before_ high values).
If this method is missing, the default priority of `0.0` is assumed.  The `Priority` enum class can be
used for some basic priority values, defining `Priority.HIGH` as -10.0 and `Priority.LOW` as 10.0.
In case of identical priority values, FIFO order is respected.

The locking primitives provided are fully compatible with the standard
locks in `asyncio` and also fully support the experimental [task interruption](#task-interruption) feature.

#### `PriorityTask`

This is an `asyncio.Task` subclass which implements the `effective_priority()` method.  It can be constructed with a `priority` keyword
or a `priority_value` attribute.  It also participates in [Priority Inheritance](#priority-inheritance).

#### `PriorityLock`

This is a `asyncio.Lock` subclass which respects the priorities of any `Task` objects attempting to acquire it.  It also participates in [Priority Inheritance](#priority-inheritance).

#### `PriorityCondition`

This is an `asyncio.Condition` subclass which respects the priorities of any `Task` objects awaiting to be woken up.  Its default
`lock` is of type `PriorityLock`.

#### `DefaultPriorityEventLoop`

This is an `asyncio.AbstractEventLoop` subclass which respects the priorities of any Task objects waiting to be executed.  It also
provides all the scheduling extensions from `AbstractSchedulingLoop`.  It also participates in [Priority Inheritance](#priority-inheritance).

This is either a `PrioritySelectorEventLoop` or a `PriorityProactorEventLoop`, both instances of the `PrioritySchedulingMixin` class.

### Priority Inversion

A well known problem with priority scheduling is the so-called [Priority Inversion](https://en.wikipedia.org/wiki/Priority_inversion)
problem.  This implementation addresses that by two different means:

#### Priority Inheritance

A `PriorityTask` keeps track of all the `PriorityLock` objects it has acquired, and a `PriorityLock` keeps track of all the `asyncio.Task` objects waiting to acquire it.  A `PriorityTask`'s `effective_priority()` method will be the highest _effective_priority_ of any
task waiting to acquire a lock held by it.  Thus, a high priority-task which starts waiting for a lock which is held by a
low-priority task, will temporarily _propagate_ its priority to that task, so that ultimately, the `PrioritySchedulingMixin` event
loop with ensure that the previously low-priority task is now executed with the higher priority.

This mechanism requires the co-operation of both the tasks, locks and the event-loop to properly function.

#### Priority Boosting

The `PrioritySchedulingMixin` will regularly do "queue maintenance" and will identify Tasks that have sat around in the queue for
many cycles without being executed.  It will randomly "boost" the priority of these tasks in the queue, so that they have a chance
to run.

This mechanism does not require the co-operation of locks and tasks to work, and is in place as a safety mechanism in applications
where it is not feasible to replace all instances of `Lock`s and `Task`s with their _priority_inheritance_-aware counterparts.

### How to use Priority Scheduling

To make use of Priority scheduling, you need to use either the priority scheduling event loop (e.g.
`DefaultPriorityEventLoop`) or a priority-aware synchronization primitive, i.e. `PriorityLock` or `PriorityCondition`.  In addition, you need `Task` objects which support the `effective_priority()`
method, such as `PriorityTask`

It is possible to get priority behaviour from locks without having a priority event loop, and
vice versa.  But when using the priority event loop, it is recommended to use the accompanying
lock and task classes which co-operate to provide _priority inheritance_.

A good first step, in your application, is to identify tasks
that perform background work, such as housekeeping tasks, and assign to them the `Priority.LOW` priority.

Subsequently you may want to identify areas of your application that require more attention than others.  For a web-application's URL handler may elect to temporarily raise the priority (change `PriorityTask.priority_value`) for certain endpoints to give them better response.

This is new territory and it remains to be seen how having priority scheduling in a `co-operative`` environment such as `asyncio` actually works in practice.

## Coroutine helpers

A couple of functions are provided to introspect the state of coroutine objects. They
work on both regular __async__ coroutines, __classic__ coroutines (using `yield from`) and
__async generators__.

- `coro_is_new(coro)` -
  Returns true if the object has just been created and hasn't started executing yet

- `coro_is_suspended(coro)` - Returns true if the object is in a suspended state.

- `coro_is_done(coro)` - Returns true if the object has finished executing, e.g. by returning or raising an exception.

- `coro_get_frame(coro)` - Returns the current frame object of the coroutine, if it has one, or `None`.

## `anyio` support

The library has been tested to work with the `anyio`.  However, not everything is supported on the `trio` backend.
Currently only the `asyncio` backend can be assumed to work reliably.

When using the asyncio backend, the module `asynkit.experimental.anyio` can be used, to provide "eager"-like
behaviour to task creation.  It will return an `EagerTaskGroup` context manager:

```python
from asynkit.experimental.anyio import create_eager_task_group
from anyio import run, sleep


async def func(task_status):
    print("hello")
    task_status.started("world")
    await sleep(0.01)
    print("goodbye")


async def main():

    async with create_eager_task_group() as tg:
        start = tg.start(func)
        print("fine")
        print(await start)
    print("world")


run(main, backend="asyncio")
```

This will result in the following output:

```bash
hello
fine
world
goodbye
world
```

The first part of the function `func` is run even before calling `await` on the result from `EagerTaskGroup.start()`

Similarly, `EagerTaskGroup.start_soon()` will run the provided coroutine up to its first blocking point before
returning.

### `trio` limitations

`trio` differs significantly from `asyncio` and therefore enjoys only limited support.

- The event loop is completely different and proprietary and so the event loop extensions don't work
  for `trio`.

- `CoroStart` when used with `Task` objects, such as by using `EagerTaskGroup`,
  does not work reliably with `trio`.
  This is because the synchronization primitives
  are not based on `Future` objects but rather perform `Task`-based actions both before going to sleep
  and upon waking up.  If a `CoroStart` initially blocks on a primitive such as `Event.wait()` or
  `sleep(x)` it will be surprised and throw an error when it wakes up on in a different
  `Task` than when it was in when it fell asleep.

`CoroStart` works by intercepting a `Future` being passed up via the `await` protocol to
the event loop to perform the task scheduling.  If any part of the task scheduling has happened
before this, and the _continuation_ happens on a different `Task` then things may break
in various ways.   For `asyncio`, the event loop never sees the `Future` object until
`as_coroutine()` has been called and awaited, and so if this happens in a new task, all is good.

## Experimental features

Some features are currently available experimentally.  They may work only on some platforms or be experimental in nature, not stable or mature enough to be officially part of the library

### Task Interruption

Methods are provided to raise exceptions on a `Task`.  This is somewhat similar to
`task.cancel()` but different:

- The caller specifies the exception instance to be raised on the task.
- The target task made to run immediately, precluding interference with other operations.
- The exception does not propagate into awaited objects.  In particular, if the task
  is _awaiting_ another task, the wait is interrupted, but that other task is not otherwise
  affected.
  
A task which is blocked, waiting for a future, is immediately freed and scheduled to run.
If the task is already scheduled to run, i.e. it is _new_, or the future has triggered but
the task hasn't become active yet, it is still awoken with an exception.

Please note the following cases:

1. The Python library in places assumes that the only exception that can be
   raised out of awaitables is `CancelledError`.  In particular, there are edge cases
   in `asyncio.Lock`, `asyncio.Semaphore` and `asyncio.Condition` where raising something
   else when acquiring these primitives will leave them in an incorrect state.

   Therefore, we provide a base class, `InterruptError`, deriving from `CancelledError` which
   should be used for interrupts in general.

   However, currently `asyncio.Condition` will not correctly pass on such a subclass
   for `wait()` in all cases, so  a safer version, `InterruptCondition` is provided.

2. Even subclasses of `CancelledError` will be converted to a new `CancelledError`
   instance when not handled in a task, and awaited.

3. These functions currently are only work __reliably__ with `Task` object implemented in Python.
   Modern implementation often have a native "C" implementation of `Task` objects and they contain inaccessible code which cannot be used by the library.  In particular, the
   `Task.__step` method cannot be explicitly scheduled to the event loop.  For that reason,
   a special `create_pytask()` helper is provided to create a suitable python `Task` instance.
4. __However:__ This library does go through extra hoops to make it usable with C Tasks.
   It almost works, but with two caveats:

   - CTasks which have plain `TaskStepMethWrapper` callbacks scheduled cannot be interrupted.
    These are typically tasks executing `await asyncio.sleep(0)` or freshly created
    tasks that haven't started executing.
   - The CTask's `_fut_waiting` member _cannot_ be cleared from our code, so there exists a time
    where it can point to a valid, not-done, Future, even though the Task is about
    to wake up.  This will make methods such as `task_is_blocked()` return incorrect
    values.  It __will__ get cleared when the interrupted task starts executing, however. All the more reason to use `task_interrupt()` over `task_throw()` since
    the former allows no space for code to see the task in such an intermediate state.

#### `task_throw()`

```python
def task_throw(task: Task, exc: BaseException):
    pass
```

This method will make the target `Task` immediately runnable with the given exception
pending.

- If the Task was runnable due to a _previous_ call to `task_throw()`, this will override
  that call and its exception.

- Because of that, this method should probably not be used directly.  It is better to ensure that the
  target _takes delivery_ of the exception right away, because there is no way to
  queue pending exceptions and they do not add up in any meaningful way.
  Prefer to use `task_interrupt()` below.

- This method will __fail__ if the target task has a pending _cancellation_, that is,
  it is in the process of waking up with a pending `CancelledError`.  Cancellation is
  currently asynchronous, while throwing exceptions is intended to be synchronous.

#### `task_interrupt()`

```python
async def task_interrupt(task: Task, exc: BaseException):
    pass
```

An `async` version of `task_throw()`.  When awaited, `task_interrupt()` is called,
followed by a `task_switch()` to the target.  Once awaited, the exception
**has been raised** on the target task.

By ensuring that the target task runs immediately, it is possible to reason about task
execution without having to rely on external synchronization primitives and the cooperation
of the target task.  An interrupt is never _pending_ on the task (as a _cancellation_ can
be) and therefore it cannot cause collisions with other interrupts.

```python
async def test():
    async def task():
        await asyncio.sleep(1)

    create_pytask(task)
    await asyncio.sleep(0)
    assert task_is_blocked(task)
    await task_interrupt(task, InterruptException())
    assert task.done()  # the error has already been raised.
    try:
        await task
    except CancelledError:  # original error is substituted
        pass
    else:
        assert False, "never happens"
```

#### `create_pytask()`

Similar to `asyncio.create_task()` but will create a pure __Python__ `Task` which can safely
be used as the target for `task_throw()`and `task_interrupt()`.  Because of implementation
issues, regular __C__ `Task` objects, as returned by `asyncio.create_task()`, cannot
be interrupted in all cases, in particular when doing an `await asyncio.sleep(0)` or
directly after having been created.

### `task_timeout()`

This is a context manager providing a timeout functionality, similar to `asyncio.timeout()`.
By leveraging `task_throw()` and a custom `BaseException` subclass, `TimeoutInterrupt`,
the logic becomes very simple and there is no unintended interaction with regular
task cancellation().


            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/kristjanvalur/py-asynkit",
    "name": "asynkit",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.8,<4.0",
    "maintainer_email": "",
    "keywords": "asyncio,eventloop",
    "author": "Kristj\u00e1n Valur J\u00f3nsson",
    "author_email": "sweskman@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/26/ce/ed883d333f4bafe58d85f3a3abc9fc6dc29a2fc51621103847664361b30f/asynkit-0.12.0.tar.gz",
    "platform": null,
    "description": "# asynkit: A toolkit for Python coroutines\n\n[![CI](https://github.com/kristjanvalur/py-asynkit/actions/workflows/ci.yml/badge.svg)](https://github.com/kristjanvalur/py-asynkit/actions/workflows/ci.yml)\n\nThis module provides some handy tools for those wishing to have better control over the\nway Python's `asyncio` module does things.\n\n- Helper tools for controlling coroutine execution, such as [`CoroStart`](#corostart) and [`Monitor`](#monitor)\n- Utility classes such as [`GeneratorObject`](#generatorobject)\n- Coroutine helpers such [`coro_iter()`](#coro_iter) and the [`awaitmethod()`](#awaitmethod) decorator\n- Helpers to run _async_ code from _non-async_ code, such as `await_sync()` and `aiter_sync()`\n- Scheduling helpers for `asyncio`, and extended event-loop implementations\n- _eager_ execution of Tasks\n- Exprerimental support for [Priority Scheduling](#priority-scheduling) of Tasks\n- Other experimental features such as [`task_interrupt()`](#task_interrupt)\n- Limited support for `anyio` and `trio`.\n\n## Installation\n\n```bash\npip install asynkit\n```\n\n## Coroutine Tools\n\n### `eager()` - lower latency IO\n\nDid you ever wish that your _coroutines_ started right away, and only returned control to\nthe caller once they become blocked?  Like the way the `async` and `await` keywords work in the __C#__ language?\n\nNow they can. Just decorate or convert them with `acynkit.eager`:\n\n```python\n@asynkit.eager\nasync def get_slow_remote_data():\n    result = await execute_remote_request()\n    return result.important_data\n\n\nasync def my_complex_thing():\n    # kick off the request as soon as possible\n    future = get_slow_remote_data()\n    # The remote execution may now already be in flight. Do some work taking time\n    intermediate_result = await some_local_computation()\n    # wait for the result of the request\n    return compute_result(intermediate_result, await future)\n```\n\nBy decorating your function with `eager`, the coroutine will start executing __right away__ and\ncontrol will return to the calling function as soon as it _suspends_, _returns_, or _raises_\nan exception. In case it is suspended, a _Task_ is created and returned, ready to resume\nexecution from that point.\n\nNotice how, in either case, control is returned __directly__ back to the\ncalling function, maintaining synchronous execution. In effect, conventional code\ncalling order is maintained as much as possible. We call this _depth-first-execution_.\n\nThis allows you to prepare and dispatch long running operations __as soon as possible__ while\nstill being able to asynchronously wait for the result.\n\n`asynkit.eager` can also be used directly on the returned coroutine:\n\n```python\nlog = []\n\n\nasync def test():\n    log.append(1)\n    await asyncio.sleep(0.2)  # some long IO\n    log.append(2)\n\n\nasync def caller(convert):\n    del log[:]\n    log.append(\"a\")\n    future = convert(test())\n    log.append(\"b\")\n    await asyncio.sleep(0.1)  # some other IO\n    log.append(\"c\")\n    await future\n\n\n# do nothing\nasyncio.run(caller(lambda c: c))\nassert log == [\"a\", \"b\", \"c\", 1, 2]\n\n# Create a Task\nasyncio.run(caller(asyncio.create_task))\nassert log == [\"a\", \"b\", 1, \"c\", 2]\n\n# eager\nasyncio.run(caller(asynkit.eager))\nassert log == [\"a\", 1, \"b\", \"c\", 2]\n```\n\n`eager()` is actually a convenience function, invoking either `coro_eager()` or `func_eager()` (see below) depending on context.\nDecorating your function makes sense if you __always__ intend\nTo _await_ its result at some later point. Otherwise, just apply it at the point\nof invocation in each such case.\n\n### `coro_eager()`, `func_eager()`\n\n`coro_eager()` is the magic coroutine wrapper providing the __eager__ behaviour:\n\n1. It copies the current _context_\n2. It initializes a `CoroStart()` object for the coroutine, starting it in the copied context.\n3. If it subsequently is `done()` It returns `CoroStart.as_future()`, otherwise\n   it creates and returns a `Task` (using `asyncio.create_task` by default.)\n\nThe result is an _awaitable_ which can be either directly awaited or passed\nto `asyncio.gather()`. The coroutine is executed in its own copy of the current context,\njust as would happen if it were directly turned into a `Task`.\n\n`func_eager()` is a decorator which automatically applies `coro_eager()` to the coroutine returned by an async function.\n\n### `await_sync(), aiter_sync()` - Running coroutines synchronously\n\nIf you are writing code which should work both synchronously and asynchronously,\nyou can now write the code fully _async_ and then run it _synchronously_ in the absence\nof an event loop.  As long as the code doesn't _block_ (await unfinished _futures_) and doesn't try to access the event loop, it can successfully be executed.  This helps avoid writing duplicate code.\n\n```python\nasync def async_get_processed_data(datagetter):\n    data = datagetter()  # an optionally async callback\n    data = await data if isawaitable(data) else data\n    return process_data(data)\n\n\n# raises SynchronousError if datagetter blocks\ndef sync_get_processed_data(datagetter):\n    return asynkit.await_sync(async_get_processed_data(datagetter))\n```\n\nThis sort of code might previously have been written thus:\n\n```python\n# A hybrid function, _may_ return an _awaitable_\ndef hybrid_get_processed_data(datagetter):\n    data = datagetter()\n    if isawaitable(data):\n        # return an awaitable helper closure\n        async def helper():\n            data = await data\n            return process_data(data)\n\n        return helper\n    return process_data(data)  # duplication\n\n\nasync def async_get_processed_data(datagetter):\n    r = hybrid_get_processed_data(datagetter)\n    return await r if isawaitable(r) else r\n\n\ndef sync_get_processed_data(datagetter):\n    r = hybrid_get_processed_data(datagetter)\n    if isawaitable(r):\n        raise RuntimeError(\"callbacks failed to run synchronously\")\n    return r\n```\n\nThe above pattern, writing async methods as sync and returning async helpers,\nis common in library code which needs to work both in synchronous and asynchronous\ncontext.  Needless to say, it is very convoluted, hard to debug and contains a lot\nof code duplication where the same logic is repeated inside async helper closures.\n\nUsing `await_sync()` it is possible to write the entire logic as `async` methods and\nthen simply fail if the code tries to invoke any truly async operations.\nIf the invoked coroutine blocks, a `SynchronousError` is raised _from_ a `SynchronousAbort` exception which\ncontains a traceback.  This makes it easy to pinpoint the location in the code where the\nasync code blocked.  If the code tries to access the event loop, e.g. by creating a `Task`, a `RuntimeError` will be raised.  \n\nThe `syncfunction()` decorator can be used to automatically wrap an async function\nso that it is executed using `await_sync()`:\n\n```pycon\n>>> @asynkit.syncfunction\n... async def sync_function():\n...     async def async_function():\n...         return \"look, no async!\"\n...     return await async_function()\n...\n>>> sync_function()\n'look, no async!'\n>>>\n```\n\nthe `asyncfunction()` utility can be used when passing synchronous callbacks to async\ncode, to make them async.  This, along with `syncfunction()` and `await_sync()`,\ncan be used to integrate synchronous code with async middleware:\n\n```python\n@asynkit.syncfunction\nasync def sync_client(sync_callback):\n    middleware = AsyncMiddleware(asynkit.asyncfunction(sync_callback))\n    return await middleware.run()\n```\n\nUsing this pattern, one can write the middleware completely async, make it also work\nfor synchronous code, while avoiding the hybrid function _antipattern._\n\n#### `aiter_sync()`\n\nA helper function is provided, which turns an `AsyncIterable` into\na generator, leveraging the `await_sync()` method:\n\n```python\nasync def agen():\n    for v in range(3):\n        yield v\n\n\nassert list(aiter_sync(agen())) == [1, 2, 3]\n```\n\nThis is useful if using patterns such as `GeneratorObject` in a synchronous\napplication.\n\n### `CoroStart`\n\nThis class manages the state of a partially run coroutine and is what what powers the `coro_eager()` and `await_sync()` functions.\nWhen initialized, it will _start_ the coroutine, running it until it either suspends, returns, or raises\nan exception.  It can subsequently be _awaited_ to retrieve the result.\n\nSimilarly to a `Future`, it has these methods:\n\n- `done()` - returns `True` if the coroutine finished without blocking. In this case, the following two methods may be called to get the result.\n- `result()` - Returns the _return value_ of the coroutine or __raises__ any _exception_ that it produced.\n- `exception()` - Returns any _exception_ raised, or `None` otherwise.\n\n But more importantly it has these:\n\n- `__await__()` - A magic method making it directly _awaitable_. If it has already finished, awaiting this coroutine is the same as calling `result()`, otherwise it awaits the original coroutine's continued execution\n- `as_coroutine()` - A helper which returns a proper _coroutine_ object to await the `CoroStart`\n- `as_future()` - If `done()`, returns a `Future` holding its result, otherwise, a `RuntimeError`\n  is raised.\n- `as_awaitable()` - If `done()`, returns `as_future()`, else returns `self`.\n  This is a convenience method for use with functions such as `asyncio.gather()`, which would otherwise wrap a completed coroutine in a `Task`.\n\nIn addition it has:\n\n- `aclose()` - If `not done()`, will throw a `GeneratorError` into the coroutine and wait for it to finish.  Otherwise does nothing.\n- `athrow(exc)` - If `not done()`, will throw the given error into the coroutine and wait for it to raise or return a value.\n- `close()` and `throw(exc)` - Synchronous versions of the above, will raise `RuntimeError` if the coroutine does not immediately exit.\n\nThis means that a context manager such as `aclosing()` can be used to ensure\nthat the coroutine is cleaned up in case of errors before it is awaited:\n\n```python\n# start foo() and run until it blocks\nasync with aclosing(CoroStart(foo())) as coro:\n    ...  # do things, which may result in an error\n    return await coro\n```\n\nCoroStart can be provided with a `contextvars.Context` object, in which case the coroutine will be run using that\ncontext.\n\n### Context helper\n\n`coro_await()` is a helper function to await a coroutine, optionally with a `contextvars.Context`\nobject to activate:\n\n```python\nvar1 = contextvars.ContextVar(\"myvar\")\n\n\nasync def my_method():\n    var1.set(\"foo\")\n\n\nasync def main():\n    context = contextvars.copy_context()\n    var1.set(\"bar\")\n    await asynkit.coro_await(my_method(), context=context)\n    # the coroutine didn't modify _our_ context\n    assert var1.get() == \"bar\"\n    # ... but it did modify the copied context\n    assert context.get(var1) == \"foo\"\n```\n\nThis is similar to `contextvars.Context.run()` but works for async functions.  This function is\nimplemented using [`CoroStart`](#corostart)\n\n### `awaitmethod()`\n\nThis decorator turns the decorated method into a `Generator` as required for\n`__await__` methods, which must only return `Iterator` objects.\nIt does so by invoking the `coro_iter()` helper.\n\nThis makes it simple to make a class instance _awaitable_ by decorating an `async`\n`__await__()` method.\n\n```python\nclass Awaitable:\n    def __init__(self, cofunc):\n        self.cofunc = cofunc\n        self.count = 0\n\n    @asynkit.awaitmethod\n    async def __await__(self):\n        await self.cofunc()\n        return self.count\n        self.count += 1\n\n\nasync def main():\n    async def sleeper():\n        await asyncio.sleep(1)\n\n    a = Awaitable(sleeper)\n    assert (await a) == 0  # sleep once\n    assert (await a) == 1  # sleep again\n\n\nasyncio.run(main())\n```\n\nUnlike a regular _coroutine_ (the result of calling a _coroutine function_), an object with an `__await__` method can potentially be awaited multiple times.\n\nThe method can also be a `classmethod` or `staticmethod:`\n\n```python\nclass Constructor:\n    @staticmethod\n    @asynkit.awaitmethod\n    async def __await__():\n        await asyncio.sleep(0)\n        return Constructor()\n\n\nasync def construct():\n    return await Constructor\n```\n\n### `awaitmethod_iter()`\n\nAn alternative way of creating an __await__ method, it uses\nthe `coro_iter()` method to to create a coroutine iterator.  It\nis provided for completeness.\n\n### `coro_iter()`\n\nThis helper function turns a coroutine function into an iterator.  It is primarily\nintended to be used by the [`awaitmethod_iter()`](#awaitmethod_iter) function decorator.\n\n### Monitors and Generators\n\n#### Monitor\n\nA `Monitor` object can be used to await a coroutine, while listening for _out of band_ messages\nfrom the coroutine.  As the coroutine sends messages, it is suspended, until the caller resumes\nawaiting for it.\n\n```python\nasync def coro(monitor):\n    await monitor.oob(\"hello\")\n    await asyncio.sleep(0)\n    await monitor.oob(\"dolly\")\n    return \"done\"\n\n\nasync def runner():\n    m = Monitor()\n    c = coro(m)\n    while True:\n        try:\n            print(await m.aawait(c))\n            break\n        except OOBData as oob:\n            print(oob.data)\n```\n\nwhich will result in the output\n\n```bash\nhello\ndolly\ndone\n```\n\nFor convenience, the `Monitor` can be _bound_ so that the caller does not have\nto keep the coroutine around.  Calling the monitor with the coroutine returns a `BoundMonitor`:\n\n```python\nasync def coro(m):\n    await m.oob(\"foo\")\n    return \"bar\"\n\n\nm = Monitor()\nb = m(coro(m))\ntry:\n    await b\nexcept OOBData as oob:\n    assert oob.data == \"foo\"\nassert await b == \"bar\"\n```\n\nNotice how the `BoundMonitor` can be _awaited_ directly, which is the same as awaiting\n`b.aawait(None)`.\n\nThe caller can pass in _data_ to the coroutine via the `aawait(data=None)` method and\nit will become the _return value_ of the `Monitor.oob()` call in the coroutine.\n`Monitor.athrow()` can similarly be used to raise an exception out of the `Montitor.oob()` call.\nNeither data nor an exception can be sent the first time the coroutine is awaited,\nonly as a response to a previous `OOBData` exception.\n\nA `Monitor` can be used when a coroutine wants to suspend itself, maybe waiting for some external\ncondition, without resorting to the relatively heavy mechanism of creating, managing and synchronizing\n`Task` objects.  This can be useful if the coroutine needs to maintain state.  Additionally,\nthis kind of messaging does not require an _event loop_ to be present and can can be driven\nusing `await_sync()` (see below.)\n\nConsider the following scenario. A _parser_ wants to read a line from a buffer, but fails, signalling\nthis to the monitor:\n\n```python\nasync def readline(m, buffer):\n    l = buffer.readline()\n    while not l.endswith(\"\\n\"):\n        await m.oob(None)  # ask for more data in the buffer\n        l += buffer.readline()\n    return l\n\n\nasync def manager(buffer, io):\n    m = Monitor()\n    a = m(readline(m, buffer))\n    while True:\n        try:\n            return await a\n        except OOBData:\n            try:\n                buffer.fill(await io.read())\n            except Exception as exc:\n                await a.athrow(exc)\n```\n\nIn this example, `readline()` is trivial, but if it were a stateful parser with hierarchical\ninvocation structure, then this pattern allows the decoupling of IO and the parsing of buffered data, maintaining the state of the parser while _the caller_ fills up the buffer.\n\nAny IO exception is sent to the coroutine in this example.  This ensures that it cleans\nup properly.  Alternatively, `aclose()` could have been used:\n\n```python\nm = Monitor()\nwith aclosing(m(readline(m, buffer))) as a:\n    # the aclosing context manager ensures that the coroutine is closed\n    # with `await a.aclose()`\n    # even if we don't finish running it.\n    ...\n```\n\nA standalone parser can also be simply implemented by two helper methods, `start()` and\n`try_await()`.\n\n```python\nasync def stateful_parser(monitor, input_data):\n    while input_short(input_data):\n        input_data += await monitor.oob()  # request more\n    # continue parsing, maybe requesting more data\n    return await parsed_data(monitor, input_data)\n\n\nm: Monitor[Tuple[Any, bytes]] = Monitor()\ninitial_data = b\"\"\np = m(stateful_parser(m, b\"\"))\nawait p.start()  # set the parser running, calling oob()\n\n# feed data until a value is returned\nwhile True:\n    parsed = await p.try_await(await get_more_data())\n    if parsed is not None:\n        break\n```\n\nThis pattern can even be employed in non-async applications, by using\nthe `await_sync()` method instead of the `await` keyword to drive the `Monitor`.\n\nFor a more complete example, have a look at [example_resp.py](examples/example_resp.py)\n\n#### GeneratorObject\n\nA `GeneratorObject` builds on top of the `Monitor` to create an `AsyncGenerator`.  It is in many ways\nsimilar to an _asynchronous generator_ constructed using the _generator function_ syntax.\nBut whereas those return values using the `yield` _keyword_,\na GeneratorObject has an `ayield()` _method_, which means that data can be sent to the generator\nby anyone, and not just by using `yield`, which makes composing such generators much simpler.\n\nThe `GeneratorObject` leverages the `Monitor.oob()` method to deliver the _ayielded_ data to whomever is iterating over it:\n\n```python\nasync def generator(gen_obj):\n    # yield directly to the generator\n    await gen_obj.ayield(1)\n    # have someone else yield to it\n    async def helper():\n        await gen_obj.ayield(2)\n\n    await asyncio.create_task(helper())\n\n\nasync def runner():\n    gen_obj = GeneratorObject()\n    values = [val async for val in gen_obj(generator(gen_obj))]\n    assert values == [1, 2]\n```\n\nThe `GeneratorObject`, when called, returns a `GeneratorObjectIterator` which behaves in\nthe same way as an `AsyncGenerator` object.  It can be iterated over and supports the\n`asend()`, `athrow()` and `aclose()` methods.\n\nA `GeneratorObject` is a flexible way to asynchronously generate results without\nresorting to `Task` and `Queue` objects.  What is more, it allows this sort\nof generating pattern to be used in non-async programs, via `aiter_sync()`:\n\n```python\ndef sync_runner():\n    gen_obj = GeneratorObject()\n    values = [val for val in aiter_sync(gen_obj(generator(gen_obj)))]\n    assert values == [1, 2]\n```\n\n## Scheduling tools\n\nA set of functions are provided to perform advanced scheduling of `Task` objects\nwith `asyncio`.  They work with the built-in event loop, and also with any event loop\nimplementing the `AbstractSchedulingLoop` abstract base class, such as the `SchedulingMixin`\nclass which can be used to extend the built-in event loops.\n\n### Scheduling functions\n\n#### `sleep_insert(pos)`\n\nSimilar to `asyncio.sleep()` but sleeps only for `pos` places in the runnable queue.\nWhereas `asyncio.sleep(0)` will place the executing task at the end of the queue, which is\nappropriate for fair scheduling, in some advanced cases you want to wake up sooner than that, perhaps\nafter a specific task.\n\n#### `task_reinsert(task, pos)`\n\nTakes a _runnable_ task (for example just created with `asyncio.create_task()` or similar) and\nreinserts it at a given position in the queue.\nSimilarly as for `sleep_insert()`, this can be useful to achieve\ncertain scheduling goals.\n\n#### `task_switch(task, *, insert_pos=None)`\n\nImmediately moves the given task to the head of the ready queue and switches to it, assuming it is runnable.\nIf `insert_pos is not None`, the current task will be\nput to sleep at that position, using `sleep_insert()`. Otherwise the current task is put at the end\nof the ready queue.  If `insert_pos == 1` the current task will be inserted directly after the target\ntask, making it the next to be run.  If `insert_pos == 0`, the current task will execute _before_ the target.\n\n#### `task_is_blocked(task)`\n\nReturns True if the task is waiting for some awaitable, such as a Future or another Task, and is thus not\non the ready queue.\n\n#### `task_is_runnable(task)`\n\nRoughly the opposite of `task_is_blocked()`, returns True if the task is neither `done()` nor __blocked__ and\nawaits execution.\n\n#### `create_task_descend(coro)`\n\nImplements depth-first task scheduling.\n\nSimilar to `asyncio.create_task()` this creates a task but starts it running right away, and positions the caller to be woken\nup right after it blocks. The effect is similar to using `asynkit.eager()` but\nit achieves its goals solely by modifying the runnable queue. A `Task` is always\ncreated, unlike `eager`, which only creates a task if the target blocks.\n\n### Runnable task helpers\n\nA few functions are added to help working with tasks.\n\nThe following identity applies:\n\n```python\nasyncio.all_tasks() = (\n    asynkit.runnable_tasks() | asynkit.blocked_tasks() | asyncio.current_task()\n)\n```\n\n#### `runnable_tasks(loop=None)`\n\nReturns a set of the tasks that are currently runnable in the given loop\n\n#### `blocked_tasks(loop=None)`\n\nReturns a set of the tasks that are currently blocked on some future in the given loop.\n\n### Event Loop tools\n\nAlso provided is a mixin for the built-in event loop implementations in python, providing some primitives for advanced scheduling of tasks.  These primitives are what is used by the\nscheduling functions above, and so custom event loop implementations can provide custom\nimplementations of these methods.\n\n#### `SchedulingMixin` mixin class\n\nThis class adds some handy scheduling functions to the event loop. The are intended\nto facilitate some scheduling tricks, particularly switching to tasks, which require\nfinding items in the queue and re-inserting them at an _early_ position.  Nothing\nis assumed about the underlying implementation of the queue.\n\n- `queue_len()` - returns the length of the ready queue\n- `queue_find(self, key, remove)` - finds and optionally removes an element in the queue\n- `queue_insert_pos(self, pos, element)` - inserts an element at position `pos` the queue\n- `call_pos(self, pos, ...)` - schedules a callback at position `pos` in the queue\n\n#### Concrete event loop classes\n\nConcrete subclasses of Python's built-in event loop classes are provided.\n\n- `SchedulingSelectorEventLoop` is a subclass of `asyncio.SelectorEventLoop` with the `SchedulingMixin`\n- `SchedulingProactorEventLoop` is a subclass of `asyncio.ProactorEventLoop` with the `SchedulingMixin` on those platforms that support it.\n\n#### Event Loop Policy\n\nA policy class is provided to automatically create the appropriate event loops.\n\n- `SchedulingEventLoopPolicy` is a subclass of `asyncio.DefaultEventLoopPolicy` which instantiates either of the above event loop classes as appropriate.\n\nUse this either directly:\n\n```python\nasyncio.set_event_loop_policy(asynkit.SchedulingEventLoopPolicy())\nasyncio.run(myprogram())\n```\n\nor with a context manager:\n\n```python\nwith asynkit.event_loop_policy():\n    asyncio.run(myprogram())\n```\n\n## Priority Scheduling\n\n### FIFO scheduling\n\nSince the beginning, _scheduling_ of Tasks in `asyncio` has always been _FIFO_, meaning \"first-in, first-out\".  This is a design principle which provides a certain _fairness_ to tasks, ensuring that all tasks run and a certain predictability is achieved with execution.  FIFO is maintained in the following places:\n\n- In the _Event Loop_, where tasks are executed in the order in which they become _runnable_\n- In locking primitives (such as `asyncio.Lock` or `asyncio.Condition`) where tasks are able to _acquire_ the lock or get notified in the order\n  in which they arrive.\n\nAll tasks are treated equally.\n\n### The `asynkit.experimental.priority` module\n\n- __Note__:  This is currently an __experimental__ feature.\n\nIn pre-emptive system, such as scheduling of `threads` or `processes` there is usually some sort of `priority` involved too,\nto allow designating some tasks as more important than others, thus requiring more rapid servicing, and others as having\nlower priority and thus be relegated to background tasks where other more important work is not pending.\n\nThe `asynkit.experimental.priority` module now allows us to do something similar.\n\nYou can define the `priority` of Task objects.  A task defining the `effective_priority()` method returning\na `float` will get priority treatment in the following areas:\n\n- When awaiting a `PriorityLock` or `PriorityCondition`\n- When waiting in to be executed by a `PrioritySelectorEventLoop` or a `PriorityProactorEventLoop`.\n\nThe floating point _priority value_ returned by `effective_priority()` is used to determine the task's priority, with _lower\nvalues_ giving _higher priority_ (in the same way that low values are _sorted before_ high values).\nIf this method is missing, the default priority of `0.0` is assumed.  The `Priority` enum class can be\nused for some basic priority values, defining `Priority.HIGH` as -10.0 and `Priority.LOW` as 10.0.\nIn case of identical priority values, FIFO order is respected.\n\nThe locking primitives provided are fully compatible with the standard\nlocks in `asyncio` and also fully support the experimental [task interruption](#task-interruption) feature.\n\n#### `PriorityTask`\n\nThis is an `asyncio.Task` subclass which implements the `effective_priority()` method.  It can be constructed with a `priority` keyword\nor a `priority_value` attribute.  It also participates in [Priority Inheritance](#priority-inheritance).\n\n#### `PriorityLock`\n\nThis is a `asyncio.Lock` subclass which respects the priorities of any `Task` objects attempting to acquire it.  It also participates in [Priority Inheritance](#priority-inheritance).\n\n#### `PriorityCondition`\n\nThis is an `asyncio.Condition` subclass which respects the priorities of any `Task` objects awaiting to be woken up.  Its default\n`lock` is of type `PriorityLock`.\n\n#### `DefaultPriorityEventLoop`\n\nThis is an `asyncio.AbstractEventLoop` subclass which respects the priorities of any Task objects waiting to be executed.  It also\nprovides all the scheduling extensions from `AbstractSchedulingLoop`.  It also participates in [Priority Inheritance](#priority-inheritance).\n\nThis is either a `PrioritySelectorEventLoop` or a `PriorityProactorEventLoop`, both instances of the `PrioritySchedulingMixin` class.\n\n### Priority Inversion\n\nA well known problem with priority scheduling is the so-called [Priority Inversion](https://en.wikipedia.org/wiki/Priority_inversion)\nproblem.  This implementation addresses that by two different means:\n\n#### Priority Inheritance\n\nA `PriorityTask` keeps track of all the `PriorityLock` objects it has acquired, and a `PriorityLock` keeps track of all the `asyncio.Task` objects waiting to acquire it.  A `PriorityTask`'s `effective_priority()` method will be the highest _effective_priority_ of any\ntask waiting to acquire a lock held by it.  Thus, a high priority-task which starts waiting for a lock which is held by a\nlow-priority task, will temporarily _propagate_ its priority to that task, so that ultimately, the `PrioritySchedulingMixin` event\nloop with ensure that the previously low-priority task is now executed with the higher priority.\n\nThis mechanism requires the co-operation of both the tasks, locks and the event-loop to properly function.\n\n#### Priority Boosting\n\nThe `PrioritySchedulingMixin` will regularly do \"queue maintenance\" and will identify Tasks that have sat around in the queue for\nmany cycles without being executed.  It will randomly \"boost\" the priority of these tasks in the queue, so that they have a chance\nto run.\n\nThis mechanism does not require the co-operation of locks and tasks to work, and is in place as a safety mechanism in applications\nwhere it is not feasible to replace all instances of `Lock`s and `Task`s with their _priority_inheritance_-aware counterparts.\n\n### How to use Priority Scheduling\n\nTo make use of Priority scheduling, you need to use either the priority scheduling event loop (e.g.\n`DefaultPriorityEventLoop`) or a priority-aware synchronization primitive, i.e. `PriorityLock` or `PriorityCondition`.  In addition, you need `Task` objects which support the `effective_priority()`\nmethod, such as `PriorityTask`\n\nIt is possible to get priority behaviour from locks without having a priority event loop, and\nvice versa.  But when using the priority event loop, it is recommended to use the accompanying\nlock and task classes which co-operate to provide _priority inheritance_.\n\nA good first step, in your application, is to identify tasks\nthat perform background work, such as housekeeping tasks, and assign to them the `Priority.LOW` priority.\n\nSubsequently you may want to identify areas of your application that require more attention than others.  For a web-application's URL handler may elect to temporarily raise the priority (change `PriorityTask.priority_value`) for certain endpoints to give them better response.\n\nThis is new territory and it remains to be seen how having priority scheduling in a `co-operative`` environment such as `asyncio` actually works in practice.\n\n## Coroutine helpers\n\nA couple of functions are provided to introspect the state of coroutine objects. They\nwork on both regular __async__ coroutines, __classic__ coroutines (using `yield from`) and\n__async generators__.\n\n- `coro_is_new(coro)` -\n  Returns true if the object has just been created and hasn't started executing yet\n\n- `coro_is_suspended(coro)` - Returns true if the object is in a suspended state.\n\n- `coro_is_done(coro)` - Returns true if the object has finished executing, e.g. by returning or raising an exception.\n\n- `coro_get_frame(coro)` - Returns the current frame object of the coroutine, if it has one, or `None`.\n\n## `anyio` support\n\nThe library has been tested to work with the `anyio`.  However, not everything is supported on the `trio` backend.\nCurrently only the `asyncio` backend can be assumed to work reliably.\n\nWhen using the asyncio backend, the module `asynkit.experimental.anyio` can be used, to provide \"eager\"-like\nbehaviour to task creation.  It will return an `EagerTaskGroup` context manager:\n\n```python\nfrom asynkit.experimental.anyio import create_eager_task_group\nfrom anyio import run, sleep\n\n\nasync def func(task_status):\n    print(\"hello\")\n    task_status.started(\"world\")\n    await sleep(0.01)\n    print(\"goodbye\")\n\n\nasync def main():\n\n    async with create_eager_task_group() as tg:\n        start = tg.start(func)\n        print(\"fine\")\n        print(await start)\n    print(\"world\")\n\n\nrun(main, backend=\"asyncio\")\n```\n\nThis will result in the following output:\n\n```bash\nhello\nfine\nworld\ngoodbye\nworld\n```\n\nThe first part of the function `func` is run even before calling `await` on the result from `EagerTaskGroup.start()`\n\nSimilarly, `EagerTaskGroup.start_soon()` will run the provided coroutine up to its first blocking point before\nreturning.\n\n### `trio` limitations\n\n`trio` differs significantly from `asyncio` and therefore enjoys only limited support.\n\n- The event loop is completely different and proprietary and so the event loop extensions don't work\n  for `trio`.\n\n- `CoroStart` when used with `Task` objects, such as by using `EagerTaskGroup`,\n  does not work reliably with `trio`.\n  This is because the synchronization primitives\n  are not based on `Future` objects but rather perform `Task`-based actions both before going to sleep\n  and upon waking up.  If a `CoroStart` initially blocks on a primitive such as `Event.wait()` or\n  `sleep(x)` it will be surprised and throw an error when it wakes up on in a different\n  `Task` than when it was in when it fell asleep.\n\n`CoroStart` works by intercepting a `Future` being passed up via the `await` protocol to\nthe event loop to perform the task scheduling.  If any part of the task scheduling has happened\nbefore this, and the _continuation_ happens on a different `Task` then things may break\nin various ways.   For `asyncio`, the event loop never sees the `Future` object until\n`as_coroutine()` has been called and awaited, and so if this happens in a new task, all is good.\n\n## Experimental features\n\nSome features are currently available experimentally.  They may work only on some platforms or be experimental in nature, not stable or mature enough to be officially part of the library\n\n### Task Interruption\n\nMethods are provided to raise exceptions on a `Task`.  This is somewhat similar to\n`task.cancel()` but different:\n\n- The caller specifies the exception instance to be raised on the task.\n- The target task made to run immediately, precluding interference with other operations.\n- The exception does not propagate into awaited objects.  In particular, if the task\n  is _awaiting_ another task, the wait is interrupted, but that other task is not otherwise\n  affected.\n  \nA task which is blocked, waiting for a future, is immediately freed and scheduled to run.\nIf the task is already scheduled to run, i.e. it is _new_, or the future has triggered but\nthe task hasn't become active yet, it is still awoken with an exception.\n\nPlease note the following cases:\n\n1. The Python library in places assumes that the only exception that can be\n   raised out of awaitables is `CancelledError`.  In particular, there are edge cases\n   in `asyncio.Lock`, `asyncio.Semaphore` and `asyncio.Condition` where raising something\n   else when acquiring these primitives will leave them in an incorrect state.\n\n   Therefore, we provide a base class, `InterruptError`, deriving from `CancelledError` which\n   should be used for interrupts in general.\n\n   However, currently `asyncio.Condition` will not correctly pass on such a subclass\n   for `wait()` in all cases, so  a safer version, `InterruptCondition` is provided.\n\n2. Even subclasses of `CancelledError` will be converted to a new `CancelledError`\n   instance when not handled in a task, and awaited.\n\n3. These functions currently are only work __reliably__ with `Task` object implemented in Python.\n   Modern implementation often have a native \"C\" implementation of `Task` objects and they contain inaccessible code which cannot be used by the library.  In particular, the\n   `Task.__step` method cannot be explicitly scheduled to the event loop.  For that reason,\n   a special `create_pytask()` helper is provided to create a suitable python `Task` instance.\n4. __However:__ This library does go through extra hoops to make it usable with C Tasks.\n   It almost works, but with two caveats:\n\n   - CTasks which have plain `TaskStepMethWrapper` callbacks scheduled cannot be interrupted.\n    These are typically tasks executing `await asyncio.sleep(0)` or freshly created\n    tasks that haven't started executing.\n   - The CTask's `_fut_waiting` member _cannot_ be cleared from our code, so there exists a time\n    where it can point to a valid, not-done, Future, even though the Task is about\n    to wake up.  This will make methods such as `task_is_blocked()` return incorrect\n    values.  It __will__ get cleared when the interrupted task starts executing, however. All the more reason to use `task_interrupt()` over `task_throw()` since\n    the former allows no space for code to see the task in such an intermediate state.\n\n#### `task_throw()`\n\n```python\ndef task_throw(task: Task, exc: BaseException):\n    pass\n```\n\nThis method will make the target `Task` immediately runnable with the given exception\npending.\n\n- If the Task was runnable due to a _previous_ call to `task_throw()`, this will override\n  that call and its exception.\n\n- Because of that, this method should probably not be used directly.  It is better to ensure that the\n  target _takes delivery_ of the exception right away, because there is no way to\n  queue pending exceptions and they do not add up in any meaningful way.\n  Prefer to use `task_interrupt()` below.\n\n- This method will __fail__ if the target task has a pending _cancellation_, that is,\n  it is in the process of waking up with a pending `CancelledError`.  Cancellation is\n  currently asynchronous, while throwing exceptions is intended to be synchronous.\n\n#### `task_interrupt()`\n\n```python\nasync def task_interrupt(task: Task, exc: BaseException):\n    pass\n```\n\nAn `async` version of `task_throw()`.  When awaited, `task_interrupt()` is called,\nfollowed by a `task_switch()` to the target.  Once awaited, the exception\n**has been raised** on the target task.\n\nBy ensuring that the target task runs immediately, it is possible to reason about task\nexecution without having to rely on external synchronization primitives and the cooperation\nof the target task.  An interrupt is never _pending_ on the task (as a _cancellation_ can\nbe) and therefore it cannot cause collisions with other interrupts.\n\n```python\nasync def test():\n    async def task():\n        await asyncio.sleep(1)\n\n    create_pytask(task)\n    await asyncio.sleep(0)\n    assert task_is_blocked(task)\n    await task_interrupt(task, InterruptException())\n    assert task.done()  # the error has already been raised.\n    try:\n        await task\n    except CancelledError:  # original error is substituted\n        pass\n    else:\n        assert False, \"never happens\"\n```\n\n#### `create_pytask()`\n\nSimilar to `asyncio.create_task()` but will create a pure __Python__ `Task` which can safely\nbe used as the target for `task_throw()`and `task_interrupt()`.  Because of implementation\nissues, regular __C__ `Task` objects, as returned by `asyncio.create_task()`, cannot\nbe interrupted in all cases, in particular when doing an `await asyncio.sleep(0)` or\ndirectly after having been created.\n\n### `task_timeout()`\n\nThis is a context manager providing a timeout functionality, similar to `asyncio.timeout()`.\nBy leveraging `task_throw()` and a custom `BaseException` subclass, `TimeoutInterrupt`,\nthe logic becomes very simple and there is no unintended interaction with regular\ntask cancellation().\n\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "A toolkit for Python coroutines",
    "version": "0.12.0",
    "project_urls": {
        "Homepage": "https://github.com/kristjanvalur/py-asynkit",
        "Repository": "https://github.com/kristjanvalur/py-asynkit"
    },
    "split_keywords": [
        "asyncio",
        "eventloop"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "9d8074151c9672e11afda9c31a9b45ddc8a49c9c86bb8c93b3c920366389f112",
                "md5": "6730c4862213a7c24910964223f1c060",
                "sha256": "0be29c8f8baf01bf0a01f4ab7f95512f5e82f61480709b05e586d0a5b4cedba6"
            },
            "downloads": -1,
            "filename": "asynkit-0.12.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "6730c4862213a7c24910964223f1c060",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8,<4.0",
            "size": 48296,
            "upload_time": "2023-12-27T13:02:12",
            "upload_time_iso_8601": "2023-12-27T13:02:12.330715Z",
            "url": "https://files.pythonhosted.org/packages/9d/80/74151c9672e11afda9c31a9b45ddc8a49c9c86bb8c93b3c920366389f112/asynkit-0.12.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "26ceed883d333f4bafe58d85f3a3abc9fc6dc29a2fc51621103847664361b30f",
                "md5": "4d235ad91b9af709df6b4b0fab17a253",
                "sha256": "35203f7018bc9cebb52ac8ec6cc78100fb0bb56f588130674243051915e8d99a"
            },
            "downloads": -1,
            "filename": "asynkit-0.12.0.tar.gz",
            "has_sig": false,
            "md5_digest": "4d235ad91b9af709df6b4b0fab17a253",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8,<4.0",
            "size": 53486,
            "upload_time": "2023-12-27T13:02:14",
            "upload_time_iso_8601": "2023-12-27T13:02:14.464905Z",
            "url": "https://files.pythonhosted.org/packages/26/ce/ed883d333f4bafe58d85f3a3abc9fc6dc29a2fc51621103847664361b30f/asynkit-0.12.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-12-27 13:02:14",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "kristjanvalur",
    "github_project": "py-asynkit",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "asynkit"
}
        
Elapsed time: 0.25094s