Name | kew JSON |
Version |
0.1.4
JSON |
| download |
home_page | None |
Summary | A flexible async task queue manager for Python applications |
upload_time | 2024-11-03 04:01:33 |
maintainer | None |
docs_url | None |
author | None |
requires_python | >=3.8 |
license | None |
keywords |
async
concurrent
manager
queue
redis
task
|
VCS |
|
bugtrack_url |
|
requirements |
No requirements were recorded.
|
Travis-CI |
No Travis.
|
coveralls test coverage |
No coveralls.
|
# Kew Task Queue Manager
A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.
## Features
- Multiple named queues with independent configurations
- Priority-based task scheduling with millisecond precision
- Redis-backed persistence for reliability
- Configurable worker pools per queue
- Built-in circuit breaker for fault tolerance
- Comprehensive task lifecycle management
- Automatic task expiration (24-hour default)
- Detailed logging and monitoring
- Graceful shutdown handling
- Thread-safe operations
## Installation
```bash
pip install kew
```
## Quick Start
```python
import asyncio
from kew import TaskQueueManager, QueueConfig, QueuePriority
async def example_task(x: int):
await asyncio.sleep(1)
return x * 2
async def main():
# Initialize the task queue manager with Redis connection
manager = TaskQueueManager(redis_url="redis://localhost:6379")
await manager.initialize()
# Create a high-priority queue
await manager.create_queue(QueueConfig(
name="high_priority",
max_workers=4,
max_size=1000,
priority=QueuePriority.HIGH
))
# Submit a task
task_info = await manager.submit_task(
task_id="task1",
queue_name="high_priority",
task_type="multiplication",
task_func=example_task,
priority=QueuePriority.HIGH,
x=5
)
# Check task status
await asyncio.sleep(2)
status = await manager.get_task_status("task1")
print(f"Task Result: {status.result}")
# Graceful shutdown
await manager.shutdown()
if __name__ == "__main__":
asyncio.run(main())
```
## Queue Configuration
### Creating Queues
```python
from kew import QueueConfig, QueuePriority
# Create a high-priority queue with 4 workers
await manager.create_queue(QueueConfig(
name="critical",
max_workers=4,
max_size=1000,
priority=QueuePriority.HIGH
))
```
### Queue Priority Levels
- `QueuePriority.HIGH` (1)
- `QueuePriority.MEDIUM` (2)
- `QueuePriority.LOW` (3)
Tasks within the same priority level are processed in FIFO order with millisecond precision.
## Task Management
### Submitting Tasks
```python
task_info = await manager.submit_task(
task_id="unique_id",
queue_name="critical",
task_type="example",
task_func=my_async_function,
priority=QueuePriority.HIGH,
*args,
**kwargs
)
```
### Monitoring Task Status
```python
status = await manager.get_task_status("unique_id")
print(f"Status: {status.status}") # QUEUED, PROCESSING, COMPLETED, FAILED
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")
```
### Queue Status Monitoring
```python
status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}")
print(f"Circuit Breaker: {status['circuit_breaker_status']}")
```
## Advanced Features
### Circuit Breaker
Each queue has a built-in circuit breaker that helps prevent cascade failures:
- Opens after 3 consecutive failures (configurable)
- Auto-resets after 60 seconds (configurable)
- Provides circuit state monitoring
### Task Expiration
Tasks automatically expire after 24 hours (configurable) to prevent resource leaks.
### Redis Configuration
```python
manager = TaskQueueManager(
redis_url="redis://username:password@hostname:6379/0",
cleanup_on_start=True # Optional: cleans up existing tasks on startup
)
```
## Error Handling
The system handles various error scenarios:
- `TaskAlreadyExistsError`: Raised when submitting a task with a duplicate ID
- `TaskNotFoundError`: Raised when querying a non-existent task
- `QueueNotFoundError`: Raised when accessing an undefined queue
- `QueueProcessorError`: Raised for queue processing failures
## API Reference
### TaskQueueManager
Core Methods:
- `async initialize()`
- `async create_queue(config: QueueConfig)`
- `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)`
- `async get_task_status(task_id)`
- `async get_queue_status(queue_name)`
- `async shutdown(wait=True, timeout=5.0)`
### QueueConfig
Configuration Parameters:
- `name: str`
- `max_workers: int`
- `max_size: int`
- `priority: QueuePriority`
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## License
This project is licensed under the MIT License - see the LICENSE file for details.
Raw data
{
"_id": null,
"home_page": null,
"name": "kew",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": "async, concurrent, manager, queue, redis, task",
"author": null,
"author_email": "Rach Pradhan <rach@rachit.ai>",
"download_url": "https://files.pythonhosted.org/packages/9f/c1/85e8d5f9c95c12c162121344ecc28473d9cc0507be0f1c890f2b6e401cbb/kew-0.1.4.tar.gz",
"platform": null,
"description": "# Kew Task Queue Manager\n\nA robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.\n\n## Features\n\n- Multiple named queues with independent configurations\n- Priority-based task scheduling with millisecond precision\n- Redis-backed persistence for reliability\n- Configurable worker pools per queue\n- Built-in circuit breaker for fault tolerance\n- Comprehensive task lifecycle management\n- Automatic task expiration (24-hour default)\n- Detailed logging and monitoring\n- Graceful shutdown handling\n- Thread-safe operations\n\n## Installation\n\n```bash\npip install kew\n```\n\n## Quick Start\n\n```python\nimport asyncio\nfrom kew import TaskQueueManager, QueueConfig, QueuePriority\n\nasync def example_task(x: int):\n await asyncio.sleep(1)\n return x * 2\n\nasync def main():\n # Initialize the task queue manager with Redis connection\n manager = TaskQueueManager(redis_url=\"redis://localhost:6379\")\n await manager.initialize()\n \n # Create a high-priority queue\n await manager.create_queue(QueueConfig(\n name=\"high_priority\",\n max_workers=4,\n max_size=1000,\n priority=QueuePriority.HIGH\n ))\n \n # Submit a task\n task_info = await manager.submit_task(\n task_id=\"task1\",\n queue_name=\"high_priority\",\n task_type=\"multiplication\",\n task_func=example_task,\n priority=QueuePriority.HIGH,\n x=5\n )\n \n # Check task status\n await asyncio.sleep(2)\n status = await manager.get_task_status(\"task1\")\n print(f\"Task Result: {status.result}\")\n \n # Graceful shutdown\n await manager.shutdown()\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\n## Queue Configuration\n\n### Creating Queues\n\n```python\nfrom kew import QueueConfig, QueuePriority\n\n# Create a high-priority queue with 4 workers\nawait manager.create_queue(QueueConfig(\n name=\"critical\",\n max_workers=4,\n max_size=1000,\n priority=QueuePriority.HIGH\n))\n```\n\n### Queue Priority Levels\n\n- `QueuePriority.HIGH` (1)\n- `QueuePriority.MEDIUM` (2)\n- `QueuePriority.LOW` (3)\n\nTasks within the same priority level are processed in FIFO order with millisecond precision.\n\n## Task Management\n\n### Submitting Tasks\n\n```python\ntask_info = await manager.submit_task(\n task_id=\"unique_id\",\n queue_name=\"critical\",\n task_type=\"example\",\n task_func=my_async_function,\n priority=QueuePriority.HIGH,\n *args,\n **kwargs\n)\n```\n\n### Monitoring Task Status\n\n```python\nstatus = await manager.get_task_status(\"unique_id\")\nprint(f\"Status: {status.status}\") # QUEUED, PROCESSING, COMPLETED, FAILED\nprint(f\"Queue: {status.queue_name}\")\nprint(f\"Priority: {status.priority}\")\nprint(f\"Result: {status.result}\")\nprint(f\"Error: {status.error}\")\n```\n\n### Queue Status Monitoring\n\n```python\nstatus = await manager.get_queue_status(\"critical\")\nprint(f\"Queue Size: {status['queued_tasks']}\")\nprint(f\"Active Workers: {status['current_workers']}\")\nprint(f\"Circuit Breaker: {status['circuit_breaker_status']}\")\n```\n\n## Advanced Features\n\n### Circuit Breaker\n\nEach queue has a built-in circuit breaker that helps prevent cascade failures:\n\n- Opens after 3 consecutive failures (configurable)\n- Auto-resets after 60 seconds (configurable)\n- Provides circuit state monitoring\n\n### Task Expiration\n\nTasks automatically expire after 24 hours (configurable) to prevent resource leaks.\n\n### Redis Configuration\n\n```python\nmanager = TaskQueueManager(\n redis_url=\"redis://username:password@hostname:6379/0\",\n cleanup_on_start=True # Optional: cleans up existing tasks on startup\n)\n```\n\n## Error Handling\n\nThe system handles various error scenarios:\n\n- `TaskAlreadyExistsError`: Raised when submitting a task with a duplicate ID\n- `TaskNotFoundError`: Raised when querying a non-existent task\n- `QueueNotFoundError`: Raised when accessing an undefined queue\n- `QueueProcessorError`: Raised for queue processing failures\n\n## API Reference\n\n### TaskQueueManager\n\nCore Methods:\n- `async initialize()`\n- `async create_queue(config: QueueConfig)`\n- `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)`\n- `async get_task_status(task_id)`\n- `async get_queue_status(queue_name)`\n- `async shutdown(wait=True, timeout=5.0)`\n\n### QueueConfig\n\nConfiguration Parameters:\n- `name: str`\n- `max_workers: int`\n- `max_size: int`\n- `priority: QueuePriority`\n\n## Contributing\n\nContributions are welcome! Please feel free to submit a Pull Request.\n\n## License\n\nThis project is licensed under the MIT License - see the LICENSE file for details.",
"bugtrack_url": null,
"license": null,
"summary": "A flexible async task queue manager for Python applications",
"version": "0.1.4",
"project_urls": {
"Bug Tracker": "https://github.com/justrach/kew/issues",
"Homepage": "https://github.com/justrach/kew"
},
"split_keywords": [
"async",
" concurrent",
" manager",
" queue",
" redis",
" task"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "a77ec1a24dee7e925f98c713ae4d4f922b678ad172797996b1601f8859e7c71f",
"md5": "bab47e28567450e8c6b486c3960afdba",
"sha256": "0f0ca7d4e78b3da0af034f9c470668b9a5c67c898885bacaf9b677e6459e2438"
},
"downloads": -1,
"filename": "kew-0.1.4-py3-none-any.whl",
"has_sig": false,
"md5_digest": "bab47e28567450e8c6b486c3960afdba",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 12876,
"upload_time": "2024-11-03T04:01:31",
"upload_time_iso_8601": "2024-11-03T04:01:31.832462Z",
"url": "https://files.pythonhosted.org/packages/a7/7e/c1a24dee7e925f98c713ae4d4f922b678ad172797996b1601f8859e7c71f/kew-0.1.4-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "9fc185e8d5f9c95c12c162121344ecc28473d9cc0507be0f1c890f2b6e401cbb",
"md5": "5d2dae592c38f912d6d10497f25a3c5f",
"sha256": "2342f389d20d97ff524cda435545fa77db0626945ff528e0a44c675da3b40314"
},
"downloads": -1,
"filename": "kew-0.1.4.tar.gz",
"has_sig": false,
"md5_digest": "5d2dae592c38f912d6d10497f25a3c5f",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 15497,
"upload_time": "2024-11-03T04:01:33",
"upload_time_iso_8601": "2024-11-03T04:01:33.605086Z",
"url": "https://files.pythonhosted.org/packages/9f/c1/85e8d5f9c95c12c162121344ecc28473d9cc0507be0f1c890f2b6e401cbb/kew-0.1.4.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-11-03 04:01:33",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "justrach",
"github_project": "kew",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "kew"
}