Name | async-queue-manager JSON |
Version |
0.1.1
JSON |
| download |
home_page | None |
Summary | Asynchronous Task Queue, For managing and executing tasks concurrently using asyncio. |
upload_time | 2025-08-24 19:39:42 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.13 |
license | None |
keywords |
queue
asynchronous
concurrency
asyncio
|
VCS |
 |
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# Asynchronous Task Queue Manager

[](https://opensource.org/licenses/MIT)
An efficient and robust asynchronous task queue for Python, built on top of `asyncio`. It's designed for handling concurrent tasks with support for prioritization, dynamic worker scaling, and graceful shutdowns.
## Key Features
- **Asynchronous First:** Built from the ground up using Python's modern `asyncio` library for high-performance I/O-bound tasks.
- **Priority Queues:** Supports task prioritization out of the box. Lower priority numbers are processed first.
- **Dynamic Worker Management:** Automatically scales worker tasks based on queue size to efficiently process jobs.
- **Sync & Async Task Support:** Seamlessly handles both `async` coroutines and regular synchronous functions.
-e **Timeout Control:** Set a global timeout for the queue to prevent it from running indefinitely.
- **Graceful Shutdown:** Configure how the queue handles pending tasks on exit—either cancel them immediately or complete high-priority ones before stopping.
- **Multiple Modes:** Run in `finite` mode for a set batch of tasks or `infinite` mode for long-running services that continuously process tasks.
## Installation
You can install the package from PyPI:
```bash
pip install async-queue-manager
```
## Basic Usage
Here's how to get started with the `TaskQueue` in just a few lines of code.
```python
import asyncio
import time
# 1. Import the TaskQueue
from async_queue_manager import TaskQueue
# 2. Define some tasks (can be async or regular functions)
async def async_task(duration, name):
"""An example asynchronous task."""
print(f"Starting async task: {name}")
await asyncio.sleep(duration)
print(f"✅ Finished async task: {name}")
def sync_task(duration, name):
"""An example synchronous task."""
print(f"Starting sync task: {name}")
time.sleep(duration)
print(f"✅ Finished sync task: {name}")
async def main():
# 3. Create a TaskQueue instance
task_queue = TaskQueue()
# 4. Add tasks to the queue
print("Adding tasks to the queue...")
task_queue.add_task(async_task, 1, "A (Low Prio)")
task_queue.add_task(sync_task, 2, "B (Sync)")
task_queue.add_task(async_task, 0.5, "C (High Prio)")
# 5. Run the queue and wait for all tasks to complete
print("🚀 Starting the queue...")
start_time = time.monotonic()
await task_queue.run()
end_time = time.monotonic()
print(f"🎉 All tasks completed in {end_time - start_time:.2f} seconds!")
if __name__ == "__main__":
asyncio.run(main())
```
## Advanced Usage
### Task Prioritization
You can assign a `priority` to each task. Tasks with a lower number have a higher priority and will be executed first.
```python
import asyncio
from async_queue_manager import TaskQueue
async def my_task(name):
print(f"Executing task: {name}")
await asyncio.sleep(0.1)
async def main():
queue = TaskQueue()
# Add tasks with different priorities
queue.add_task(my_task, "Task A (Priority 5)", priority=5)
queue.add_task(my_task, "Task B (Priority 10)", priority=10)
queue.add_task(my_task, "Task C (Priority 1)", priority=1) # Highest priority
# The queue will execute tasks in this order: C, A, B
await queue.run()
if __name__ == "__main__":
asyncio.run(main())
```
### Timeout and Shutdown Policy
You can control how the queue behaves when it times out using `queue_timeout` and `on_exit`. You can also mark critical tasks with `must_complete=True` to ensure they finish even if the queue times out.
- `on_exit='complete_priority'` (default): When the timeout is reached, the queue stops accepting new tasks but will wait for any tasks marked `must_complete=True` to finish. Other tasks are cancelled.
- `on_exit='cancel'`: When the timeout is reached, the queue immediately cancels all running and pending tasks.
```python
import asyncio
from async_queue_manager import TaskQueue
async def long_running_task(duration, name):
print(f"Starting task: {name} (will run for {duration}s)")
await asyncio.sleep(duration)
print(f"✅ Finished task: {name}")
async def main():
# Initialize with the 'complete_priority' shutdown policy
queue = TaskQueue(on_exit='complete_priority')
# This task will likely be cancelled by the timeout
queue.add_task(long_running_task, 4, "Normal Task")
# This task will be allowed to finish because must_complete is True
queue.add_task(long_running_task, 4, "Critical Task", must_complete=True)
print("Running queue with a 2-second timeout...")
await queue.run(queue_timeout=2)
print("Queue has finished or timed out.")
# Expected output will show "Critical Task" finishing after the timeout is announced.
if __name__ == "__main__":
asyncio.run(main())
```
## API Reference
### `TaskQueue(...)`
The main class for managing the queue.
| Parameter | Type | Description | Default |
|-----------------|-------------------------------------|---------------------------------------------------------------------------------|-----------------------|
| `size` | `int` | The maximum size of the queue. `0` means infinite. | `0` |
| `queue_timeout` | `int` | Default timeout in seconds for the queue when `run()` is called. | `0` (no timeout) |
| `on_exit` | `'cancel'` or `'complete_priority'` | The policy for handling tasks on shutdown or timeout. | `'complete_priority'` |
| `mode` | `'finite'` or `'infinite'` | `'finite'` stops when empty; `'infinite'` keeps the queue running indefinitely. | `'finite'` |
### `TaskQueue.add_task(...)`
Adds a new task to the queue.
| Parameter | Type | Description | Default |
|---------------------|------------------------|--------------------------------------------------------------------------------------------|---------|
| `task` | `Callable` `Coroutine` | The async or sync function to execute. | |
| `*args`, `**kwargs` | `Any` | Arguments to pass to the task function. | |
| `must_complete` | `bool` | If `True`, task is completed even if queue times out (with `on_exit='complete_priority'`). | `False` |
| `priority` | `int` | The priority of the task. A lower number means higher priority. | `3` |
### `await TaskQueue.run(...)`
Starts the queue workers and waits for tasks to complete.
| Parameter | Type | Description | Default |
|-----------------|-------|------------------------------------------------------|---------|
| `queue_timeout` | `int` | Overrides the default timeout for this specific run. | `None` |
## Contributing
Contributions are welcome! If you'd like to contribute, please follow these steps:
1. Fork the repository.
2. Clone your fork and set up the development environment:
```bash
git clone [https://github.com/YOUR_USERNAME/taskqueue.git](https://github.com/YOUR_USERNAME/taskqueue.git)
cd taskqueue
pip install -e .[dev]
```
3. Make your changes and add tests for them.
4. Run the tests to ensure everything is working:
```bash
pytest
```
5. Submit a pull request with a clear description of your changes.
## License
This project is licensed under the MIT License. See the `LICENSE` file for details.
Raw data
{
"_id": null,
"home_page": null,
"name": "async-queue-manager",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.13",
"maintainer_email": null,
"keywords": "queue, asynchronous, concurrency, asyncio",
"author": null,
"author_email": "Ichinga Samuel <ichingasamuel@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/69/2d/4f08a7dc60c65012fba8d378c7622a2c30c7297deb189f6706ed91cc3d18/async_queue_manager-0.1.1.tar.gz",
"platform": null,
"description": "# Asynchronous Task Queue Manager\r\n\r\n\r\n[](https://opensource.org/licenses/MIT)\r\n\r\nAn efficient and robust asynchronous task queue for Python, built on top of `asyncio`. It's designed for handling concurrent tasks with support for prioritization, dynamic worker scaling, and graceful shutdowns.\r\n\r\n## Key Features\r\n\r\n- **Asynchronous First:** Built from the ground up using Python's modern `asyncio` library for high-performance I/O-bound tasks.\r\n- **Priority Queues:** Supports task prioritization out of the box. Lower priority numbers are processed first.\r\n- **Dynamic Worker Management:** Automatically scales worker tasks based on queue size to efficiently process jobs.\r\n- **Sync & Async Task Support:** Seamlessly handles both `async` coroutines and regular synchronous functions.\r\n-e **Timeout Control:** Set a global timeout for the queue to prevent it from running indefinitely.\r\n- **Graceful Shutdown:** Configure how the queue handles pending tasks on exit\u2014either cancel them immediately or complete high-priority ones before stopping.\r\n- **Multiple Modes:** Run in `finite` mode for a set batch of tasks or `infinite` mode for long-running services that continuously process tasks.\r\n\r\n## Installation\r\n\r\nYou can install the package from PyPI:\r\n\r\n```bash\r\npip install async-queue-manager\r\n```\r\n\r\n## Basic Usage\r\n\r\nHere's how to get started with the `TaskQueue` in just a few lines of code.\r\n\r\n```python\r\nimport asyncio\r\nimport time\r\n\r\n# 1. Import the TaskQueue\r\nfrom async_queue_manager import TaskQueue\r\n\r\n# 2. Define some tasks (can be async or regular functions)\r\nasync def async_task(duration, name):\r\n \"\"\"An example asynchronous task.\"\"\"\r\n print(f\"Starting async task: {name}\")\r\n await asyncio.sleep(duration)\r\n print(f\"\u2705 Finished async task: {name}\")\r\n\r\ndef sync_task(duration, name):\r\n \"\"\"An example synchronous task.\"\"\"\r\n print(f\"Starting sync task: {name}\")\r\n time.sleep(duration)\r\n print(f\"\u2705 Finished sync task: {name}\")\r\n\r\nasync def main():\r\n # 3. Create a TaskQueue instance\r\n task_queue = TaskQueue()\r\n\r\n # 4. Add tasks to the queue\r\n print(\"Adding tasks to the queue...\")\r\n task_queue.add_task(async_task, 1, \"A (Low Prio)\")\r\n task_queue.add_task(sync_task, 2, \"B (Sync)\")\r\n task_queue.add_task(async_task, 0.5, \"C (High Prio)\")\r\n\r\n # 5. Run the queue and wait for all tasks to complete\r\n print(\"\ud83d\ude80 Starting the queue...\")\r\n start_time = time.monotonic()\r\n await task_queue.run()\r\n end_time = time.monotonic()\r\n \r\n print(f\"\ud83c\udf89 All tasks completed in {end_time - start_time:.2f} seconds!\")\r\n\r\nif __name__ == \"__main__\":\r\n asyncio.run(main())\r\n```\r\n\r\n## Advanced Usage\r\n\r\n### Task Prioritization\r\n\r\nYou can assign a `priority` to each task. Tasks with a lower number have a higher priority and will be executed first.\r\n\r\n```python\r\nimport asyncio\r\nfrom async_queue_manager import TaskQueue\r\n\r\nasync def my_task(name):\r\n print(f\"Executing task: {name}\")\r\n await asyncio.sleep(0.1)\r\n\r\nasync def main():\r\n queue = TaskQueue()\r\n\r\n # Add tasks with different priorities\r\n queue.add_task(my_task, \"Task A (Priority 5)\", priority=5)\r\n queue.add_task(my_task, \"Task B (Priority 10)\", priority=10)\r\n queue.add_task(my_task, \"Task C (Priority 1)\", priority=1) # Highest priority\r\n\r\n # The queue will execute tasks in this order: C, A, B\r\n await queue.run()\r\n\r\nif __name__ == \"__main__\":\r\n asyncio.run(main())\r\n```\r\n\r\n### Timeout and Shutdown Policy\r\n\r\nYou can control how the queue behaves when it times out using `queue_timeout` and `on_exit`. You can also mark critical tasks with `must_complete=True` to ensure they finish even if the queue times out.\r\n\r\n- `on_exit='complete_priority'` (default): When the timeout is reached, the queue stops accepting new tasks but will wait for any tasks marked `must_complete=True` to finish. Other tasks are cancelled.\r\n- `on_exit='cancel'`: When the timeout is reached, the queue immediately cancels all running and pending tasks.\r\n\r\n```python\r\nimport asyncio\r\nfrom async_queue_manager import TaskQueue\r\n\r\nasync def long_running_task(duration, name):\r\n print(f\"Starting task: {name} (will run for {duration}s)\")\r\n await asyncio.sleep(duration)\r\n print(f\"\u2705 Finished task: {name}\")\r\n\r\nasync def main():\r\n # Initialize with the 'complete_priority' shutdown policy\r\n queue = TaskQueue(on_exit='complete_priority')\r\n\r\n # This task will likely be cancelled by the timeout\r\n queue.add_task(long_running_task, 4, \"Normal Task\")\r\n\r\n # This task will be allowed to finish because must_complete is True\r\n queue.add_task(long_running_task, 4, \"Critical Task\", must_complete=True)\r\n\r\n print(\"Running queue with a 2-second timeout...\")\r\n await queue.run(queue_timeout=2)\r\n print(\"Queue has finished or timed out.\")\r\n # Expected output will show \"Critical Task\" finishing after the timeout is announced.\r\n\r\nif __name__ == \"__main__\":\r\n asyncio.run(main())\r\n```\r\n\r\n## API Reference\r\n\r\n### `TaskQueue(...)`\r\n\r\nThe main class for managing the queue.\r\n\r\n| Parameter | Type | Description | Default |\r\n|-----------------|-------------------------------------|---------------------------------------------------------------------------------|-----------------------|\r\n| `size` | `int` | The maximum size of the queue. `0` means infinite. | `0` |\r\n| `queue_timeout` | `int` | Default timeout in seconds for the queue when `run()` is called. | `0` (no timeout) |\r\n| `on_exit` | `'cancel'` or `'complete_priority'` | The policy for handling tasks on shutdown or timeout. | `'complete_priority'` |\r\n| `mode` | `'finite'` or `'infinite'` | `'finite'` stops when empty; `'infinite'` keeps the queue running indefinitely. | `'finite'` |\r\n\r\n### `TaskQueue.add_task(...)`\r\n\r\nAdds a new task to the queue.\r\n\r\n| Parameter | Type | Description | Default |\r\n|---------------------|------------------------|--------------------------------------------------------------------------------------------|---------|\r\n| `task` | `Callable` `Coroutine` | The async or sync function to execute. | |\r\n| `*args`, `**kwargs` | `Any` | Arguments to pass to the task function. | |\r\n| `must_complete` | `bool` | If `True`, task is completed even if queue times out (with `on_exit='complete_priority'`). | `False` |\r\n| `priority` | `int` | The priority of the task. A lower number means higher priority. | `3` |\r\n\r\n### `await TaskQueue.run(...)`\r\n\r\nStarts the queue workers and waits for tasks to complete.\r\n\r\n| Parameter | Type | Description | Default |\r\n|-----------------|-------|------------------------------------------------------|---------|\r\n| `queue_timeout` | `int` | Overrides the default timeout for this specific run. | `None` |\r\n\r\n\r\n## Contributing\r\n\r\nContributions are welcome! If you'd like to contribute, please follow these steps:\r\n\r\n1. Fork the repository.\r\n2. Clone your fork and set up the development environment:\r\n ```bash\r\n git clone [https://github.com/YOUR_USERNAME/taskqueue.git](https://github.com/YOUR_USERNAME/taskqueue.git)\r\n cd taskqueue\r\n pip install -e .[dev]\r\n ```\r\n3. Make your changes and add tests for them.\r\n4. Run the tests to ensure everything is working:\r\n ```bash\r\n pytest\r\n ```\r\n5. Submit a pull request with a clear description of your changes.\r\n\r\n## License\r\n\r\nThis project is licensed under the MIT License. See the `LICENSE` file for details.\r\n",
"bugtrack_url": null,
"license": null,
"summary": "Asynchronous Task Queue, For managing and executing tasks concurrently using asyncio.",
"version": "0.1.1",
"project_urls": {
"Bug Tracker": "https://github.com/Ichinga-Samuel/taskqueue/issues",
"Homepage": "https://github.com/Ichinga-Samuel/taskqueue"
},
"split_keywords": [
"queue",
" asynchronous",
" concurrency",
" asyncio"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "6d45b0f6a22cf50c589fc97fca453b0285a7987a4ebb108ec5014a01503c7fbe",
"md5": "7970a765071128d00c171e694a3d2cad",
"sha256": "40688d9a14fb9f0afde72048bfcebe4c8bdce53184cd62a0b5345d7edfe477f6"
},
"downloads": -1,
"filename": "async_queue_manager-0.1.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "7970a765071128d00c171e694a3d2cad",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.13",
"size": 8592,
"upload_time": "2025-08-24T19:39:40",
"upload_time_iso_8601": "2025-08-24T19:39:40.430646Z",
"url": "https://files.pythonhosted.org/packages/6d/45/b0f6a22cf50c589fc97fca453b0285a7987a4ebb108ec5014a01503c7fbe/async_queue_manager-0.1.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "692d4f08a7dc60c65012fba8d378c7622a2c30c7297deb189f6706ed91cc3d18",
"md5": "69dd6e7a41c1f88903f3441052c21c47",
"sha256": "bb2340e4b159f7aadb0d66fc25b4ff8a3eef41e7bd247cfed71f35deafe60df4"
},
"downloads": -1,
"filename": "async_queue_manager-0.1.1.tar.gz",
"has_sig": false,
"md5_digest": "69dd6e7a41c1f88903f3441052c21c47",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.13",
"size": 10845,
"upload_time": "2025-08-24T19:39:42",
"upload_time_iso_8601": "2025-08-24T19:39:42.122284Z",
"url": "https://files.pythonhosted.org/packages/69/2d/4f08a7dc60c65012fba8d378c7622a2c30c7297deb189f6706ed91cc3d18/async_queue_manager-0.1.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-08-24 19:39:42",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "Ichinga-Samuel",
"github_project": "taskqueue",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "async-queue-manager"
}