# Aqute
**A**sync **QU**eue **T**ask **E**ngine
Aqute is a minimalist yet potent Python library specifically designed
for hassle-free asynchronous task processing. Leveraging the power of async programming,
Aqute offers:
- **Efficient Producer-Consumer Model**: Utilizing the Producer-Consumer pattern with
multiple workers, Aqute ensures streamlined task distribution and swift concurrent processing.
- **Worker count & Rate Limiting**: Regulate the execution rate of tasks and configure
the number of workers for concurrent processing, ensuring optimal resource utilization.
You can use pre-programmed rate limiters or provide your own rate limiting implementation.
- **Resilient Retry Mechanism**: Tasks that encounter errors can be retried automatically,
with options to specify which error types should trigger retries.
An exception raised in the handler is returned as the task's error value.
- **Versatile task adding**: You can process the whole batch or add tasks on the fly,
depending on your needs.
- **Lightweight and simple**: Aqute operates efficiently without relying on any
external dependencies, ensuring seamless integration and minimal footprint in your projects.
Aqute simplifies task management in asynchronous landscapes, allowing developers
to focus on the task logic rather than concurrency challenges.
## Table of Contents
- [Install](#install)
- [Quickstart](#quickstart)
- [How to use it?](#how-to-use-it)
  - [Simple batch processing](#simple-batch-processing)
  - [Result as async generator per completed](#result-as-async-generator-per-completed)
  - [Infinite loop](#infinite-loop)
  - [Rate limiting](#rate-limiting)
  - [Manual task adding, context manager and error retry](#manual-task-adding-context-manager-and-error-retry)
  - [Manual flow management and custom result queue](#manual-flow-management-and-custom-result-queue)
  - [Even more manual management and internal worker queue size](#even-more-manual-management-and-internal-worker-queue-size)
  - [Use priority queue](#use-priority-queue)
  - [Task timeout setting](#task-timeout-setting)
  - [Early stopping on too many failed tasks](#early-stopping-on-too-many-failed-tasks)
  - [Barebone queue via Foreman](#barebone-queue-via-foreman)
- [Some caveats](#some-caveats)
  - [Start load timeout](#start-load-timeout)
  - [You can't wait on not started Aqute](#you-can-t-wait-on-not-started-aqute)
- [Misc](#misc)
  - [Instance reuse after stop()](#instance-reuse-after-stop)
  - [Type checking and generics](#type-checking-and-generics)
# Install
Python 3.9+ required:
```bash
pip install aqute
```
# Quickstart
Apply your async function to each item of an iterable and get back a list of results
wrapped in `AquteTask`, ordered the same way as the input:
```python
import asyncio

from aqute import Aqute

async def main():
    async def handler(i: int) -> int:
        await asyncio.sleep(i / 20)
        return i * 2

    aqute = Aqute(handle_coro=handler, workers_count=2)
    result = await aqute.apply_to_all(range(10))
    # Do not forget to extract result data from the wrapper object via the result property
    assert [t.result for t in result] == [i * 2 for i in range(10)]

asyncio.run(main())
```
# How to use it?
While a deep dive is available through Aqute's method docstrings, it's not necessary.
Aqute is easy to use for both simple and advanced workflows.
## Simple batch processing
The easiest way to use Aqute is `apply_to_all()` method:
```python
import asyncio
import logging
from random import randint, random

from aqute import Aqute, AquteError

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
logger = logging.getLogger("main")

async def handler(i: int) -> str:
    """
    NOTE: This is a mock handler for demonstration purposes.
    Replace the logic below with your own specific task processing logic.
    """
    await asyncio.sleep(0.01 + 0.09 * random())

    # Guarantees failures for some tasks in the examples
    if i >= 19:
        raise KeyError(f"The Key for {i}")

    # Here we have some randomness, so you can see retry after errors in play
    r = randint(1, 101)
    if r >= 80:
        raise ValueError(f"Got big number for {i}")

    return f"success {i}"

async def main():
    # Getting started example, the most simple
    input_data = list(range(20))

    # This will apply the handler to every item of the iterable and return results
    # as a list, ordered the same way as the input iterable
    aqute = Aqute(handle_coro=handler, workers_count=10, retry_count=2)
    result = await aqute.apply_to_all(input_data)
    # Each task result is wrapped in an AquteTask instance
    assert [t.data for t in result] == input_data

    ...

asyncio.run(main())
```
## Result as async generator per completed
```python
# Like previous but the result is async generator and the tasks are yielded
# in completion order
input_data = list(range(20))
aqute = Aqute(handle_coro=handler, workers_count=10)
done, with_errors = [], []
# You can determine final task status with specific success field
async for task in aqute.apply_to_each(input_data):
if task.success:
done.append(task)
else:
with_errors.append(task)
assert len(done + with_errors) == len(input_data)
```
## Infinite loop
You can run aqute on an "infinite" stream of tasks if needed and control the "end"
from outside. Exiting the context manager or awaiting the `stop()` coroutine shuts it down gracefully.
```python
import asyncio
import logging
from random import random

from aqute import Aqute

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
logger = logging.getLogger("main")

async def handler(i: int) -> str:
    """
    NOTE: This is a mock handler for demonstration purposes.
    """
    await asyncio.sleep(0.01 + 0.009 * random())
    return f"success {i}"

async def main():
    """
    This example shows an "infinite" workflow for aqute.
    Here it is limited to 10_000 tasks, but it can run without any end.
    """
    # Only to show controlling the done / not done status from outside
    TASK_LIMIT = 10_000
    done = asyncio.Event()

    aqute = Aqute(handle_coro=handler, workers_count=20)

    async def add_tasks():
        # You can add tasks externally
        for i in range(TASK_LIMIT):
            await aqute.add_task(i)
            # Sleep to simulate some delay between task additions (request, calculation, etc.)
            await asyncio.sleep(0.01 + 0.009 * random())

    result = []

    async def collect_results():
        # You can collect and handle results externally too
        while len(result) < TASK_LIMIT:
            task = await aqute.get_task_result()
            result.append(task)
        done.set()

    async with aqute:
        asyncio.create_task(add_tasks())
        asyncio.create_task(collect_results())
        # Better to use `await done.wait()` instead of a polling loop,
        # but this is just for demonstration purposes
        while not done.is_set():
            logger.info(f"Done tasks: {len(result):_}/{TASK_LIMIT:_}")
            await asyncio.sleep(5)

    logger.info(f"Done tasks: {len(result):_}/{TASK_LIMIT:_}")

asyncio.run(main())
```
## Rate limiting
You can also add RateLimiter instance to Aqute for rate limiting:
```python
from aqute.ratelimiter import TokenBucketRateLimiter
# Applied rate limiter with 5 handler calls per second.
input_data = list(range(20))
r_limit = TokenBucketRateLimiter(5, 1)
aqute = Aqute(handle_coro=handler, workers_count=10, rate_limiter=r_limit)
result = []
async for task in aqute.apply_to_each(input_data):
result.append(task)
assert len(result) == len(input_data)
```
There are four available `RateLimiter` implementations:
- `TokenBucketRateLimiter`: steady rate by default, burstable with the `allow_burst` option;
- `SlidingRateLimiter`: the next call becomes available after enough time has passed since the oldest one;
- `PerWorkerRateLimiter`: enforces separate rate limits for each unique worker via separate `TokenBucketRateLimiter` instances;
- `RandomizedIntervalRateLimiter`: introduces more random intervals between calls,
while still enforcing the `max_rate` over `time_period` limit.

You can write your own `RateLimiter` implementation with a specific algorithm if needed.
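For a sense of the algorithm behind the token-bucket variant, here is a minimal sketch. It is deliberately independent of Aqute's actual `RateLimiter` interface (check `aqute.ratelimiter` for the real protocol); the `SimpleTokenBucket` class and its injectable clock are made up for this example:

```python
import time

class SimpleTokenBucket:
    """Illustrative token bucket; NOT Aqute's RateLimiter interface."""

    def __init__(self, max_rate: int, time_period: float, now=time.monotonic):
        self.capacity = max_rate
        self.refill_rate = max_rate / time_period  # tokens added per second
        self.tokens = float(max_rate)
        self.now = now  # injectable clock, handy for testing
        self.last = now()

    def try_acquire(self) -> bool:
        # Refill proportionally to elapsed time, capped at capacity
        current = self.now()
        self.tokens = min(
            self.capacity, self.tokens + (current - self.last) * self.refill_rate
        )
        self.last = current
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# With a fake clock: 5 calls per second means a burst of 5, then refusal
t = [0.0]
bucket = SimpleTokenBucket(5, 1.0, now=lambda: t[0])
print([bucket.try_acquire() for _ in range(6)])  # five True, then False
t[0] = 0.2  # 0.2 s later one token has refilled
print(bucket.try_acquire())  # True
```

In an async worker, `try_acquire` would typically be paired with a sleep-and-retry loop or an awaitable acquire; Aqute's built-in limiters take care of that for you.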
## Manual task adding, context manager and error retry
This can be most useful if not all of your tasks are available at the start:
```python
# You can add tasks manually and also start/stop aqute with context
# manager. And even add tasks on the fly.
# Aqute is reliable for errors retry, you can specify your own
# retry count (0 is used for no retries by default) and specify errors to retry or not
# to keep retrying on all errors
aqute = Aqute(
handle_coro=handler,
workers_count=10,
# We we will retry 5 more times after first fail
retry_count=5,
# We retry only ValueError here
specific_errors_to_retry=(ValueError,),
# Either you can set this option to not retry only on ValueError instead
errors_to_not_retry=(ValueError,),
)
for i in range(10):
# You also can use your own task id for identification
await aqute.add_task(i, task_id=f"My task id: {i}")
async with aqute:
await asyncio.sleep(0.1)
for i in range(10, 15):
await aqute.add_task(i, task_id=f"My task id: {i}")
await asyncio.sleep(0.1)
for i in range(15, 20):
await aqute.add_task(i, task_id=f"My task id: {i}")
# Set waiting for finalization when you have all tasks added
await aqute.wait_till_end()
# You can simply extract all results from queue with this method if aqute has
# finished, returns the list of AquteTask
for tr in aqute.extract_all_results():
logger.info(f"{tr.success, tr.error, tr.result}")
```
## Manual flow management and custom result queue
```python
# You can manage the whole workflow manually if needed and use your own
# result queue instance (with limited size, for example)
result_q = asyncio.Queue(5)

aqute = Aqute(handle_coro=handler, workers_count=10, result_queue=result_q)
for i in range(10):
    # We can't await here because we would hang without the queue being emptied
    asyncio.create_task(aqute.add_task(i))
await asyncio.sleep(0.1)

# Start the processing
aqute.start()
# Sleep long enough for all tasks to possibly finish
await asyncio.sleep(1)

# We can see our result queue sizing works
assert result_q.qsize() == 5
for _ in range(5):
    await result_q.get()

# Now wait till all tasks are finished via this method; it also notifies
# aqute that we have added all tasks
await aqute.wait_till_end()
assert result_q.qsize() == 5
# Stop the aqute instance
await aqute.stop()
```
## Even more manual management and internal worker queue size
```python
# You can limit the internal queue size for the worker consumers
aqute = Aqute(
    handle_coro=handler, workers_count=10, input_task_queue_size=2
)
for i in range(10):
    await aqute.add_task(i)
# Should be set before awaiting a bare start() in this flow
aqute.set_all_tasks_added()

aqute_run_aiotask = aqute.start()
await aqute_run_aiotask
await aqute.stop()

assert aqute.result_queue.qsize() == 10
```
## Use priority queue
You can prioritize tasks by setting `use_priority_queue` flag:
```python
async def handler(i: int) -> int:
    return i

# Set the flag for a prioritized queue; the default task priority is 1_000_000
aqute = Aqute(
    workers_count=1,
    handle_coro=handler,
    use_priority_queue=True,
)
await aqute.add_task(1_000_000)
await aqute.add_task(10, task_priority=10)
await aqute.add_task(5, task_priority=5)
await aqute.add_task(10, task_priority=10)
await aqute.add_task(1, task_priority=1)

async with aqute:
    await aqute.wait_till_end()

results = aqute.extract_all_results()
assert [t.data for t in results] == [1, 5, 10, 10, 1_000_000]
```
## Task timeout setting
The `task_timeout_seconds` option is used to specify a time limit for each task.
If a task exceeds this duration, it is considered a timeout and is
handled according to the specified retry logic.
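Conceptually, the effect is similar to wrapping each handler call in `asyncio.wait_for`. This is only a sketch, not Aqute's actual internals; the `run_with_timeout` helper is made up for illustration:

```python
import asyncio

async def run_with_timeout(handler, data, timeout_seconds: float):
    # Hypothetical helper: cancel the handler if it runs past the limit.
    # In Aqute, a timeout surfaces as AquteTaskTimeoutError and then goes
    # through the configured retry logic.
    return await asyncio.wait_for(handler(data), timeout=timeout_seconds)

async def slow_handler(i: int) -> int:
    await asyncio.sleep(0.2)
    return i

async def main() -> None:
    try:
        await run_with_timeout(slow_handler, 1, timeout_seconds=0.05)
    except asyncio.TimeoutError:
        print("task timed out")

asyncio.run(main())
```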
### Timeout with retries
Setting a timeout makes a task fail with a timeout error when it exceeds the limit,
but retry logic is still applied if `retry_count > 0`.
```python
aqute = Aqute(
handle_coro=your_handler_coroutine,
task_timeout_seconds=5,
retry_count=2,
# other parameters
)
```
### Do not retry on timeout
To disable retrying on timeouts, include `AquteTaskTimeoutError` in `errors_to_not_retry`:
```python
aqute = Aqute(
handle_coro=your_handler_coroutine,
task_timeout_seconds=5,
retry_count=2,
errors_to_not_retry=AquteTaskTimeoutError,
# other parameters
)
```
## Early stopping on too many failed tasks
If you want to stop the processing when too many tasks have failed, you can use the
`total_failed_tasks_limit` option. This will raise `AquteTooManyTasksFailedError` if
the limit is reached before all tasks are processed:
```python
async def failing_handler(task: int) -> int:
    await asyncio.sleep(0.01)
    if task % 2 == 0:
        raise ValueError("Even task number")
    return task

aqute = Aqute(
    workers_count=2,
    handle_coro=failing_handler,
    total_failed_tasks_limit=5,
)
for i in range(10):
    await aqute.add_task(i)

# This will raise AquteTooManyTasksFailedError because enough tasks fail
# before all tasks are processed
async with aqute:
    await aqute.wait_till_end()
```
## Barebone queue via Foreman
If you don't need the retry flow and high-level helpers, you can use `Foreman` for a bare flow,
still with rate limiting support:
```python
import asyncio
from random import random

from aqute import AquteTask
from aqute.worker import Foreman
from aqute.ratelimiter import TokenBucketRateLimiter

async def handler(i: int) -> str:
    await asyncio.sleep(0.01 + 0.09 * random())
    return f"Success {i}"

async def main():
    # These are the supported options for Foreman
    foreman = Foreman(
        handle_coro=handler,
        workers_count=10,
        rate_limiter=TokenBucketRateLimiter(5, 1),
        input_task_queue_size=100,
    )
    for i in range(20):
        await foreman.add_task(AquteTask(i, f"{i}"))

    foreman.start()

    result = []
    for _ in range(20):
        # Be aware that status and retries are not relevant here,
        # but you can check the error field of the output
        r = await foreman.get_handled_task()
        assert r.error is None
        print(r.result)
        result.append(r)

    # Do not finalize before result extraction
    await foreman.finalize()

asyncio.run(main())
```
# Some caveats
## Start load timeout
If you've set the start timeout and no tasks are provided within it, Aqute will intentionally fail:
```python
try:
    async with Aqute(
        handle_coro=handler,
        workers_count=10,
        start_timeout_seconds=1,
    ) as aqute:
        await asyncio.sleep(1.2)
except AquteError as exc:
    logger.error(f"Aqute timed out: {exc}")
```
## You can't wait on not started Aqute
```python
aqute = Aqute(handle_coro=handler, workers_count=10)

try:
    await aqute.wait_till_end()
except AquteError as exc:
    logger.error(f"Aqute cannot be awaited here: {exc}")
```
# Misc
## Instance reuse after `stop()`
```python
# You can reuse the same aqute instance after a proper stop() call
aqute = Aqute(handle_coro=handler, workers_count=5)
async with aqute:
    for i in range(10):
        await aqute.add_task(i)
    await aqute.wait_till_end()

async with aqute:
    for i in range(10, 20):
        await aqute.add_task(i)
    await aqute.wait_till_end()

assert aqute.result_queue.qsize() == 20
```
## Type checking and generics
You should get an error during type checking if you try to use the wrong type with
`Aqute` methods (types are inferred from your provided handler):
```python
from aqute import Aqute

async def handler(i: int) -> str:
    return f"success {i}"

async def main() -> None:
    aqute = Aqute(
        handle_coro=handler,
        workers_count=10,
    )
    # Mypy error: Argument 1 to "add_task" of "Aqute" has incompatible type "str"; expected "int" [arg-type]
    await aqute.add_task("10")
```
You can also provide the expected input/output types via the generics mechanism:
```python
from aqute import Aqute

async def handler(i: int) -> str:
    return f"success {i}"

async def main() -> None:
    # Mypy error: Argument "handle_coro" to "Aqute" has incompatible type "Callable[[int], Coroutine[Any, Any, str]]"; expected "Callable[[int], Coroutine[Any, Any, int]]" [arg-type]
    aqute = Aqute[int, int](
        handle_coro=handler,
        workers_count=10,
    )

    await aqute.add_task(123)
```