aiotools


Nameaiotools JSON
Version 2.2.3 PyPI version JSON
download
home_pageNone
SummaryIdiomatic asyncio utilities
upload_time2025-10-20 08:33:54
maintainerNone
docs_urlNone
authorNone
requires_python>=3.11
licenseNone
keywords asyncio
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            aiotools
========

[![PyPI release version](https://badge.fury.io/py/aiotools.svg)](https://pypi.org/project/aiotools/)
![Supported Python versions](https://img.shields.io/pypi/pyversions/aiotools.svg)
[![CI Status](https://github.com/achimnol/aiotools/actions/workflows/ci.yml/badge.svg)](https://github.com/achimnol/aiotools/actions/workflows/ci.yml)
[![Code Coverage](https://codecov.io/gh/achimnol/aiotools/branch/master/graph/badge.svg)](https://codecov.io/gh/achimnol/aiotools)

Idiomatic asyncio utilities


Modules
-------

* [Safe Cancellation](https://aiotools.readthedocs.io/en/latest/aiotools.cancel.html)
* [Async Context Manager](https://aiotools.readthedocs.io/en/latest/aiotools.context.html)
* [Async Defer](https://aiotools.readthedocs.io/en/latest/aiotools.defer.html)
* [Async Fork](https://aiotools.readthedocs.io/en/latest/aiotools.fork.html)
* [Async Functools](https://aiotools.readthedocs.io/en/latest/aiotools.func.html)
* [Async Itertools](https://aiotools.readthedocs.io/en/latest/aiotools.iter.html)
* [Async Server](https://aiotools.readthedocs.io/en/latest/aiotools.server.html)
* [Async Timer](https://aiotools.readthedocs.io/en/latest/aiotools.timer.html)
* [TaskContext](https://aiotools.readthedocs.io/en/latest/aiotools.taskcontext.html)
* [TaskScope](https://aiotools.readthedocs.io/en/latest/aiotools.taskscope.html)
* (alias of TaskScope) [Supervisor](https://aiotools.readthedocs.io/en/latest/aiotools.supervisor.html)
* (deprecated since 2.0) [(Persistent)TaskGroup](https://aiotools.readthedocs.io/en/latest/aiotools.taskgroup.html)
* [High-level Coroutine Utilities](https://aiotools.readthedocs.io/en/latest/aiotools.utils.html)

Full API documentation: https://aiotools.readthedocs.io


See also
--------

* [anyio](https://github.com/agronholm/anyio): High level asynchronous concurrency and networking framework that works on top of either Trio or asyncio
* [trio](https://github.com/python-trio/trio): An alternative implementation of asyncio focusing on structured concurrency
* [aiometer](https://github.com/florimondmanca/aiometer): A Python concurrency scheduling library, compatible with asyncio and trio.
* [aiojobs](https://github.com/aio-libs/aiojobs): A concurrency-limiting, task-shielding scheduler for asyncio tasks for graceful shutdown

Currently aiotools targets the vanilly asyncio ecosystem only
(with some tight coupling with asyncio internals),
but you may find similar problem definitions and alternative solutions in the above libraries.


Examples
--------

Below are some highlighted usecases of aiotools.
Please refer more detailed logic and backgrounds in [the documentation](https://aiotools.readthedocs.io).

### Safe Cancellation

Consider the following commonly used pattern:
```python
task = asyncio.create_task(...)
task.cancel()
await task  # PROBLEM: would it raise CancelledError or not? should we propagate it or not?
```

It has been the reponsibility of the author of tasks and the caller of them to
coordinate whether to re-raise injected cancellation error.

Now we can use the structured cancellation introduced in Python 3.11:
```python
task = asyncio.create_task(...)
await aiotools.cancel_and_wait(task)
```
which will re-raise the cancellation when there is an external cancellation
request and absorb it otherwise.

Relying on this API whenever you need to cancel asyncio tasks will make your
codebase more consistent because you no longer need to decide whether to
(re-)raise or suppress `CancelledError` in your task codes.

You may also combine `cancel_and_wait()` with `ShieldScope` to guard
a block of codes from cancellation in the middle but defer the cancellation
to the end of the block.

```python
async def work():
    try:
        ...
    except asyncio.CancelledError:
        with aiotools.ShieldScope():
            await cleanup()  # any async code here is not affected by multiple cancellation
            raise

async def parent():
    work_task = asyncio.create_task(work())
    ...
    await cancel_and_wait(work_task)

parent_task = asyncio.create_task(parent())
...
await cancel_and_wait(parent_task)  # it may trigger double cancellation, but it will return after the shielded block completes.
```

### Async Context Manager

This is an asynchronous version of `contextlib.contextmanager` to make it
easier to write asynchronous context managers without creating boilerplate
classes.

```python
import asyncio
import aiotools

@aiotools.actxmgr
async def mygen(a):
    await asyncio.sleep(1)
    yield a + 1
    await asyncio.sleep(1)

async def somewhere():
    async with mygen(1) as b:
        assert b == 2
```

Note that you need to wrap `yield` with a try-finally block to
ensure resource releases (e.g., locks), even in the case when
an exception is ocurred inside the async-with block.

```python
import asyncio
import aiotools

lock = asyncio.Lock()

@aiotools.actxmgr
async def mygen(a):
    await lock.acquire()
    try:
        yield a + 1
    finally:
        lock.release()

async def somewhere():
    try:
        async with mygen(1) as b:
            raise RuntimeError('oops')
    except RuntimeError:
        print('caught!')  # you can catch exceptions here.
```

You can also create a group of async context managers, which
are entered/exited all at once using `asyncio.gather()`.

```python
import asyncio
import aiotools

@aiotools.actxmgr
async def mygen(a):
    yield a + 10

async def somewhere():
    ctxgrp = aiotools.actxgroup(mygen(i) for i in range(10))
    async with ctxgrp as values:
        assert len(values) == 10
        for i in range(10):
            assert values[i] == i + 10
```


### Async Server

This implements a common pattern to launch asyncio-based server daemons.

```python
import asyncio
import aiotools

async def echo(reader, writer):
    data = await reader.read(100)
    writer.write(data)
    await writer.drain()
    writer.close()

@aiotools.server
async def myworker(loop, pidx, args):
    server = await asyncio.start_server(echo, '0.0.0.0', 8888, reuse_port=True)
    print(f'[{pidx}] started')
    yield  # wait until terminated
    server.close()
    await server.wait_closed()
    print(f'[{pidx}] terminated')

if __name__ == '__main__':
    # Run the above server using 4 worker processes.
    aiotools.start_server(myworker, num_workers=4)
```

It handles SIGINT/SIGTERM signals automatically to stop the server,
as well as lifecycle management of event loops running on multiple processes.
Internally it uses `aiotools.fork` module to get kernel support to resolve
potential signal/PID related races via PID file descriptors on supported versions
(Python 3.9+ and Linux kernel 5.4+).


### Async TaskScope

TaskScope is a variant of TaskGroup which ensures all child tasks run to completion
(either having results or exceptions) unless the context manager is cancelled.

This could be considered as a safer version (in terms of lifecycle tracking) of
`asyncio.gather(*, return_exceptions=True)` and a more convenient version of it
because you can decouple the timing of task creation and their scheduling
within the TaskScope context.

```python
import aiotools

async def do():
    async with aiotools.TaskScope() as ts:
        ts.create_task(...)
        ts.create_task(...)
        # each task will run to completion regardless sibling failures.
        ...
    # at this point, all subtasks are either cancelled or done.
```

You may customize exception handler for each scope to receive and process
unhandled exceptions in child tasks.  For use in long-running server contexts,
TaskScope does not store any exceptions or results by itself.

See also high-level coroutine utilities such as `as_completed_safe()`,
`gather_safe()`, and `race()` functions in the utils module.

TaskScope itself and these utilities integrate with
[the call-graph inspection](https://docs.python.org/3/library/asyncio-graph.html)
introduced in Python 3.14.


### Async Timer

```python
import aiotools

i = 0

async def mytick(interval):
    print(i)
    i += 1

async def somewhere():
    task = aiotools.create_timer(mytick, 1.0)
    ...
    await aiotools.cancel_and_wait(task)
```

The returned `task` is an `asyncio.Task` object.
To stop the timer, it should be cancelled explicitly.
Use `cancel_and_wait()` to ensure complete shutdown of any ongoing tick tasks.
To make your timer function to be cancellable, add a try-except clause
catching `asyncio.CancelledError` since we use it as a termination
signal.

You may add `TimerDelayPolicy` argument to control the behavior when the
timer-fired task takes longer than the timer interval.
`DEFAULT` is to accumulate them and cancel all the remainings at once when
the timer is cancelled.
`CANCEL` is to cancel any pending previously fired tasks on every interval.

```python
import asyncio
import aiotools

async def mytick(interval):
    await asyncio.sleep(100)  # cancelled on every next interval.

async def somewhere():
    t = aiotools.create_timer(mytick, 1.0, aiotools.TimerDelayPolicy.CANCEL)
    ...
    await aiotools.cancel_and_wait(t)
```

#### Virtual Clock

It provides a virtual clock that advances the event loop time instantly upon
any combination of `asyncio.sleep()` calls in multiple coroutine tasks,
by temporarily patching the event loop selector.

This is also used in [our timer test suite](https://github.com/achimnol/aiotools/blob/master/tests/test_timer.py).

```python
import aiotools
import pytest

@pytest.mark.asyncio
async def test_sleeps():
    loop = aiotools.compat.get_running_loop()
    vclock = aiotools.VirtualClock()
    with vclock.patch_loop():
        print(loop.time())  # -> prints 0
        await asyncio.sleep(3600)
        print(loop.time())  # -> prints 3600
```

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "aiotools",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.11",
    "maintainer_email": null,
    "keywords": "asyncio",
    "author": null,
    "author_email": "Joongi Kim <me@daybreaker.info>",
    "download_url": "https://files.pythonhosted.org/packages/3b/82/c14fa975a4d9c3695d2fad07207a64bd9015ee53ee5cd488a7fc91ec6604/aiotools-2.2.3.tar.gz",
    "platform": null,
    "description": "aiotools\n========\n\n[![PyPI release version](https://badge.fury.io/py/aiotools.svg)](https://pypi.org/project/aiotools/)\n![Supported Python versions](https://img.shields.io/pypi/pyversions/aiotools.svg)\n[![CI Status](https://github.com/achimnol/aiotools/actions/workflows/ci.yml/badge.svg)](https://github.com/achimnol/aiotools/actions/workflows/ci.yml)\n[![Code Coverage](https://codecov.io/gh/achimnol/aiotools/branch/master/graph/badge.svg)](https://codecov.io/gh/achimnol/aiotools)\n\nIdiomatic asyncio utilities\n\n\nModules\n-------\n\n* [Safe Cancellation](https://aiotools.readthedocs.io/en/latest/aiotools.cancel.html)\n* [Async Context Manager](https://aiotools.readthedocs.io/en/latest/aiotools.context.html)\n* [Async Defer](https://aiotools.readthedocs.io/en/latest/aiotools.defer.html)\n* [Async Fork](https://aiotools.readthedocs.io/en/latest/aiotools.fork.html)\n* [Async Functools](https://aiotools.readthedocs.io/en/latest/aiotools.func.html)\n* [Async Itertools](https://aiotools.readthedocs.io/en/latest/aiotools.iter.html)\n* [Async Server](https://aiotools.readthedocs.io/en/latest/aiotools.server.html)\n* [Async Timer](https://aiotools.readthedocs.io/en/latest/aiotools.timer.html)\n* [TaskContext](https://aiotools.readthedocs.io/en/latest/aiotools.taskcontext.html)\n* [TaskScope](https://aiotools.readthedocs.io/en/latest/aiotools.taskscope.html)\n* (alias of TaskScope) [Supervisor](https://aiotools.readthedocs.io/en/latest/aiotools.supervisor.html)\n* (deprecated since 2.0) [(Persistent)TaskGroup](https://aiotools.readthedocs.io/en/latest/aiotools.taskgroup.html)\n* [High-level Coroutine Utilities](https://aiotools.readthedocs.io/en/latest/aiotools.utils.html)\n\nFull API documentation: https://aiotools.readthedocs.io\n\n\nSee also\n--------\n\n* [anyio](https://github.com/agronholm/anyio): High level asynchronous concurrency and networking framework that works on top of either Trio or asyncio\n* [trio](https://github.com/python-trio/trio): An alternative implementation of asyncio focusing on structured concurrency\n* [aiometer](https://github.com/florimondmanca/aiometer): A Python concurrency scheduling library, compatible with asyncio and trio.\n* [aiojobs](https://github.com/aio-libs/aiojobs): A concurrency-limiting, task-shielding scheduler for asyncio tasks for graceful shutdown\n\nCurrently aiotools targets the vanilly asyncio ecosystem only\n(with some tight coupling with asyncio internals),\nbut you may find similar problem definitions and alternative solutions in the above libraries.\n\n\nExamples\n--------\n\nBelow are some highlighted usecases of aiotools.\nPlease refer more detailed logic and backgrounds in [the documentation](https://aiotools.readthedocs.io).\n\n### Safe Cancellation\n\nConsider the following commonly used pattern:\n```python\ntask = asyncio.create_task(...)\ntask.cancel()\nawait task  # PROBLEM: would it raise CancelledError or not? should we propagate it or not?\n```\n\nIt has been the reponsibility of the author of tasks and the caller of them to\ncoordinate whether to re-raise injected cancellation error.\n\nNow we can use the structured cancellation introduced in Python 3.11:\n```python\ntask = asyncio.create_task(...)\nawait aiotools.cancel_and_wait(task)\n```\nwhich will re-raise the cancellation when there is an external cancellation\nrequest and absorb it otherwise.\n\nRelying on this API whenever you need to cancel asyncio tasks will make your\ncodebase more consistent because you no longer need to decide whether to\n(re-)raise or suppress `CancelledError` in your task codes.\n\nYou may also combine `cancel_and_wait()` with `ShieldScope` to guard\na block of codes from cancellation in the middle but defer the cancellation\nto the end of the block.\n\n```python\nasync def work():\n    try:\n        ...\n    except asyncio.CancelledError:\n        with aiotools.ShieldScope():\n            await cleanup()  # any async code here is not affected by multiple cancellation\n            raise\n\nasync def parent():\n    work_task = asyncio.create_task(work())\n    ...\n    await cancel_and_wait(work_task)\n\nparent_task = asyncio.create_task(parent())\n...\nawait cancel_and_wait(parent_task)  # it may trigger double cancellation, but it will return after the shielded block completes.\n```\n\n### Async Context Manager\n\nThis is an asynchronous version of `contextlib.contextmanager` to make it\neasier to write asynchronous context managers without creating boilerplate\nclasses.\n\n```python\nimport asyncio\nimport aiotools\n\n@aiotools.actxmgr\nasync def mygen(a):\n    await asyncio.sleep(1)\n    yield a + 1\n    await asyncio.sleep(1)\n\nasync def somewhere():\n    async with mygen(1) as b:\n        assert b == 2\n```\n\nNote that you need to wrap `yield` with a try-finally block to\nensure resource releases (e.g., locks), even in the case when\nan exception is ocurred inside the async-with block.\n\n```python\nimport asyncio\nimport aiotools\n\nlock = asyncio.Lock()\n\n@aiotools.actxmgr\nasync def mygen(a):\n    await lock.acquire()\n    try:\n        yield a + 1\n    finally:\n        lock.release()\n\nasync def somewhere():\n    try:\n        async with mygen(1) as b:\n            raise RuntimeError('oops')\n    except RuntimeError:\n        print('caught!')  # you can catch exceptions here.\n```\n\nYou can also create a group of async context managers, which\nare entered/exited all at once using `asyncio.gather()`.\n\n```python\nimport asyncio\nimport aiotools\n\n@aiotools.actxmgr\nasync def mygen(a):\n    yield a + 10\n\nasync def somewhere():\n    ctxgrp = aiotools.actxgroup(mygen(i) for i in range(10))\n    async with ctxgrp as values:\n        assert len(values) == 10\n        for i in range(10):\n            assert values[i] == i + 10\n```\n\n\n### Async Server\n\nThis implements a common pattern to launch asyncio-based server daemons.\n\n```python\nimport asyncio\nimport aiotools\n\nasync def echo(reader, writer):\n    data = await reader.read(100)\n    writer.write(data)\n    await writer.drain()\n    writer.close()\n\n@aiotools.server\nasync def myworker(loop, pidx, args):\n    server = await asyncio.start_server(echo, '0.0.0.0', 8888, reuse_port=True)\n    print(f'[{pidx}] started')\n    yield  # wait until terminated\n    server.close()\n    await server.wait_closed()\n    print(f'[{pidx}] terminated')\n\nif __name__ == '__main__':\n    # Run the above server using 4 worker processes.\n    aiotools.start_server(myworker, num_workers=4)\n```\n\nIt handles SIGINT/SIGTERM signals automatically to stop the server,\nas well as lifecycle management of event loops running on multiple processes.\nInternally it uses `aiotools.fork` module to get kernel support to resolve\npotential signal/PID related races via PID file descriptors on supported versions\n(Python 3.9+ and Linux kernel 5.4+).\n\n\n### Async TaskScope\n\nTaskScope is a variant of TaskGroup which ensures all child tasks run to completion\n(either having results or exceptions) unless the context manager is cancelled.\n\nThis could be considered as a safer version (in terms of lifecycle tracking) of\n`asyncio.gather(*, return_exceptions=True)` and a more convenient version of it\nbecause you can decouple the timing of task creation and their scheduling\nwithin the TaskScope context.\n\n```python\nimport aiotools\n\nasync def do():\n    async with aiotools.TaskScope() as ts:\n        ts.create_task(...)\n        ts.create_task(...)\n        # each task will run to completion regardless sibling failures.\n        ...\n    # at this point, all subtasks are either cancelled or done.\n```\n\nYou may customize exception handler for each scope to receive and process\nunhandled exceptions in child tasks.  For use in long-running server contexts,\nTaskScope does not store any exceptions or results by itself.\n\nSee also high-level coroutine utilities such as `as_completed_safe()`,\n`gather_safe()`, and `race()` functions in the utils module.\n\nTaskScope itself and these utilities integrate with\n[the call-graph inspection](https://docs.python.org/3/library/asyncio-graph.html)\nintroduced in Python 3.14.\n\n\n### Async Timer\n\n```python\nimport aiotools\n\ni = 0\n\nasync def mytick(interval):\n    print(i)\n    i += 1\n\nasync def somewhere():\n    task = aiotools.create_timer(mytick, 1.0)\n    ...\n    await aiotools.cancel_and_wait(task)\n```\n\nThe returned `task` is an `asyncio.Task` object.\nTo stop the timer, it should be cancelled explicitly.\nUse `cancel_and_wait()` to ensure complete shutdown of any ongoing tick tasks.\nTo make your timer function to be cancellable, add a try-except clause\ncatching `asyncio.CancelledError` since we use it as a termination\nsignal.\n\nYou may add `TimerDelayPolicy` argument to control the behavior when the\ntimer-fired task takes longer than the timer interval.\n`DEFAULT` is to accumulate them and cancel all the remainings at once when\nthe timer is cancelled.\n`CANCEL` is to cancel any pending previously fired tasks on every interval.\n\n```python\nimport asyncio\nimport aiotools\n\nasync def mytick(interval):\n    await asyncio.sleep(100)  # cancelled on every next interval.\n\nasync def somewhere():\n    t = aiotools.create_timer(mytick, 1.0, aiotools.TimerDelayPolicy.CANCEL)\n    ...\n    await aiotools.cancel_and_wait(t)\n```\n\n#### Virtual Clock\n\nIt provides a virtual clock that advances the event loop time instantly upon\nany combination of `asyncio.sleep()` calls in multiple coroutine tasks,\nby temporarily patching the event loop selector.\n\nThis is also used in [our timer test suite](https://github.com/achimnol/aiotools/blob/master/tests/test_timer.py).\n\n```python\nimport aiotools\nimport pytest\n\n@pytest.mark.asyncio\nasync def test_sleeps():\n    loop = aiotools.compat.get_running_loop()\n    vclock = aiotools.VirtualClock()\n    with vclock.patch_loop():\n        print(loop.time())  # -> prints 0\n        await asyncio.sleep(3600)\n        print(loop.time())  # -> prints 3600\n```\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "Idiomatic asyncio utilities",
    "version": "2.2.3",
    "project_urls": {
        "Code Coverage": "https://codecov.io/github/achimnol/aiotools",
        "Documentation": "https://aiotools.readthedocs.io",
        "Source": "https://github.com/achimnol/aiotools",
        "Tracker": "https://github.com/achimnol/aiotools/issues"
    },
    "split_keywords": [
        "asyncio"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "dc2373b496e20e1c615aaaa28fa016a2e1cfd802167632653945a853c7f46387",
                "md5": "40c1ad14f294082fe662dd959e629c65",
                "sha256": "f164eb41d17517d8d03da2715e827dd03a0aa99be48c9f7f4463f1129803e18e"
            },
            "downloads": -1,
            "filename": "aiotools-2.2.3-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "40c1ad14f294082fe662dd959e629c65",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.11",
            "size": 46149,
            "upload_time": "2025-10-20T08:33:53",
            "upload_time_iso_8601": "2025-10-20T08:33:53.202708Z",
            "url": "https://files.pythonhosted.org/packages/dc/23/73b496e20e1c615aaaa28fa016a2e1cfd802167632653945a853c7f46387/aiotools-2.2.3-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "3b82c14fa975a4d9c3695d2fad07207a64bd9015ee53ee5cd488a7fc91ec6604",
                "md5": "d8aaff5fa67db9ae83dde094ef66f4d6",
                "sha256": "cb6756db339dde5c9bb5091a4ee5fa25e577f074175f4f272c0b4e99e5d6f0de"
            },
            "downloads": -1,
            "filename": "aiotools-2.2.3.tar.gz",
            "has_sig": false,
            "md5_digest": "d8aaff5fa67db9ae83dde094ef66f4d6",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.11",
            "size": 148598,
            "upload_time": "2025-10-20T08:33:54",
            "upload_time_iso_8601": "2025-10-20T08:33:54.763875Z",
            "url": "https://files.pythonhosted.org/packages/3b/82/c14fa975a4d9c3695d2fad07207a64bd9015ee53ee5cd488a7fc91ec6604/aiotools-2.2.3.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-10-20 08:33:54",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "achimnol",
    "github_project": "aiotools",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "aiotools"
}
        
Elapsed time: 2.12414s