kew


Namekew JSON
Version 0.1.4 PyPI version JSON
download
home_pageNone
SummaryA flexible async task queue manager for Python applications
upload_time2024-11-03 04:01:33
maintainerNone
docs_urlNone
authorNone
requires_python>=3.8
licenseNone
keywords async concurrent manager queue redis task
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Kew Task Queue Manager

A robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.

## Features

- Multiple named queues with independent configurations
- Priority-based task scheduling with millisecond precision
- Redis-backed persistence for reliability
- Configurable worker pools per queue
- Built-in circuit breaker for fault tolerance
- Comprehensive task lifecycle management
- Automatic task expiration (24-hour default)
- Detailed logging and monitoring
- Graceful shutdown handling
- Thread-safe operations

## Installation

```bash
pip install kew
```

## Quick Start

```python
import asyncio
from kew import TaskQueueManager, QueueConfig, QueuePriority

async def example_task(x: int):
    await asyncio.sleep(1)
    return x * 2

async def main():
    # Initialize the task queue manager with Redis connection
    manager = TaskQueueManager(redis_url="redis://localhost:6379")
    await manager.initialize()
    
    # Create a high-priority queue
    await manager.create_queue(QueueConfig(
        name="high_priority",
        max_workers=4,
        max_size=1000,
        priority=QueuePriority.HIGH
    ))
    
    # Submit a task
    task_info = await manager.submit_task(
        task_id="task1",
        queue_name="high_priority",
        task_type="multiplication",
        task_func=example_task,
        priority=QueuePriority.HIGH,
        x=5
    )
    
    # Check task status
    await asyncio.sleep(2)
    status = await manager.get_task_status("task1")
    print(f"Task Result: {status.result}")
    
    # Graceful shutdown
    await manager.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
```

## Queue Configuration

### Creating Queues

```python
from kew import QueueConfig, QueuePriority

# Create a high-priority queue with 4 workers
await manager.create_queue(QueueConfig(
    name="critical",
    max_workers=4,
    max_size=1000,
    priority=QueuePriority.HIGH
))
```

### Queue Priority Levels

- `QueuePriority.HIGH` (1)
- `QueuePriority.MEDIUM` (2)
- `QueuePriority.LOW` (3)

Tasks within the same priority level are processed in FIFO order with millisecond precision.

## Task Management

### Submitting Tasks

```python
task_info = await manager.submit_task(
    task_id="unique_id",
    queue_name="critical",
    task_type="example",
    task_func=my_async_function,
    priority=QueuePriority.HIGH,
    *args,
    **kwargs
)
```

### Monitoring Task Status

```python
status = await manager.get_task_status("unique_id")
print(f"Status: {status.status}")  # QUEUED, PROCESSING, COMPLETED, FAILED
print(f"Queue: {status.queue_name}")
print(f"Priority: {status.priority}")
print(f"Result: {status.result}")
print(f"Error: {status.error}")
```

### Queue Status Monitoring

```python
status = await manager.get_queue_status("critical")
print(f"Queue Size: {status['queued_tasks']}")
print(f"Active Workers: {status['current_workers']}")
print(f"Circuit Breaker: {status['circuit_breaker_status']}")
```

## Advanced Features

### Circuit Breaker

Each queue has a built-in circuit breaker that helps prevent cascade failures:

- Opens after 3 consecutive failures (configurable)
- Auto-resets after 60 seconds (configurable)
- Provides circuit state monitoring

### Task Expiration

Tasks automatically expire after 24 hours (configurable) to prevent resource leaks.

### Redis Configuration

```python
manager = TaskQueueManager(
    redis_url="redis://username:password@hostname:6379/0",
    cleanup_on_start=True  # Optional: cleans up existing tasks on startup
)
```

## Error Handling

The system handles various error scenarios:

- `TaskAlreadyExistsError`: Raised when submitting a task with a duplicate ID
- `TaskNotFoundError`: Raised when querying a non-existent task
- `QueueNotFoundError`: Raised when accessing an undefined queue
- `QueueProcessorError`: Raised for queue processing failures

## API Reference

### TaskQueueManager

