| Name | qode-task-worker JSON |
| Version |
1.0.4
JSON |
| download |
| home_page | None |
| Summary | A Python library for task management with RabbitMQ and polling support |
| upload_time | 2025-08-03 13:14:30 |
| maintainer | Bao Ninh |
| docs_url | None |
| author | Bao Ninh |
| requires_python | >=3.8 |
| license | None |
| keywords |
task
worker
rabbitmq
polling
queue
|
| VCS |
|
| bugtrack_url |
|
| requirements |
No requirements were recorded.
|
| Travis-CI |
No Travis.
|
| coveralls test coverage |
No coveralls.
|
# Task Worker Library
A Python library for task management with RabbitMQ and polling support. This library provides a unified interface for consuming tasks from a task management API with automatic fallback between RabbitMQ and polling modes.
## Features
- **Dual Mode Support**: RabbitMQ with manual acknowledgment/nack and polling fallback
- **Automatic Fallback**: Seamlessly switches to polling if RabbitMQ is unavailable
- **Manual Acknowledgment**: Proper message handling with ack/nack for RabbitMQ
- **Configurable**: Flexible configuration for different environments
- **Error Handling**: Robust error handling with retry mechanisms
- **Logging**: Comprehensive logging for debugging and monitoring
## Installation
```bash
pip install qode-task-worker
```
For development:
```bash
pip install qode-task-worker[dev]
```
## Quick Start
```python
import logging
from task_worker import TaskWorker, TaskWorkerConfig
# Configure logging
logging.basicConfig(level=logging.INFO)
# Define your task processing function
def process_my_task(task):
"""Your custom task processing logic"""
task_id = task.get('id')
metadata = task.get('metadata', {})
print(f"Processing task {task_id}: {metadata}")
# Your business logic here
# The library handles task registration and result saving
# You just need to process the task data
return result;
# Configure the worker
config = TaskWorkerConfig(
task_management_endpoint="https://your-task-api.com",
rabbitmq_consumer_host="amqps://user:pass@your-rabbitmq.com/vhost",
task_type="your-task-type",
worker_name="my-worker",
polling_interval=5
)
# Create and start the worker
worker = TaskWorker(config, process_my_task)
worker.start() # This will run indefinitely
```
## Configuration
### TaskWorkerConfig Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `task_management_endpoint` | str | ENV or default URL | Task management API endpoint |
| `rabbitmq_consumer_host` | str | ENV or None | RabbitMQ connection URL |
| `task_type` | str | "default" | Type of tasks to process |
| `worker_name` | str | hostname | Worker identifier |
| `polling_interval` | int | 5 | Seconds between polls in polling mode |
| `rabbitmq_queue_prefix` | str | "task.available" | RabbitMQ queue prefix |
| `prefetch_count` | int | 1 | RabbitMQ prefetch count |
| `max_retry_attempts` | int | 3 | Max retry attempts for API calls |
### Environment Variables
The library automatically reads from these environment variables:
- `TASK_MANAGEMENT_ENDPOINT`: Task management API URL
- `RABBITMQ_CONSUMER_HOST`: RabbitMQ connection string
## Advanced Usage
### Status Monitoring
```python
worker = TaskWorker(config, process_task)
# Get worker status
status = worker.get_status()
print(f"Mode: {status['mode']}")
print(f"Task Type: {status['task_type']}")
print(f"RabbitMQ Connected: {status['rabbitmq_connected']}")
# Check if running in RabbitMQ mode
if worker.is_running_rabbitmq_mode():
print("Running with RabbitMQ")
else:
print("Running in polling mode")
```
## Error Handling
The library provides robust error handling:
### RabbitMQ Mode
- **Message Parsing Errors**: Messages are nacked without requeue to prevent infinite loops
- **Task Processing Errors**: Messages are nacked with requeue for retry by other workers
- **Connection Failures**: Automatic fallback to polling mode
### Polling Mode
- **API Failures**: Logged and retried after polling interval
- **Processing Errors**: Logged (no requeue mechanism in polling)
## Message Format
Expected RabbitMQ message format:
```json
{
"taskType": "your-task-type",
"metadata": {
"additional": "data"
}
}
```
Task API response format:
```json
{
"data": {
"id": "task-id-123",
"taskType": "your-task-type",
"metadata": {
"metadata": "metadata"
},
"webhookApi": "https://callback-url.com"
}
}
```
## License
MIT License
## Contributing
1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests
5. Submit a pull request
Raw data
{
"_id": null,
"home_page": null,
"name": "qode-task-worker",
"maintainer": "Bao Ninh",
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": "Bao Ninh <bao.ninh@qode.world>",
"keywords": "task, worker, rabbitmq, polling, queue",
"author": "Bao Ninh",
"author_email": "Bao Ninh <bao.ninh@qode.world>",
"download_url": "https://files.pythonhosted.org/packages/d3/57/6b49f6309fbd017aae34b3822368f9d6e729fc41fedb7ada348115d618ee/qode_task_worker-1.0.4.tar.gz",
"platform": null,
"description": "# Task Worker Library\n\nA Python library for task management with RabbitMQ and polling support. This library provides a unified interface for consuming tasks from a task management API with automatic fallback between RabbitMQ and polling modes.\n\n## Features\n\n- **Dual Mode Support**: RabbitMQ with manual acknowledgment/nack and polling fallback\n- **Automatic Fallback**: Seamlessly switches to polling if RabbitMQ is unavailable\n- **Manual Acknowledgment**: Proper message handling with ack/nack for RabbitMQ\n- **Configurable**: Flexible configuration for different environments\n- **Error Handling**: Robust error handling with retry mechanisms\n- **Logging**: Comprehensive logging for debugging and monitoring\n\n## Installation\n\n```bash\npip install qode-task-worker\n```\n\nFor development:\n```bash\npip install qode-task-worker[dev]\n```\n\n## Quick Start\n\n```python\nimport logging\nfrom task_worker import TaskWorker, TaskWorkerConfig\n\n# Configure logging\nlogging.basicConfig(level=logging.INFO)\n\n# Define your task processing function\ndef process_my_task(task):\n \"\"\"Your custom task processing logic\"\"\"\n task_id = task.get('id')\n metadata = task.get('metadata', {})\n \n print(f\"Processing task {task_id}: {metadata}\")\n \n # Your business logic here\n \n # The library handles task registration and result saving\n # You just need to process the task data\n return result;\n\n# Configure the worker\nconfig = TaskWorkerConfig(\n task_management_endpoint=\"https://your-task-api.com\",\n rabbitmq_consumer_host=\"amqps://user:pass@your-rabbitmq.com/vhost\",\n task_type=\"your-task-type\",\n worker_name=\"my-worker\",\n polling_interval=5\n)\n\n# Create and start the worker\nworker = TaskWorker(config, process_my_task)\nworker.start() # This will run indefinitely\n```\n\n## Configuration\n\n### TaskWorkerConfig Parameters\n\n| Parameter | Type | Default | Description |\n|-----------|------|---------|-------------|\n| `task_management_endpoint` | str | ENV or default URL | Task management API endpoint |\n| `rabbitmq_consumer_host` | str | ENV or None | RabbitMQ connection URL |\n| `task_type` | str | \"default\" | Type of tasks to process |\n| `worker_name` | str | hostname | Worker identifier |\n| `polling_interval` | int | 5 | Seconds between polls in polling mode |\n| `rabbitmq_queue_prefix` | str | \"task.available\" | RabbitMQ queue prefix |\n| `prefetch_count` | int | 1 | RabbitMQ prefetch count |\n| `max_retry_attempts` | int | 3 | Max retry attempts for API calls |\n\n### Environment Variables\n\nThe library automatically reads from these environment variables:\n\n- `TASK_MANAGEMENT_ENDPOINT`: Task management API URL\n- `RABBITMQ_CONSUMER_HOST`: RabbitMQ connection string\n\n## Advanced Usage\n\n### Status Monitoring\n\n```python\nworker = TaskWorker(config, process_task)\n\n# Get worker status\nstatus = worker.get_status()\nprint(f\"Mode: {status['mode']}\")\nprint(f\"Task Type: {status['task_type']}\")\nprint(f\"RabbitMQ Connected: {status['rabbitmq_connected']}\")\n\n# Check if running in RabbitMQ mode\nif worker.is_running_rabbitmq_mode():\n print(\"Running with RabbitMQ\")\nelse:\n print(\"Running in polling mode\")\n```\n\n## Error Handling\n\nThe library provides robust error handling:\n\n### RabbitMQ Mode\n- **Message Parsing Errors**: Messages are nacked without requeue to prevent infinite loops\n- **Task Processing Errors**: Messages are nacked with requeue for retry by other workers\n- **Connection Failures**: Automatic fallback to polling mode\n\n### Polling Mode\n- **API Failures**: Logged and retried after polling interval\n- **Processing Errors**: Logged (no requeue mechanism in polling)\n\n## Message Format\n\nExpected RabbitMQ message format:\n\n```json\n{\n \"taskType\": \"your-task-type\",\n \"metadata\": {\n \"additional\": \"data\"\n }\n}\n```\n\nTask API response format:\n\n```json\n{\n \"data\": {\n \"id\": \"task-id-123\",\n \"taskType\": \"your-task-type\", \n \"metadata\": {\n \"metadata\": \"metadata\"\n },\n \"webhookApi\": \"https://callback-url.com\"\n }\n}\n```\n\n## License\n\nMIT License\n\n## Contributing\n\n1. Fork the repository\n2. Create a feature branch\n3. Make your changes\n4. Add tests\n5. Submit a pull request \n",
"bugtrack_url": null,
"license": null,
"summary": "A Python library for task management with RabbitMQ and polling support",
"version": "1.0.4",
"project_urls": null,
"split_keywords": [
"task",
" worker",
" rabbitmq",
" polling",
" queue"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "dc0eb6d6db4af392ea6c0fe2d172dcdb6b6b6ca38bdeb9d4cead4d7b35080ff8",
"md5": "512bd230bc6d3cac2aa93e517f6576f6",
"sha256": "a5241940a1c6e267ffc0024010384ea02a320798d1610fe7181a2413fd1b6aa3"
},
"downloads": -1,
"filename": "qode_task_worker-1.0.4-py3-none-any.whl",
"has_sig": false,
"md5_digest": "512bd230bc6d3cac2aa93e517f6576f6",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 10063,
"upload_time": "2025-08-03T13:14:29",
"upload_time_iso_8601": "2025-08-03T13:14:29.723256Z",
"url": "https://files.pythonhosted.org/packages/dc/0e/b6d6db4af392ea6c0fe2d172dcdb6b6b6ca38bdeb9d4cead4d7b35080ff8/qode_task_worker-1.0.4-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "d3576b49f6309fbd017aae34b3822368f9d6e729fc41fedb7ada348115d618ee",
"md5": "364a888c9cb198e6a66e7402537163f4",
"sha256": "04ba3b8f24d431c482ceedf0c83d72a76cf8d6d098056d1353e5e67e328c4f7a"
},
"downloads": -1,
"filename": "qode_task_worker-1.0.4.tar.gz",
"has_sig": false,
"md5_digest": "364a888c9cb198e6a66e7402537163f4",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 9889,
"upload_time": "2025-08-03T13:14:30",
"upload_time_iso_8601": "2025-08-03T13:14:30.727751Z",
"url": "https://files.pythonhosted.org/packages/d3/57/6b49f6309fbd017aae34b3822368f9d6e729fc41fedb7ada348115d618ee/qode_task_worker-1.0.4.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-08-03 13:14:30",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "qode-task-worker"
}