qode-task-worker


| Field | Value |
|-------|-------|
| Name | qode-task-worker |
| Version | 1.0.4 |
| Summary | A Python library for task management with RabbitMQ and polling support |
| Upload time | 2025-08-03 13:14:30 |
| Author | Bao Ninh |
| Maintainer | Bao Ninh |
| Requires Python | >=3.8 |
| License | None |
| Keywords | task, worker, rabbitmq, polling, queue |
| Home page | None |
| Docs URL | None |
| Requirements | No requirements were recorded. |

# Task Worker Library

A Python library for task management with RabbitMQ and polling support. This library provides a unified interface for consuming tasks from a task management API with automatic fallback between RabbitMQ and polling modes.

## Features

- **Dual Mode Support**: RabbitMQ with manual acknowledgment/nack and polling fallback
- **Automatic Fallback**: Seamlessly switches to polling if RabbitMQ is unavailable
- **Manual Acknowledgment**: Proper message handling with ack/nack for RabbitMQ
- **Configurable**: Flexible configuration for different environments
- **Error Handling**: Robust error handling with retry mechanisms
- **Logging**: Comprehensive logging for debugging and monitoring

## Installation

```bash
pip install qode-task-worker
```

For development:
```bash
pip install "qode-task-worker[dev]"  # quotes keep shells such as zsh from expanding the brackets
```

## Quick Start

```python
import logging
from task_worker import TaskWorker, TaskWorkerConfig

# Configure logging
logging.basicConfig(level=logging.INFO)

# Define your task processing function
def process_my_task(task):
    """Your custom task processing logic"""
    task_id = task.get('id')
    metadata = task.get('metadata', {})

    print(f"Processing task {task_id}: {metadata}")

    # Your business logic here; this placeholder result just echoes the task id
    result = {"task_id": task_id, "status": "done"}

    # The library handles task registration and result saving
    # You just need to process the task data
    return result

# Configure the worker
config = TaskWorkerConfig(
    task_management_endpoint="https://your-task-api.com",
    rabbitmq_consumer_host="amqps://user:pass@your-rabbitmq.com/vhost",
    task_type="your-task-type",
    worker_name="my-worker",
    polling_interval=5
)

# Create and start the worker
worker = TaskWorker(config, process_my_task)
worker.start()  # This will run indefinitely
```

## Configuration

### TaskWorkerConfig Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `task_management_endpoint` | str | `TASK_MANAGEMENT_ENDPOINT` env var, else a built-in default URL | Task management API endpoint |
| `rabbitmq_consumer_host` | str | `RABBITMQ_CONSUMER_HOST` env var, else `None` | RabbitMQ connection URL |
| `task_type` | str | "default" | Type of tasks to process |
| `worker_name` | str | hostname | Worker identifier |
| `polling_interval` | int | 5 | Seconds between polls in polling mode |
| `rabbitmq_queue_prefix` | str | "task.available" | RabbitMQ queue prefix |
| `prefetch_count` | int | 1 | RabbitMQ prefetch count |
| `max_retry_attempts` | int | 3 | Max retry attempts for API calls |

### Environment Variables

The library automatically reads from these environment variables:

- `TASK_MANAGEMENT_ENDPOINT`: Task management API URL
- `RABBITMQ_CONSUMER_HOST`: RabbitMQ connection string

## Advanced Usage

### Status Monitoring

```python
worker = TaskWorker(config, process_my_task)  # config and handler from Quick Start

# Get worker status
status = worker.get_status()
print(f"Mode: {status['mode']}")
print(f"Task Type: {status['task_type']}")
print(f"RabbitMQ Connected: {status['rabbitmq_connected']}")

# Check if running in RabbitMQ mode
if worker.is_running_rabbitmq_mode():
    print("Running with RabbitMQ")
else:
    print("Running in polling mode")
```

## Error Handling

Failures are handled differently in each mode:

### RabbitMQ Mode
- **Message Parsing Errors**: Messages are nacked without requeue to prevent infinite loops
- **Task Processing Errors**: Messages are nacked with requeue for retry by other workers
- **Connection Failures**: Automatic fallback to polling mode
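
The ack/nack policy in this list can be sketched as a pure decision function. This is an illustration of the described behavior, not the library's code; `decide_action` and its return labels are hypothetical names. With a client such as pika (an assumption, since the README does not name the client), `"ack"` would correspond to `channel.basic_ack(delivery_tag=...)` and the two nack outcomes to `channel.basic_nack(delivery_tag=..., requeue=False)` or `requeue=True`.

```python
import json
from typing import Any, Callable, Dict


def decide_action(body: bytes, handler: Callable[[Dict[str, Any]], Any]) -> str:
    """Map the outcome of one delivery to 'ack', 'nack_drop', or 'nack_requeue'."""
    try:
        task = json.loads(body)
        if not isinstance(task, dict):
            raise ValueError("message body must be a JSON object")
    except ValueError:
        # An unparseable message can never succeed, so requeueing would loop forever.
        return "nack_drop"
    try:
        handler(task)
    except Exception:
        # Processing failed; requeue so another worker can retry.
        return "nack_requeue"
    return "ack"
```

Separating the decision from the channel calls keeps the policy testable without a running broker.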

### Polling Mode
- **API Failures**: Logged and retried after polling interval
- **Processing Errors**: Logged (no requeue mechanism in polling)
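
A polling loop with these two rules might look like the sketch below. It is a minimal illustration under stated assumptions, not the library's implementation: `poll_loop`, `fetch_task`, and `max_iterations` are hypothetical names, and `fetch_task` stands in for whatever call retrieves the next task from the task management API.

```python
import logging
import time
from typing import Any, Callable, Dict, Optional

logger = logging.getLogger("polling_sketch")


def poll_loop(
    fetch_task: Callable[[], Optional[Dict[str, Any]]],
    handler: Callable[[Dict[str, Any]], Any],
    polling_interval: float = 5,
    max_iterations: Optional[int] = None,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll for tasks; return how many were processed successfully."""
    processed = 0
    iteration = 0
    while max_iterations is None or iteration < max_iterations:
        iteration += 1
        try:
            task = fetch_task()
        except Exception:
            # API failure: log it and try again after the polling interval.
            logger.exception("API call failed; retrying after the polling interval")
            task = None
        if task is not None:
            try:
                handler(task)
                processed += 1
            except Exception:
                # No requeue in polling mode: the failure is only logged.
                logger.exception("Task processing failed")
        sleep(polling_interval)
    return processed
```

The `max_iterations` and `sleep` parameters exist only so the loop can be exercised in tests; a real worker would run until stopped.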

## Message Format

Expected RabbitMQ message format:

```json
{
  "taskType": "your-task-type",
  "metadata": {
    "additional": "data"
  }
}
```

Task API response format:

```json
{
  "data": {
    "id": "task-id-123",
    "taskType": "your-task-type",
    "metadata": {
      "metadata": "metadata"
    },
    "webhookApi": "https://callback-url.com"
  }
}
```
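
To show how the `data` envelope in the response above could be unwrapped, here is a small sketch. The `Task` dataclass and `unwrap_task` are hypothetical helpers for illustration; according to the Quick Start, the library itself passes the task to your handler as a plain dict.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class Task:
    id: str
    task_type: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    webhook_api: Optional[str] = None


def unwrap_task(response: Dict[str, Any]) -> Task:
    """Extract the task from the `data` envelope of an API response."""
    data = response.get("data")
    if not isinstance(data, dict):
        raise ValueError("response has no 'data' object")
    return Task(
        id=data["id"],              # KeyError if the field is missing
        task_type=data["taskType"],
        metadata=data.get("metadata") or {},
        webhook_api=data.get("webhookApi"),
    )
```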

## License

MIT License

## Contributing

1. Fork the repository
2. Create a feature branch
3. Make your changes
4. Add tests
5. Submit a pull request 

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "qode-task-worker",
    "maintainer": "Bao Ninh",
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": "Bao Ninh <bao.ninh@qode.world>",
    "keywords": "task, worker, rabbitmq, polling, queue",
    "author": "Bao Ninh",
    "author_email": "Bao Ninh <bao.ninh@qode.world>",
    "download_url": "https://files.pythonhosted.org/packages/d3/57/6b49f6309fbd017aae34b3822368f9d6e729fc41fedb7ada348115d618ee/qode_task_worker-1.0.4.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "A Python library for task management with RabbitMQ and polling support",
    "version": "1.0.4",
    "project_urls": null,
    "split_keywords": [
        "task",
        " worker",
        " rabbitmq",
        " polling",
        " queue"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "dc0eb6d6db4af392ea6c0fe2d172dcdb6b6b6ca38bdeb9d4cead4d7b35080ff8",
                "md5": "512bd230bc6d3cac2aa93e517f6576f6",
                "sha256": "a5241940a1c6e267ffc0024010384ea02a320798d1610fe7181a2413fd1b6aa3"
            },
            "downloads": -1,
            "filename": "qode_task_worker-1.0.4-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "512bd230bc6d3cac2aa93e517f6576f6",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 10063,
            "upload_time": "2025-08-03T13:14:29",
            "upload_time_iso_8601": "2025-08-03T13:14:29.723256Z",
            "url": "https://files.pythonhosted.org/packages/dc/0e/b6d6db4af392ea6c0fe2d172dcdb6b6b6ca38bdeb9d4cead4d7b35080ff8/qode_task_worker-1.0.4-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "d3576b49f6309fbd017aae34b3822368f9d6e729fc41fedb7ada348115d618ee",
                "md5": "364a888c9cb198e6a66e7402537163f4",
                "sha256": "04ba3b8f24d431c482ceedf0c83d72a76cf8d6d098056d1353e5e67e328c4f7a"
            },
            "downloads": -1,
            "filename": "qode_task_worker-1.0.4.tar.gz",
            "has_sig": false,
            "md5_digest": "364a888c9cb198e6a66e7402537163f4",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 9889,
            "upload_time": "2025-08-03T13:14:30",
            "upload_time_iso_8601": "2025-08-03T13:14:30.727751Z",
            "url": "https://files.pythonhosted.org/packages/d3/57/6b49f6309fbd017aae34b3822368f9d6e729fc41fedb7ada348115d618ee/qode_task_worker-1.0.4.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-08-03 13:14:30",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "qode-task-worker"
}
        