Core Methods:
- `async initialize()`
- `async create_queue(config: QueueConfig)`
- `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)`
- `async get_task_status(task_id)`
- `async get_queue_status(queue_name)`
- `async shutdown(wait=True, timeout=5.0)`

### QueueConfig

Configuration Parameters:
- `name: str`
- `max_workers: int`
- `max_size: int`
- `priority: QueuePriority`

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

## License

This project is licensed under the MIT License - see the LICENSE file for details.
            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "kew",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "async, concurrent, manager, queue, redis, task",
    "author": null,
    "author_email": "Rach Pradhan <rach@rachit.ai>",
    "download_url": "https://files.pythonhosted.org/packages/9f/c1/85e8d5f9c95c12c162121344ecc28473d9cc0507be0f1c890f2b6e401cbb/kew-0.1.4.tar.gz",
    "platform": null,
    "description": "# Kew Task Queue Manager\n\nA robust, Redis-backed asynchronous task queue manager for Python applications with support for priority-based queues and circuit breaker patterns.\n\n## Features\n\n- Multiple named queues with independent configurations\n- Priority-based task scheduling with millisecond precision\n- Redis-backed persistence for reliability\n- Configurable worker pools per queue\n- Built-in circuit breaker for fault tolerance\n- Comprehensive task lifecycle management\n- Automatic task expiration (24-hour default)\n- Detailed logging and monitoring\n- Graceful shutdown handling\n- Thread-safe operations\n\n## Installation\n\n```bash\npip install kew\n```\n\n## Quick Start\n\n```python\nimport asyncio\nfrom kew import TaskQueueManager, QueueConfig, QueuePriority\n\nasync def example_task(x: int):\n    await asyncio.sleep(1)\n    return x * 2\n\nasync def main():\n    # Initialize the task queue manager with Redis connection\n    manager = TaskQueueManager(redis_url=\"redis://localhost:6379\")\n    await manager.initialize()\n    \n    # Create a high-priority queue\n    await manager.create_queue(QueueConfig(\n        name=\"high_priority\",\n        max_workers=4,\n        max_size=1000,\n        priority=QueuePriority.HIGH\n    ))\n    \n    # Submit a task\n    task_info = await manager.submit_task(\n        task_id=\"task1\",\n        queue_name=\"high_priority\",\n        task_type=\"multiplication\",\n        task_func=example_task,\n        priority=QueuePriority.HIGH,\n        x=5\n    )\n    \n    # Check task status\n    await asyncio.sleep(2)\n    status = await manager.get_task_status(\"task1\")\n    print(f\"Task Result: {status.result}\")\n    \n    # Graceful shutdown\n    await manager.shutdown()\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n\n## Queue Configuration\n\n### Creating Queues\n\n```python\nfrom kew import QueueConfig, QueuePriority\n\n# Create a high-priority queue with 4 workers\nawait manager.create_queue(QueueConfig(\n    name=\"critical\",\n    max_workers=4,\n    max_size=1000,\n    priority=QueuePriority.HIGH\n))\n```\n\n### Queue Priority Levels\n\n- `QueuePriority.HIGH` (1)\n- `QueuePriority.MEDIUM` (2)\n- `QueuePriority.LOW` (3)\n\nTasks within the same priority level are processed in FIFO order with millisecond precision.\n\n## Task Management\n\n### Submitting Tasks\n\n```python\ntask_info = await manager.submit_task(\n    task_id=\"unique_id\",\n    queue_name=\"critical\",\n    task_type=\"example\",\n    task_func=my_async_function,\n    priority=QueuePriority.HIGH,\n    *args,\n    **kwargs\n)\n```\n\n### Monitoring Task Status\n\n```python\nstatus = await manager.get_task_status(\"unique_id\")\nprint(f\"Status: {status.status}\")  # QUEUED, PROCESSING, COMPLETED, FAILED\nprint(f\"Queue: {status.queue_name}\")\nprint(f\"Priority: {status.priority}\")\nprint(f\"Result: {status.result}\")\nprint(f\"Error: {status.error}\")\n```\n\n### Queue Status Monitoring\n\n```python\nstatus = await manager.get_queue_status(\"critical\")\nprint(f\"Queue Size: {status['queued_tasks']}\")\nprint(f\"Active Workers: {status['current_workers']}\")\nprint(f\"Circuit Breaker: {status['circuit_breaker_status']}\")\n```\n\n## Advanced Features\n\n### Circuit Breaker\n\nEach queue has a built-in circuit breaker that helps prevent cascade failures:\n\n- Opens after 3 consecutive failures (configurable)\n- Auto-resets after 60 seconds (configurable)\n- Provides circuit state monitoring\n\n### Task Expiration\n\nTasks automatically expire after 24 hours (configurable) to prevent resource leaks.\n\n### Redis Configuration\n\n```python\nmanager = TaskQueueManager(\n    redis_url=\"redis://username:password@hostname:6379/0\",\n    cleanup_on_start=True  # Optional: cleans up existing tasks on startup\n)\n```\n\n## Error Handling\n\nThe system handles various error scenarios:\n\n- `TaskAlreadyExistsError`: Raised when submitting a task with a duplicate ID\n- `TaskNotFoundError`: Raised when querying a non-existent task\n- `QueueNotFoundError`: Raised when accessing an undefined queue\n- `QueueProcessorError`: Raised for queue processing failures\n\n## API Reference\n\n### TaskQueueManager\n\nCore Methods:\n- `async initialize()`\n- `async create_queue(config: QueueConfig)`\n- `async submit_task(task_id, queue_name, task_type, task_func, priority, *args, **kwargs)`\n- `async get_task_status(task_id)`\n- `async get_queue_status(queue_name)`\n- `async shutdown(wait=True, timeout=5.0)`\n\n### QueueConfig\n\nConfiguration Parameters:\n- `name: str`\n- `max_workers: int`\n- `max_size: int`\n- `priority: QueuePriority`\n\n## Contributing\n\nContributions are welcome! Please feel free to submit a Pull Request.\n\n## License\n\nThis project is licensed under the MIT License - see the LICENSE file for details.",
    "bugtrack_url": null,
    "license": null,
    "summary": "A flexible async task queue manager for Python applications",
    "version": "0.1.4",
    "project_urls": {
        "Bug Tracker": "https://github.com/justrach/kew/issues",
        "Homepage": "https://github.com/justrach/kew"
    },
    "split_keywords": [
        "async",
        " concurrent",
        " manager",
        " queue",
        " redis",
        " task"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "a77ec1a24dee7e925f98c713ae4d4f922b678ad172797996b1601f8859e7c71f",
                "md5": "bab47e28567450e8c6b486c3960afdba",
                "sha256": "0f0ca7d4e78b3da0af034f9c470668b9a5c67c898885bacaf9b677e6459e2438"
            },
            "downloads": -1,
            "filename": "kew-0.1.4-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "bab47e28567450e8c6b486c3960afdba",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 12876,
            "upload_time": "2024-11-03T04:01:31",
            "upload_time_iso_8601": "2024-11-03T04:01:31.832462Z",
            "url": "https://files.pythonhosted.org/packages/a7/7e/c1a24dee7e925f98c713ae4d4f922b678ad172797996b1601f8859e7c71f/kew-0.1.4-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "9fc185e8d5f9c95c12c162121344ecc28473d9cc0507be0f1c890f2b6e401cbb",
                "md5": "5d2dae592c38f912d6d10497f25a3c5f",
                "sha256": "2342f389d20d97ff524cda435545fa77db0626945ff528e0a44c675da3b40314"
            },
            "downloads": -1,
            "filename": "kew-0.1.4.tar.gz",
            "has_sig": false,
            "md5_digest": "5d2dae592c38f912d6d10497f25a3c5f",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 15497,
            "upload_time": "2024-11-03T04:01:33",
            "upload_time_iso_8601": "2024-11-03T04:01:33.605086Z",
            "url": "https://files.pythonhosted.org/packages/9f/c1/85e8d5f9c95c12c162121344ecc28473d9cc0507be0f1c890f2b6e401cbb/kew-0.1.4.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-11-03 04:01:33",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "justrach",
    "github_project": "kew",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "kew"
}
        
Elapsed time: 0.92004s