aio-celery
==========
What is aio-celery?
-------------------
This project is an alternative, independent asyncio implementation of [Celery](https://docs.celeryq.dev).

Quoting the Celery [documentation](https://docs.celeryq.dev/en/latest/getting-started/introduction.html#what-s-a-task-queue):

> Celery is written in Python, but the protocol can be implemented in any language.

aio-celery does exactly this: it (re)implements the
[Celery Message Protocol](https://docs.celeryq.dev/en/latest/internals/protocol.html)
(in Python) in order to unlock access to asyncio tasks and workers.

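
To make the protocol point concrete, here is a minimal sketch of the three parts of a task message in version 2 of the Celery message protocol, as described in the linked protocol documentation: headers carrying task metadata, message properties, and a JSON body holding `(args, kwargs, embed)`. The helper `build_task_message` is hypothetical, only a subset of the header fields is shown, and this is not aio-celery's actual publishing code.

```python
import json
import uuid


def build_task_message(task_name, args=(), kwargs=None):
    """Assemble a protocol-v2-style task message as plain Python data.

    Illustrative only: a real publisher hands these parts to an AMQP
    client (aio-pika, in aio-celery's case) instead of returning them.
    """
    task_id = str(uuid.uuid4())
    headers = {
        "lang": "py",
        "task": task_name,  # the registered task name the worker looks up
        "id": task_id,
        "root_id": task_id,
        "parent_id": None,
        "group": None,
        "retries": 0,
        "eta": None,
        "expires": None,
    }
    # The body is a 3-item sequence: positional args, keyword args, and
    # "embed" (workflow options such as callbacks and chains).
    embed = {"callbacks": None, "errbacks": None, "chain": None, "chord": None}
    body = json.dumps([list(args), kwargs or {}, embed]).encode("utf-8")
    properties = {
        "correlation_id": task_id,
        "content_type": "application/json",
        "content_encoding": "utf-8",
    }
    return headers, properties, body


headers, properties, body = build_task_message("add-two-numbers", args=(2, 3))
args, kwargs, embed = json.loads(body)
print(headers["task"], args, kwargs)  # add-two-numbers [2, 3] {}
```

Because the worker only needs the task name, the arguments, and a few headers, any implementation that produces and consumes this layout can interoperate with Celery.
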
The most notable feature of aio-celery is that it does not depend on the Celery codebase.
It is written completely from scratch as a thin wrapper around [aio-pika](https://github.com/mosquito/aio-pika)
(an asynchronous RabbitMQ driver for Python),
and it has no other dependencies (except for [redis-py](https://github.com/redis/redis-py) for result backend support, and that dependency is optional).

There have been attempts to create asyncio Celery pools before, and [celery-pool-asyncio](https://pypi.org/project/celery-pool-asyncio/)
is one such example, but its implementation, due to the convoluted structure
of the original Celery codebase, is (by necessity) full of monkeypatching and other
fragile techniques. This fragility was apparently the [reason](https://github.com/kai3341/celery-pool-asyncio/issues/29)
why this library became incompatible with Celery version 5.

The Celery project itself clearly [struggles](https://github.com/celery/celery/issues/7874) with implementing asyncio coroutine support,
repeatedly postponing this feature due to apparent architectural difficulties.

This project was created to solve the same problem using the opposite approach.
It implements only a limited (but still usable, which is the whole point) subset of Celery functionality
without relying on Celery code at all. The goal is to mimic the basic
wire protocol and to support the minimal subset of the Celery API required for running and manipulating
tasks.

Features
--------
What is supported:

* Basic tasks API: `@app.task` decorator, `delay` and `apply_async` task methods, `AsyncResult` class, etc.
* Everything is asyncio-friendly and awaitable
* Asynchronous Celery worker that is started from the command line
* Routing and publishing options such as `countdown`, `eta`, `queue`, `priority`, etc.
* Task retries
* RabbitMQ as the message broker (no other brokers are supported)
* Redis as the result backend (no other result backends are supported)

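
Among the publishing options listed above, `countdown` and `eta` express the same thing in two ways. In upstream Celery, a countdown (seconds from now) is converted into an absolute `eta` timestamp that travels in the message headers as an ISO 8601 string, and the worker holds the message until that moment. The sketch below models the conversion with the standard library only; the function names are hypothetical, and rejecting calls that pass both options is a simplification chosen for this sketch, not documented aio-celery behavior.

```python
from datetime import datetime, timedelta, timezone


def resolve_eta(countdown=None, eta=None, now=None):
    """Return the ISO 8601 `eta` header value, or None for "run as soon as possible"."""
    if countdown is not None and eta is not None:
        # Simplification for this sketch: treat the two options as mutually exclusive.
        raise ValueError("pass either countdown or eta, not both")
    if now is None:
        now = datetime.now(timezone.utc)
    if countdown is not None:
        eta = now + timedelta(seconds=countdown)
    return eta.isoformat() if eta is not None else None


def seconds_until_due(eta_header, now=None):
    """Worker side: how long to hold a message before executing it."""
    if eta_header is None:
        return 0.0
    if now is None:
        now = datetime.now(timezone.utc)
    return max(0.0, (datetime.fromisoformat(eta_header) - now).total_seconds())


fixed_now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
header = resolve_eta(countdown=30, now=fixed_now)
print(header)  # 2024-01-01T12:00:30+00:00
print(seconds_until_due(header, now=fixed_now))  # 30.0
```

Carrying an absolute timestamp rather than a relative delay means the delay stays correct even if the message sits in the queue for a while before a worker picks it up.
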

Important design decisions for aio-celery:

* Complete feature parity with the upstream Celery project is not a goal
* The parts that are implemented mimic the original Celery API as closely as possible, down to class and attribute names
* The codebase is kept simple and concise, so that it is easy to understand and reason about
* The codebase is kept as small as possible: the less code, the fewer bugs
* External dependencies are kept to a minimum for the same reason
* This project must never have Celery as a dependency

Installation
------------
Install using [pip](https://pip.pypa.io/en/stable/getting-started/):
```bash
pip install aio-celery
```
If you intend to use Redis result backend for storing task results, run this command:
```bash
pip install aio-celery[redis]
```
Usage
-----
Define a `Celery` application instance and register a task:
```python
# hello.py
import asyncio
from aio_celery import Celery

app = Celery()

@app.task(name="add-two-numbers")
async def add(a, b):
    await asyncio.sleep(5)
    return a + b
```
Then run the worker:
```bash
$ aio_celery worker hello:app
```
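
Conceptually, the worker started above consumes task messages, looks up the coroutine registered under the name in the `task` header, and awaits many such coroutines concurrently on one event loop. The toy model below shows that loop with an in-memory `asyncio.Queue` standing in for RabbitMQ and plain dicts standing in for the task registry and the result backend. Every name in it is hypothetical, and a real worker also has to acknowledge messages and handle errors, which is not modelled here.

```python
import asyncio
import json

registry = {}  # task name -> coroutine function (hypothetical registry)


def task(name):
    """Toy version of `@app.task(name=...)`: register a coroutine under a name."""
    def decorator(func):
        registry[name] = func
        return func
    return decorator


@task("add-two-numbers")
async def add(a, b):
    await asyncio.sleep(0.01)
    return a + b


async def worker(queue, results, concurrency=10):
    """Consume messages until a None sentinel arrives, running tasks concurrently."""
    semaphore = asyncio.Semaphore(concurrency)  # cap on tasks running at once
    running = set()

    async def handle(message):
        async with semaphore:
            func = registry[message["headers"]["task"]]
            args, kwargs, _embed = json.loads(message["body"])
            results[message["headers"]["id"]] = await func(*args, **kwargs)

    while True:
        message = await queue.get()
        if message is None:
            break
        job = asyncio.create_task(handle(message))
        running.add(job)
        job.add_done_callback(running.discard)
    await asyncio.gather(*running)  # let in-flight tasks finish before returning


async def main():
    queue = asyncio.Queue()
    results = {}
    for n in range(5):
        body = json.dumps([[n, n], {}, {}])
        await queue.put({"headers": {"task": "add-two-numbers", "id": str(n)}, "body": body})
    await queue.put(None)
    await worker(queue, results)
    return results


results = asyncio.run(main())
print(sorted(results.values()))  # [0, 2, 4, 6, 8]
```

Since each task is a coroutine, a single worker process can keep many of them waiting on I/O (like the `asyncio.sleep(5)` in `hello.py`) at the same time.
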
Queue some tasks:
```python
# publish.py
import asyncio
from hello import add, app

async def publish():
    async with app.setup():
        tasks = [add.delay(n, n) for n in range(50000)]
        await asyncio.gather(*tasks)

asyncio.run(publish())
```
```bash
$ python3 publish.py
```
The last script concurrently publishes 50000 messages to RabbitMQ. It takes about 8 seconds to finish,
which gives an average publishing rate of about 6000 messages per second.

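
The script above schedules all 50000 `delay()` calls at once with `asyncio.gather` (50000 messages in about 8 seconds works out to roughly 6250 per second). If you prefer to cap how many publishes are in flight at a time, for example to bound memory use, an `asyncio.Semaphore` is one standard-library way to do it. This is a sketch: `fake_publish` is a hypothetical stand-in for `add.delay(n, n)`, the limit is arbitrary, and whether a cap helps or hurts throughput against a real broker is something to measure rather than assume.

```python
import asyncio


async def fake_publish(n):
    """Hypothetical stand-in for `add.delay(n, n)`; only yields to the event loop."""
    await asyncio.sleep(0)
    return n


async def publish_all(count, limit):
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0  # highest number of publishes observed running at the same time

    async def bounded(n):
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await fake_publish(n)
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(bounded(n) for n in range(count)))
    return results, peak


results, peak = asyncio.run(publish_all(5000, limit=100))
print(len(results), peak <= 100)  # 5000 True
```

`asyncio.gather` still returns results in submission order; the semaphore only limits how many of the publishing coroutines are past the `async with` at once.
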
### Using Redis Result Backend
```python
import asyncio
from aio_celery import Celery
from aio_celery.exceptions import TimeoutError

app = Celery()
app.conf.update(
    result_backend="redis://localhost:6379",
)

@app.task(name="do-something")
async def foo(x, y, z):
    await asyncio.sleep(5)
    return x + y - z

async def main():
    async with app.setup():
        result = await foo.delay(1, 2, 3)
        try:
            value = await result.get(timeout=10)
        except TimeoutError:
            print("Result is not ready after 10 seconds")
        else:
            print("Result is", value)

if __name__ == "__main__":
    asyncio.run(main())
```
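
`result.get(timeout=...)` waits for the worker to write the task's outcome to Redis and gives up with a timeout error once the deadline passes. The sketch below models those semantics with a plain dict in place of Redis and a polling loop wrapped in `asyncio.wait_for`, whose `asyncio.TimeoutError` plays the role of `aio_celery.exceptions.TimeoutError`. Assumptions: the `celery-task-meta-<task_id>` key and the JSON fields follow upstream Celery's key-value result backends, and polling is used only because it is the simplest illustration; it is not necessarily how aio-celery waits for results.

```python
import asyncio
import json

fake_redis = {}  # hypothetical stand-in for Redis: key -> JSON string


def result_key(task_id):
    # Key layout used by upstream Celery's key-value result backends.
    return f"celery-task-meta-{task_id}"


async def store_result_later(task_id, value, delay):
    """Play the worker: write a SUCCESS record after `delay` seconds."""
    await asyncio.sleep(delay)
    record = {"task_id": task_id, "status": "SUCCESS", "result": value, "traceback": None}
    fake_redis[result_key(task_id)] = json.dumps(record)


async def get_result(task_id, timeout, interval=0.01):
    """Poll until a final state appears; wait_for raises asyncio.TimeoutError on expiry."""
    async def poll():
        while True:
            raw = fake_redis.get(result_key(task_id))
            record = json.loads(raw) if raw is not None else None
            if record is not None and record["status"] == "SUCCESS":
                return record["result"]
            if record is not None and record["status"] == "FAILURE":
                raise RuntimeError(f"task {task_id} failed")
            await asyncio.sleep(interval)

    return await asyncio.wait_for(poll(), timeout)


async def main():
    writer = asyncio.create_task(store_result_later("abc", 1 + 2 - 3, delay=0.05))
    value = await get_result("abc", timeout=1)
    await writer
    try:
        await get_result("never-stored", timeout=0.05)
    except asyncio.TimeoutError:
        timed_out = True
    else:
        timed_out = False
    return value, timed_out


value, timed_out = asyncio.run(main())
print(value, timed_out)  # 0 True
```
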
### Adding context
```python
import contextlib
import asyncpg
from aio_celery import Celery

app = Celery()

@app.define_app_context
@contextlib.asynccontextmanager
async def setup_context():
    async with asyncpg.create_pool("postgresql://localhost:5432", max_size=10) as pool:
        yield {"pool": pool}

@app.task
async def get_postgres_version():
    async with app.context["pool"].acquire() as conn:
        version = await conn.fetchval("SELECT version()")
    return version
```
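
The idea behind `@app.define_app_context` is that expensive shared resources (here, a connection pool) are created once for the lifetime of the worker and then read by every task through `app.context`, instead of being opened per task. The sketch below models that lifecycle with a hypothetical `MiniApp` class and a fake pool string so it runs without PostgreSQL; exactly when aio-celery enters and exits the context is an assumption here, not something this README specifies.

```python
import asyncio
import contextlib


class MiniApp:
    """Hypothetical stand-in mimicking `define_app_context` / `app.context`."""

    def __init__(self):
        self._context_factory = None
        self.context = {}

    def define_app_context(self, factory):
        self._context_factory = factory
        return factory

    @contextlib.asynccontextmanager
    async def run(self):
        # Enter the user-supplied context once, for the whole worker lifetime.
        if self._context_factory is None:
            yield
            return
        async with self._context_factory() as ctx:
            self.context = ctx
            try:
                yield
            finally:
                self.context = {}


app = MiniApp()
events = []


@app.define_app_context
@contextlib.asynccontextmanager
async def setup_context():
    events.append("open pool")
    try:
        yield {"pool": "fake-pool"}  # stands in for an asyncpg pool
    finally:
        events.append("close pool")


async def task_uses_context():
    return app.context["pool"]


async def main():
    async with app.run():
        return await asyncio.gather(task_uses_context(), task_uses_context())


results = asyncio.run(main())
print(results, events)  # ['fake-pool', 'fake-pool'] ['open pool', 'close pool']
```

Both tasks see the same pool, and the pool is opened and closed exactly once.
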
### Retries
```python
import random
from aio_celery import Celery

app = Celery()

@app.task(name="add-two-numbers", bind=True, max_retries=3)
async def add(self, a, b):
    if random.random() > 0.25:
        # Sends the task back to the queue and raises the `aio_celery.exceptions.Retry` exception.
        await self.retry(countdown=2)
    return a + b
```
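
Behind `self.retry(countdown=2)` and `max_retries=3` there is some simple bookkeeping: the retried message is republished with the same task id, an incremented `retries` header, and a new `eta`, until the retry count reaches `max_retries`. The sketch below models that logic on a plain headers dict. It follows upstream Celery's semantics (where the exhausted case raises `celery.exceptions.MaxRetriesExceededError`); `next_retry_headers` and `MaxRetriesExceeded` are hypothetical names, and aio-celery's exact behavior may differ.

```python
from datetime import datetime, timedelta, timezone


class MaxRetriesExceeded(Exception):
    """Hypothetical; upstream Celery's counterpart is MaxRetriesExceededError."""


def next_retry_headers(headers, max_retries, countdown, now=None):
    """Return headers for the republished message, or raise when retries are exhausted."""
    retries = headers.get("retries", 0)
    if max_retries is not None and retries >= max_retries:
        raise MaxRetriesExceeded(f"task {headers['id']} already retried {retries} times")
    if now is None:
        now = datetime.now(timezone.utc)
    new_headers = dict(headers)  # same task id, same task name
    new_headers["retries"] = retries + 1
    new_headers["eta"] = (now + timedelta(seconds=countdown)).isoformat()
    return new_headers


headers = {"id": "t1", "task": "add-two-numbers", "retries": 0}
attempts = 0
try:
    while True:  # simulate a task that asks for a retry every time it runs
        headers = next_retry_headers(headers, max_retries=3, countdown=2)
        attempts += 1
except MaxRetriesExceeded:
    pass
print(attempts, headers["retries"])  # 3 3
```

With `max_retries=3`, the task runs at most four times in total: the original attempt plus three retries.
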
### Priorities and Queues
Support for [RabbitMQ Message Priorities](https://docs.celeryq.dev/en/stable/userguide/routing.html#rabbitmq-message-priorities):
```python
import asyncio
from aio_celery import Celery

app = Celery()
app.conf.update(
    task_default_priority=5,  # global default for all tasks
    task_default_queue="queue-a",  # global default for all tasks
    task_queue_max_priority=10,  # sets `x-max-priority` argument for RabbitMQ Queue
)

@app.task(
    name="add-two-numbers",
    priority=6,  # per task default (overrides global default)
    queue="queue-b",  # per task default (overrides global default)
)
async def add(a, b):
    await asyncio.sleep(3)
    return a + b

async def main():
    async with app.setup():
        await add.apply_async(
            args=(2, 3),
            priority=7,  # overrides all defaults
            queue="queue-c",  # overrides all defaults
        )

if __name__ == "__main__":
    asyncio.run(main())
```
See also [RabbitMQ documentation](https://www.rabbitmq.com/priority.html) on priorities.
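
The comments in the example above describe a three-level precedence: options passed to `apply_async` win over per-task defaults given to `@app.task`, which win over the global `task_default_*` settings. The sketch below expresses that lookup as a function over plain dicts (a hypothetical simplification; aio-celery keeps these values on its app and task objects) and also applies RabbitMQ's documented rule that a message priority above the queue's `x-max-priority` is treated as the maximum.

```python
def resolve_option(name, call_options, task_options, app_defaults):
    """Return the first value that is set: per call, then per task, then global."""
    for source in (call_options, task_options, app_defaults):
        value = source.get(name)
        if value is not None:  # `is not None`, so a priority of 0 still counts as set
            return value
    return None


def effective_priority(priority, queue_max_priority):
    """RabbitMQ treats priorities above the queue's maximum as the maximum."""
    if priority is None:
        return None
    return min(priority, queue_max_priority)


app_defaults = {"priority": 5, "queue": "queue-a"}  # task_default_priority / task_default_queue
task_options = {"priority": 6, "queue": "queue-b"}  # @app.task(priority=..., queue=...)
call_options = {"priority": 7, "queue": "queue-c"}  # apply_async(priority=..., queue=...)

print(resolve_option("queue", call_options, task_options, app_defaults))  # queue-c
print(resolve_option("queue", {}, task_options, app_defaults))  # queue-b
print(resolve_option("priority", {}, {}, app_defaults))  # 5
print(effective_priority(12, queue_max_priority=10))  # 10
```
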
### Send unregistered task by name
```python
import asyncio
from aio_celery import Celery

app = Celery()
app.conf.update(
    result_backend="redis://localhost:6379",
)

async def main():
    async with app.setup():
        result = await app.send_task(
            "add-two-numbers",
            args=(3, 4),
            queue="high-priority",
            countdown=30,
        )
        print(await result.get(timeout=5))

if __name__ == "__main__":
    asyncio.run(main())
```
### Register tasks using `@shared_task` decorator
Analogous to the original Celery [feature](https://docs.celeryq.dev/en/latest/django/first-steps-with-django.html#using-the-shared-task-decorator),
the `@shared_task` decorator lets you create tasks without having a concrete app instance:
```python
from aio_celery import Celery, shared_task

@shared_task
async def add(a, b):
    return a + b

app = Celery()  # `add` task is already registered on `app` instance
```
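
`@shared_task` works because the decorator does not need an app: it records the function in a module-level list, and each app instance created afterwards registers everything recorded so far. The sketch below shows that pattern with hypothetical names (`MiniCelery`, `_shared_tasks`) and registers tasks under their bare function names for simplicity. Upstream Celery additionally attaches shared tasks to apps that already exist, and aio-celery's internals may differ.

```python
import asyncio

_shared_tasks = []  # functions declared before any app instance exists


def shared_task(func):
    """Remember the function; apps created later will register it."""
    _shared_tasks.append(func)
    return func


class MiniCelery:
    """Hypothetical stand-in for an app that picks up shared tasks on creation."""

    def __init__(self):
        self.tasks = {func.__name__: func for func in _shared_tasks}


@shared_task
async def add(a, b):
    return a + b


app = MiniCelery()  # created after `add`, yet `add` is already registered
print(sorted(app.tasks))  # ['add']
print(asyncio.run(app.tasks["add"](2, 3)))  # 5
```

This is what lets reusable modules declare tasks without importing a project-specific app instance.
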
References
----------
### Similar Projects

https://github.com/cr0hn/aiotasks

https://github.com/the-wondersmith/celery-aio-pool

https://github.com/kai3341/celery-pool-asyncio

### Inspiration

https://github.com/taskiq-python/taskiq

### Relevant Discussions

https://github.com/celery/celery/issues/3884

https://github.com/celery/celery/issues/7874

https://github.com/anomaly/lab-python-server/issues/21

https://github.com/anomaly/lab-python-server/issues/32

Package metadata
----------------

* Name: aio-celery
* Version: 0.20.0
* Summary: Celery worker for running asyncio coroutine tasks
* Author: Kirill Kondratenko (earlgreatness@gmail.com)
* Homepage: https://github.com/earlgreyness/aio-celery
* Requires Python: >=3.8
* Keywords: asyncio, celery
* Uploaded: 2024-10-10
* Files: `aio_celery-0.20.0-py3-none-any.whl` (wheel, 25978 bytes) and `aio_celery-0.20.0.tar.gz` (sdist, 22927 bytes)