llm-queue-task-manager


Name: llm-queue-task-manager
Version: 0.1.3
Summary: A Redis-based queue task manager for LLM processing with intelligent token budget management
Upload time: 2025-08-05 22:20:24
Requires Python: >=3.8
License: MIT
Keywords: llm, queue, redis, task-manager, token-management, ai
            
# LLM Queue Task Manager

A Redis-based queue system for managing LLM processing tasks with intelligent token budget management, priority queuing, and batch processing capabilities.

[![Python 3.8+](https://img.shields.io/badge/python-3.8+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
[![Redis](https://img.shields.io/badge/redis-4.0+-red.svg)](https://redis.io/)

## πŸš€ Features

- **🎯 Priority-based Queuing**: Automatically categorizes tasks by estimated token usage (low/medium/long)
- **πŸ’° Token Budget Management**: Prevents API quota exhaustion with intelligent rate limiting
- **⚑ Batch Processing**: Implements 3-2-1 cycle processing (3 low, 2 medium, 1 long priority tasks)
- **πŸ”„ Retry Logic**: Automatic retry with exponential backoff for failed requests
- **πŸ“Š Real-time Monitoring**: Comprehensive metrics and status tracking
- **πŸ”Œ Extensible**: Easy to implement custom processors for different LLM providers
- **πŸ—οΈ Production Ready**: Built for high-throughput production environments
- **πŸ›‘οΈ Error Resilient**: Graceful error handling and recovery mechanisms

## πŸ“¦ Installation

### Using uv (recommended)

```bash
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install the package
uv add llm-queue-task-manager

# With optional dependencies for specific providers
uv add "llm-queue-task-manager[aws]"      # For AWS Bedrock
uv add "llm-queue-task-manager[openai]"   # For OpenAI
uv add "llm-queue-task-manager[fastapi]"  # For FastAPI integration
uv add "llm-queue-task-manager[all]"      # All optional dependencies


### Using pip

```bash
pip install llm-queue-task-manager

# With optional dependencies
pip install "llm-queue-task-manager[aws,openai,fastapi]"
```

## πŸƒ Quick Start

### 1. Basic Usage

```python
import asyncio
from llm_queue_manager import (
    RedisQueueManager, 
    RedisBatchProcessor, 
    QueuedRequest,
    BaseRequestProcessor
)

# Create a custom processor
class MyLLMProcessor(BaseRequestProcessor):
    async def process_request(self, request_data):
        # Your LLM processing logic here
        prompt = request_data.get('prompt', '')
        # ... process with your LLM provider
        return {"response": f"Processed: {prompt}"}

# Initialize components
processor = MyLLMProcessor()
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

# Start processing
async def main():
    await batch_processor.start_processing()
    
    # Submit a request
    request = QueuedRequest.create(
        request_data={'prompt': 'Hello, world!'},
        estimated_tokens=1000
    )
    
    request_id = await queue_manager.add_request(request)
    print(f"Request {request_id} queued")
    
    # Check status
    status = await queue_manager.get_request_status(request_id)
    print(f"Status: {status}")

asyncio.run(main())
```

### 2. AWS Bedrock Integration

```python
from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest
from llm_queue_manager.processors.examples import BedrockProcessor

# Initialize with Bedrock processor
processor = BedrockProcessor(region_name='us-east-1')
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

async def process_claude_request():
    await batch_processor.start_processing()
    
    request = QueuedRequest.create(
        request_data={
            'messages': [
                {
                    'role': 'user',
                    'content': [{'text': 'Explain quantum computing'}]
                }
            ],
            'model': 'claude-3-sonnet',
            'max_tokens': 2000,
            'temperature': 0.7
        },
        estimated_tokens=10000
    )
    
    request_id = await queue_manager.add_request(request)
    return request_id
```

### 3. OpenAI Integration

```python
from llm_queue_manager.processors.examples import OpenAIProcessor

# Initialize with OpenAI processor
processor = OpenAIProcessor(api_key="your-openai-api-key")
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

async def process_gpt_request():
    await batch_processor.start_processing()
    
    request = QueuedRequest.create(
        request_data={
            'messages': [
                {'role': 'user', 'content': 'Write a Python function for binary search'}
            ],
            'model': 'gpt-4',
            'max_tokens': 1000,
            'temperature': 0.3
        },
        estimated_tokens=5000
    )
    
    request_id = await queue_manager.add_request(request)
    return request_id
```

### 4. FastAPI Integration

```python
from fastapi import FastAPI
from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest
from your_processor import YourLLMProcessor

app = FastAPI()
processor = YourLLMProcessor()
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

@app.on_event("startup")
async def startup():
    await batch_processor.start_processing()

@app.post("/submit")
async def submit_request(prompt: str, max_tokens: int = 1000):
    request = QueuedRequest.create(
        request_data={'prompt': prompt, 'max_tokens': max_tokens},
        estimated_tokens=max_tokens * 2
    )
    
    request_id = await queue_manager.add_request(request)
    return {"request_id": request_id, "status": "queued"}

@app.get("/status/{request_id}")
async def get_status(request_id: str):
    return await queue_manager.get_request_status(request_id)

@app.get("/metrics")
async def get_metrics():
    return queue_manager.get_metrics()
```

## βš™οΈ Configuration

### Environment Variables

```bash
# Redis Configuration
REDIS_HOST=localhost                     # Redis host
REDIS_PORT=6379                         # Redis port
REDIS_DB=0                              # Redis database number
REDIS_PASSWORD=your_password            # Redis password (optional)
REDIS_MAX_CONNECTIONS=20                # Maximum Redis connections

# Queue Configuration
LLM_QUEUE_TOTAL_TPM=1000000            # Total tokens per minute
LLM_QUEUE_API_ALLOCATION_PERCENT=40    # Percentage for API allocation
LLM_QUEUE_BATCH_LOW_COUNT=3            # Low priority batch size
LLM_QUEUE_BATCH_MEDIUM_COUNT=2         # Medium priority batch size
LLM_QUEUE_BATCH_LONG_COUNT=1           # Long priority batch size
LLM_QUEUE_MAX_EMPTY_CYCLES=5           # Max empty cycles before longer wait
LLM_QUEUE_QUOTA_WAIT_DURATION=65       # Seconds to wait for quota reset
```
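
If you prefer to configure everything from Python, the same settings can be exported before the manager is created. This is only a minimal sketch and assumes the library reads these environment variables at initialization time, as the list above suggests:

```python
# Sketch: set configuration via environment variables before constructing
# the manager. Assumes RedisQueueManager reads these values at init time.
import os

os.environ.setdefault("REDIS_HOST", "localhost")
os.environ.setdefault("REDIS_PORT", "6379")
os.environ.setdefault("LLM_QUEUE_TOTAL_TPM", "1000000")          # total tokens per minute
os.environ.setdefault("LLM_QUEUE_API_ALLOCATION_PERCENT", "40")  # share reserved for the API

from llm_queue_manager import RedisQueueManager

queue_manager = RedisQueueManager()  # picks up the environment configuration
```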

### Redis Setup

```bash
# Using Docker
docker run -d --name redis -p 6379:6379 redis:latest
```

```yaml
# docker-compose.yml
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

volumes:
  redis_data:
```

## πŸ”§ Custom Processors

Create custom processors by extending `BaseRequestProcessor`:

```python
from llm_queue_manager import BaseRequestProcessor
import httpx

class CustomLLMProcessor(BaseRequestProcessor):
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        self.client = httpx.AsyncClient()
    
    def get_required_fields(self):
        return ['prompt', 'model']
    
    async def validate_request(self, request_data):
        # Custom validation logic
        if not request_data.get('prompt'):
            return False
        return await super().validate_request(request_data)
    
    async def process_request(self, request_data):
        # Custom processing logic
        response = await self.client.post(
            f"{self.base_url}/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "prompt": request_data['prompt'],
                "model": request_data.get('model', 'default'),
                "max_tokens": request_data.get('max_tokens', 1000)
            }
        )
        
        return response.json()
    
    async def estimate_tokens(self, request_data):
        # Custom token estimation
        prompt = request_data.get('prompt', '')
        max_tokens = request_data.get('max_tokens', 1000)
        return len(prompt) // 4 + max_tokens
    
    async def handle_error(self, error, request_data):
        # Custom error handling
        if "rate_limit" in str(error).lower():
            return None  # Trigger retry
        return {"error": str(error)}
```

## πŸ“Š Monitoring and Metrics

### Queue Metrics

```python
metrics = queue_manager.get_metrics()

print(f"Total in queue: {metrics['total_in_queue']}")
print(f"Success rate: {metrics['success_rate_percent']}%")
print(f"Token utilization: {metrics['token_usage']['utilization_percent']}%")

# Per-priority metrics
for priority in ['low', 'medium', 'long']:
    completed = metrics[f'{priority}_completed']
    failed = metrics[f'{priority}_failed']
    print(f"{priority.upper()}: {completed} completed, {failed} failed")
```

### Processor Status

```python
status = batch_processor.get_processor_status()

print(f"Processing: {status['is_processing']}")
print(f"Current cycle: {status['current_cycle_position']}/6")
print(f"Current task type: {status['current_task_type']}")
print(f"Health status: {status['health_status']}")
print(f"Processing rate: {status['processing_rate_per_minute']} req/min")
```

### Health Checks

```python
# Test Redis connection
from llm_queue_manager.config import test_redis_connection
redis_healthy = test_redis_connection()

# Get system health
metrics = queue_manager.get_metrics()
health = metrics.get('system_health', {})
print(f"System status: {health.get('status', 'unknown')}")
print(f"Health score: {health.get('score', 0)}/100")
```

## πŸ—οΈ Architecture

The system consists of several key components:

### Core Components

1. **QueuedRequest**: Data model for requests with automatic task type classification
2. **RedisQueueManager**: Handles queue operations, token budgeting, and metrics
3. **RedisBatchProcessor**: Implements the 3-2-1 batch processing cycle
4. **BaseRequestProcessor**: Abstract base class for implementing custom LLM processors
5. **TokenEstimator**: Estimates token usage for different request types

### Processing Flow

```
Request Submission β†’ Token Estimation β†’ Priority Classification β†’ Queue Assignment
                                                                          ↓
Response Delivery ← Result Processing ← LLM API Call ← Batch Processing ← Queue Processing
```

### Priority System

Requests are automatically categorized into three priority levels:

- **🟒 Low Priority** (< 15K tokens): Quick responses, simple prompts
- **🟑 Medium Priority** (15K-75K tokens): Standard requests, moderate complexity  
- **πŸ”΄ Long Priority** (> 75K tokens): Complex prompts, detailed responses

The **3-2-1 processing cycle** ensures balanced throughput:
- 3 low priority requests
- 2 medium priority requests  
- 1 long priority request
- Repeat cycle
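
For illustration, the classification step amounts to a threshold check on the estimated token count. This is only a sketch using the thresholds listed above; the real classification is handled internally by `QueuedRequest` and `TokenEstimator`:

```python
# Illustrative sketch of priority classification by estimated token count.
# Thresholds mirror the documentation above; the library performs this
# classification internally when a request is queued.
def classify_priority(estimated_tokens: int) -> str:
    if estimated_tokens < 15_000:
        return "low"      # quick responses, simple prompts
    elif estimated_tokens <= 75_000:
        return "medium"   # standard requests, moderate complexity
    return "long"         # complex prompts, detailed responses


assert classify_priority(1_000) == "low"
assert classify_priority(30_000) == "medium"
assert classify_priority(100_000) == "long"
```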

## πŸ›‘οΈ Token Budget Management

The system implements intelligent token budget management:

- **πŸ“Š Per-minute quotas**: Tracks token usage per minute to prevent API limits
- **πŸ›‘οΈ Safety buffers**: Maintains 10% buffer to prevent quota exhaustion
- **⏸️ Automatic throttling**: Pauses processing when approaching limits
- **πŸ”„ Quota reset detection**: Automatically resumes when quotas reset
- **πŸ“ˆ Usage tracking**: Comprehensive token usage analytics

## πŸ”„ Error Handling and Resilience

### Retry Strategies

- **Exponential backoff**: Increasing delays between retry attempts
- **Error classification**: Different handling for different error types
- **Quota-aware retries**: Special handling for rate limit errors
- **Maximum retry limits**: Prevents infinite retry loops
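
As a rough illustration of the backoff behaviour (not the library's internal implementation), a retry loop with exponentially increasing delays looks like this:

```python
# Illustrative exponential backoff loop; the library applies a similar
# strategy internally when a request fails with a retryable error.
import asyncio
import random


async def retry_with_backoff(coro_factory, max_retries: int = 5, base_delay: float = 1.0):
    for attempt in range(max_retries):
        try:
            return await coro_factory()
        except Exception:
            if attempt == max_retries - 1:
                raise  # maximum retry limit reached, give up
            # Exponential delay with a little jitter: ~1s, ~2s, ~4s, ...
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)
```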

### Error Types

- **Rate Limiting**: Automatic retry with backoff
- **Quota Exhaustion**: Queue pausing until reset
- **Validation Errors**: Request modification and retry
- **Authentication**: Immediate failure with clear error
- **Server Errors**: Retry with exponential backoff
- **Network Timeouts**: Retry with connection pooling

## πŸ“š Examples

The package includes comprehensive examples:

- **`examples/basic_usage.py`**: Simple echo processor demonstration
- **`examples/fastapi_integration.py`**: Production FastAPI service
- **`examples/bedrock_example.py`**: AWS Bedrock with Claude models
- **`examples/openai_example.py`**: OpenAI GPT models with function calling

Run examples:

```bash
# Basic usage
python examples/basic_usage.py

# FastAPI service
python examples/fastapi_integration.py

# AWS Bedrock (requires AWS credentials)
python examples/bedrock_example.py

# OpenAI (requires OPENAI_API_KEY)
python examples/openai_example.py
```

## πŸš€ Production Deployment

### Docker Deployment

```dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install uv
RUN pip install uv

# Copy requirements
COPY pyproject.toml ./
RUN uv pip install --system -e ".[all]"

# Copy application
COPY . .

# Run application
CMD ["python", "-m", "your_app"]
```

### Kubernetes Deployment

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-queue-manager
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-queue-manager
  template:
    metadata:
      labels:
        app: llm-queue-manager
    spec:
      containers:
      - name: app
        image: your-registry/llm-queue-manager:latest
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: LLM_QUEUE_TOTAL_TPM
          value: "1000000"
        ports:
        - containerPort: 8000
        livenessProbe:
          httpGet:
            path: /v1/queue/health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379
```

### Monitoring Setup

```yaml
# Prometheus monitoring
- job_name: 'llm-queue-manager'
  static_configs:
  - targets: ['llm-queue-manager:8000']
  metrics_path: '/v1/queue/metrics'
  scrape_interval: 30s
```

## 🀝 Contributing

We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.

### Development Setup

```bash
# Clone the repository
git clone https://github.com/AryamanGurjar/llm-queue-task-manager.git
cd llm-queue-task-manager

# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh

# Install dependencies
uv sync --all-extras --dev

# Install pre-commit hooks
uv run pre-commit install

# Run tests
uv run pytest

# Run examples
uv run python examples/basic_usage.py
```

### Code Quality

```bash
# Format code
uv run black src/ examples/
uv run isort src/ examples/

# Type checking
uv run mypy src/

# Linting
uv run flake8 src/ examples/
```

## πŸ“„ License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## πŸ™ Acknowledgments

- Built with [Redis](https://redis.io/) for high-performance queuing
- Inspired by production LLM processing challenges
- Thanks to the open-source community for tools and libraries

## πŸ“ž Support

- **Issues**: [GitHub Issues](https://github.com/AryamanGurjar/llm-queue-task-manager/issues)
- **Documentation**: [README](https://github.com/AryamanGurjar/llm-queue-task-manager/blob/master/README.md)
- **Env Documentation**: [ENV_DOC](https://github.com/AryamanGurjar/llm-queue-task-manager/blob/master/ENV_DOC.md)

---

**Made with ❀️ for the LLM community**

            
