# LLM Queue Task Manager
A Redis-based queue system for managing LLM processing tasks with intelligent token budget management, priority queuing, and batch processing capabilities.
[Python 3.8+](https://www.python.org/downloads/) · [MIT License](https://opensource.org/licenses/MIT) · [Redis](https://redis.io/)
## 🚀 Features
- **🎯 Priority-based Queuing**: Automatically categorizes tasks by estimated token usage (low/medium/long)
- **💰 Token Budget Management**: Prevents API quota exhaustion with intelligent rate limiting
- **⚡ Batch Processing**: Implements 3-2-1 cycle processing (3 low, 2 medium, 1 long priority tasks)
- **🔄 Retry Logic**: Automatic retry with exponential backoff for failed requests
- **📊 Real-time Monitoring**: Comprehensive metrics and status tracking
- **🔌 Extensible**: Easy to implement custom processors for different LLM providers
- **🏗️ Production Ready**: Built for high-throughput production environments
- **🛡️ Error Resilient**: Graceful error handling and recovery mechanisms
## 📦 Installation
### Using uv (recommended)
```bash
# Install uv if you haven't already
curl -LsSf https://astral.sh/uv/install.sh | sh
# Install the package
uv add llm-queue-task-manager
# With optional dependencies for specific providers
uv add "llm-queue-task-manager[aws]" # For AWS Bedrock
uv add "llm-queue-task-manager[openai]" # For OpenAI
uv add "llm-queue-task-manager[fastapi]" # For FastAPI integration
uv add "llm-queue-task-manager[all]" # All optional dependencies
### Using pip
```bash
pip install llm-queue-task-manager
# With optional dependencies
pip install "llm-queue-task-manager[aws,openai,fastapi]"
```
## 🏃 Quick Start
### 1. Basic Usage
```python
import asyncio
from llm_queue_manager import (
    RedisQueueManager,
    RedisBatchProcessor,
    QueuedRequest,
    BaseRequestProcessor
)

# Create a custom processor
class MyLLMProcessor(BaseRequestProcessor):
    async def process_request(self, request_data):
        # Your LLM processing logic here
        prompt = request_data.get('prompt', '')
        # ... process with your LLM provider
        return {"response": f"Processed: {prompt}"}

# Initialize components
processor = MyLLMProcessor()
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

# Start processing
async def main():
    await batch_processor.start_processing()

    # Submit a request
    request = QueuedRequest.create(
        request_data={'prompt': 'Hello, world!'},
        estimated_tokens=1000
    )

    request_id = await queue_manager.add_request(request)
    print(f"Request {request_id} queued")

    # Check status
    status = await queue_manager.get_request_status(request_id)
    print(f"Status: {status}")

asyncio.run(main())
```
### 2. AWS Bedrock Integration
```python
from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest
from llm_queue_manager.processors.examples import BedrockProcessor
# Initialize with Bedrock processor
processor = BedrockProcessor(region_name='us-east-1')
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

async def process_claude_request():
    await batch_processor.start_processing()

    request = QueuedRequest.create(
        request_data={
            'messages': [
                {
                    'role': 'user',
                    'content': [{'text': 'Explain quantum computing'}]
                }
            ],
            'model': 'claude-3-sonnet',
            'max_tokens': 2000,
            'temperature': 0.7
        },
        estimated_tokens=10000
    )

    request_id = await queue_manager.add_request(request)
    return request_id
```
### 3. OpenAI Integration
```python
from llm_queue_manager.processors.examples import OpenAIProcessor
# Initialize with OpenAI processor
processor = OpenAIProcessor(api_key="your-openai-api-key")
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)

async def process_gpt_request():
    await batch_processor.start_processing()

    request = QueuedRequest.create(
        request_data={
            'messages': [
                {'role': 'user', 'content': 'Write a Python function for binary search'}
            ],
            'model': 'gpt-4',
            'max_tokens': 1000,
            'temperature': 0.3
        },
        estimated_tokens=5000
    )

    request_id = await queue_manager.add_request(request)
    return request_id
```
### 4. FastAPI Integration
```python
from fastapi import FastAPI
from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest
from your_processor import YourLLMProcessor
app = FastAPI()
processor = YourLLMProcessor()
queue_manager = RedisQueueManager()
batch_processor = RedisBatchProcessor(processor, queue_manager)
@app.on_event("startup")
async def startup():
await batch_processor.start_processing()
@app.post("/submit")
async def submit_request(prompt: str, max_tokens: int = 1000):
request = QueuedRequest.create(
request_data={'prompt': prompt, 'max_tokens': max_tokens},
estimated_tokens=max_tokens * 2
)
request_id = await queue_manager.add_request(request)
return {"request_id": request_id, "status": "queued"}
@app.get("/status/{request_id}")
async def get_status(request_id: str):
return await queue_manager.get_request_status(request_id)
@app.get("/metrics")
async def get_metrics():
return queue_manager.get_metrics()
```
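For reference, here is a minimal client sketch against the endpoints defined above. It assumes the service is running locally on port 8000 and that `httpx` is installed; both are assumptions for illustration, not part of the package.

```python
import httpx

BASE_URL = "http://localhost:8000"  # assumed address of the FastAPI service above

with httpx.Client(base_url=BASE_URL) as client:
    # Submit a prompt; /submit returns the queued request ID
    submitted = client.post(
        "/submit", params={"prompt": "Summarize Redis in one paragraph", "max_tokens": 500}
    ).json()
    request_id = submitted["request_id"]

    # Check the request status (poll this in a loop in a real client)
    print("status:", client.get(f"/status/{request_id}").json())

    # Queue-wide metrics
    print("metrics:", client.get("/metrics").json())
```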
## ⚙️ Configuration
### Environment Variables
```bash
# Redis Configuration
REDIS_HOST=localhost # Redis host
REDIS_PORT=6379 # Redis port
REDIS_DB=0 # Redis database number
REDIS_PASSWORD=your_password # Redis password (optional)
REDIS_MAX_CONNECTIONS=20 # Maximum Redis connections
# Queue Configuration
LLM_QUEUE_TOTAL_TPM=1000000 # Total tokens per minute
LLM_QUEUE_API_ALLOCATION_PERCENT=40 # Percentage for API allocation
LLM_QUEUE_BATCH_LOW_COUNT=3 # Low priority batch size
LLM_QUEUE_BATCH_MEDIUM_COUNT=2 # Medium priority batch size
LLM_QUEUE_BATCH_LONG_COUNT=1 # Long priority batch size
LLM_QUEUE_MAX_EMPTY_CYCLES=5 # Max empty cycles before longer wait
LLM_QUEUE_QUOTA_WAIT_DURATION=65 # Seconds to wait for quota reset
```
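If you prefer to configure a single process from code rather than the shell, one option is to set these variables before creating the queue manager. This is a minimal sketch and assumes the library reads its configuration from the environment at initialization, as documented above.

```python
import os

# Set configuration before creating the queue manager.
os.environ.setdefault("REDIS_HOST", "localhost")
os.environ.setdefault("REDIS_PORT", "6379")
os.environ.setdefault("LLM_QUEUE_TOTAL_TPM", "1000000")          # provider-wide tokens per minute
os.environ.setdefault("LLM_QUEUE_API_ALLOCATION_PERCENT", "40")  # share of TPM given to this queue

from llm_queue_manager import RedisQueueManager

queue_manager = RedisQueueManager()
```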
### Redis Setup
```bash
# Using Docker
docker run -d --name redis -p 6379:6379 redis:latest
```

```yaml
# Using Docker Compose
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

volumes:
  redis_data:
```
## 🔧 Custom Processors
Create custom processors by extending `BaseRequestProcessor`:
```python
from llm_queue_manager import BaseRequestProcessor
import httpx

class CustomLLMProcessor(BaseRequestProcessor):
    def __init__(self, api_key: str, base_url: str):
        self.api_key = api_key
        self.base_url = base_url
        self.client = httpx.AsyncClient()

    def get_required_fields(self):
        return ['prompt', 'model']

    async def validate_request(self, request_data):
        # Custom validation logic
        if not request_data.get('prompt'):
            return False
        return await super().validate_request(request_data)

    async def process_request(self, request_data):
        # Custom processing logic
        response = await self.client.post(
            f"{self.base_url}/completions",
            headers={"Authorization": f"Bearer {self.api_key}"},
            json={
                "prompt": request_data['prompt'],
                "model": request_data.get('model', 'default'),
                "max_tokens": request_data.get('max_tokens', 1000)
            }
        )
        return response.json()

    async def estimate_tokens(self, request_data):
        # Custom token estimation
        prompt = request_data.get('prompt', '')
        max_tokens = request_data.get('max_tokens', 1000)
        return len(prompt) // 4 + max_tokens

    async def handle_error(self, error, request_data):
        # Custom error handling
        if "rate_limit" in str(error).lower():
            return None  # Trigger retry
        return {"error": str(error)}
```
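The custom processor plugs into the same queue manager and batch processor shown in the Quick Start. A minimal wiring sketch, with a placeholder API key and endpoint:

```python
import asyncio
from llm_queue_manager import RedisQueueManager, RedisBatchProcessor, QueuedRequest

async def main():
    # Placeholder credentials and endpoint, for illustration only
    processor = CustomLLMProcessor(api_key="your-api-key", base_url="https://api.example.com/v1")
    queue_manager = RedisQueueManager()
    batch_processor = RedisBatchProcessor(processor, queue_manager)

    await batch_processor.start_processing()

    request = QueuedRequest.create(
        request_data={'prompt': 'Hello from a custom processor', 'model': 'default'},
        estimated_tokens=1500
    )
    request_id = await queue_manager.add_request(request)
    print(f"Queued {request_id}")

asyncio.run(main())
```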
## 📊 Monitoring and Metrics
### Queue Metrics
```python
metrics = queue_manager.get_metrics()
print(f"Total in queue: {metrics['total_in_queue']}")
print(f"Success rate: {metrics['success_rate_percent']}%")
print(f"Token utilization: {metrics['token_usage']['utilization_percent']}%")
# Per-priority metrics
for priority in ['low', 'medium', 'long']:
    completed = metrics[f'{priority}_completed']
    failed = metrics[f'{priority}_failed']
    print(f"{priority.upper()}: {completed} completed, {failed} failed")
```
### Processor Status
```python
status = batch_processor.get_processor_status()
print(f"Processing: {status['is_processing']}")
print(f"Current cycle: {status['current_cycle_position']}/6")
print(f"Current task type: {status['current_task_type']}")
print(f"Health status: {status['health_status']}")
print(f"Processing rate: {status['processing_rate_per_minute']} req/min")
```
### Health Checks
```python
# Test Redis connection
from llm_queue_manager.config import test_redis_connection
redis_healthy = test_redis_connection()
# Get system health
metrics = queue_manager.get_metrics()
health = metrics.get('system_health', {})
print(f"System status: {health.get('status', 'unknown')}")
print(f"Health score: {health.get('score', 0)}/100")
```
## 🏗️ Architecture
The system consists of several key components:
### Core Components
1. **QueuedRequest**: Data model for requests with automatic task type classification
2. **RedisQueueManager**: Handles queue operations, token budgeting, and metrics
3. **RedisBatchProcessor**: Implements the 3-2-1 batch processing cycle
4. **BaseRequestProcessor**: Abstract base class for implementing custom LLM processors
5. **TokenEstimator**: Estimates token usage for different request types
### Processing Flow
```
Request Submission → Token Estimation → Priority Classification → Queue Assignment
                                                                        ↓
Response Delivery ← Result Processing ← LLM API Call ← Batch Processing ← Queue Processing
```
### Priority System
Requests are automatically categorized into three priority levels:
- **🟢 Low Priority** (< 15K tokens): Quick responses, simple prompts
- **🟡 Medium Priority** (15K-75K tokens): Standard requests, moderate complexity
- **🔴 Long Priority** (> 75K tokens): Complex prompts, detailed responses
The **3-2-1 processing cycle** ensures balanced throughput:
- 3 low priority requests
- 2 medium priority requests
- 1 long priority request
- Repeat cycle
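A short sketch of how these thresholds and the cycle fit together (illustrative only; the library performs this classification internally when a request is created):

```python
def classify_priority(estimated_tokens: int) -> str:
    """Illustrative mapping from estimated tokens to a priority queue."""
    if estimated_tokens < 15_000:
        return "low"
    if estimated_tokens <= 75_000:
        return "medium"
    return "long"

# One full 3-2-1 cycle: three low, two medium, one long, then repeat
CYCLE = ["low", "low", "low", "medium", "medium", "long"]

print(classify_priority(1_000))    # low
print(classify_priority(40_000))   # medium
print(classify_priority(120_000))  # long
```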
## 🛡️ Token Budget Management
The system implements intelligent token budget management:
- **📊 Per-minute quotas**: Tracks token usage per minute to prevent API limits
- **🛡️ Safety buffers**: Maintains 10% buffer to prevent quota exhaustion
- **⏸️ Automatic throttling**: Pauses processing when approaching limits
- **🔄 Quota reset detection**: Automatically resumes when quotas reset
- **📈 Usage tracking**: Comprehensive token usage analytics
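As a worked example of how the budget follows from the configuration: with `LLM_QUEUE_TOTAL_TPM=1000000`, `LLM_QUEUE_API_ALLOCATION_PERCENT=40`, and the 10% safety buffer, the effective per-minute budget is roughly 1,000,000 × 0.40 × 0.90 = 360,000 tokens. The sketch below shows that arithmetic plus a simple admission check; the exact bookkeeping inside `RedisQueueManager` may differ.

```python
TOTAL_TPM = 1_000_000         # LLM_QUEUE_TOTAL_TPM: provider-wide tokens per minute
API_ALLOCATION_PERCENT = 40   # LLM_QUEUE_API_ALLOCATION_PERCENT: share reserved for this queue
SAFETY_BUFFER = 0.10          # 10% kept in reserve

# Effective budget for the current one-minute window
effective_budget = int(TOTAL_TPM * (API_ALLOCATION_PERCENT / 100) * (1 - SAFETY_BUFFER))
print(effective_budget)  # 360000

def can_dispatch(tokens_used_this_minute: int, estimated_tokens: int) -> bool:
    """Admit a request only if it fits in the remaining per-minute budget."""
    return tokens_used_this_minute + estimated_tokens <= effective_budget
```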
## 🔄 Error Handling and Resilience
### Retry Strategies
- **Exponential backoff**: Increasing delays between retry attempts
- **Error classification**: Different handling for different error types
- **Quota-aware retries**: Special handling for rate limit errors
- **Maximum retry limits**: Prevents infinite retry loops
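A minimal sketch of this retry pattern (exponential backoff with jitter, harsher backoff for rate-limit errors, and a hard retry cap). It is illustrative of the strategy, not the library's internal implementation:

```python
import asyncio
import random

async def call_with_retries(do_request, max_retries: int = 5, base_delay: float = 1.0):
    """Retry an async callable with exponential backoff and jitter."""
    for attempt in range(max_retries + 1):
        try:
            return await do_request()
        except Exception as exc:
            if attempt == max_retries:
                raise  # retry limit reached; surface the error
            # Rate-limit/quota errors back off more aggressively than generic failures
            multiplier = 4 if "rate_limit" in str(exc).lower() else 2
            delay = base_delay * (multiplier ** attempt) + random.uniform(0, 0.5)
            await asyncio.sleep(delay)
```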
### Error Types
- **Rate Limiting**: Automatic retry with backoff
- **Quota Exhaustion**: Queue pausing until reset
- **Validation Errors**: Request modification and retry
- **Authentication**: Immediate failure with clear error
- **Server Errors**: Retry with exponential backoff
- **Network Timeouts**: Retry with connection pooling
## 📚 Examples
The package includes comprehensive examples:
- **`examples/basic_usage.py`**: Simple echo processor demonstration
- **`examples/fastapi_integration.py`**: Production FastAPI service
- **`examples/bedrock_example.py`**: AWS Bedrock with Claude models
- **`examples/openai_example.py`**: OpenAI GPT models with function calling
Run examples:
```bash
# Basic usage
python examples/basic_usage.py
# FastAPI service
python examples/fastapi_integration.py
# AWS Bedrock (requires AWS credentials)
python examples/bedrock_example.py
# OpenAI (requires OPENAI_API_KEY)
python examples/openai_example.py
```
## 🚀 Production Deployment
### Docker Deployment
```dockerfile
FROM python:3.11-slim
WORKDIR /app
# Install uv
RUN pip install uv
# Copy requirements
COPY pyproject.toml ./
RUN uv pip install --system -e ".[all]"
# Copy application
COPY . .
# Run application
CMD ["python", "-m", "your_app"]
```
### Kubernetes Deployment
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: llm-queue-manager
spec:
  replicas: 3
  selector:
    matchLabels:
      app: llm-queue-manager
  template:
    metadata:
      labels:
        app: llm-queue-manager
    spec:
      containers:
      - name: app
        image: your-registry/llm-queue-manager:latest
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: LLM_QUEUE_TOTAL_TPM
          value: "1000000"
        ports:
        - containerPort: 8000
        livenessProbe:
          httpGet:
            path: /v1/queue/health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
  name: redis-service
spec:
  selector:
    app: redis
  ports:
  - port: 6379
    targetPort: 6379
```
### Monitoring Setup
```yaml
# Prometheus monitoring
- job_name: 'llm-queue-manager'
  static_configs:
    - targets: ['llm-queue-manager:8000']
  metrics_path: '/v1/queue/metrics'
  scrape_interval: 30s
```
## 🤝 Contributing
We welcome contributions! Please see our [Contributing Guide](CONTRIBUTING.md) for details.
### Development Setup
```bash
# Clone the repository
git clone https://github.com/AryamanGurjar/llm-queue-task-manager.git
cd llm-queue-task-manager
# Install uv
curl -LsSf https://astral.sh/uv/install.sh | sh
# Install dependencies
uv sync --all-extras --dev
# Install pre-commit hooks
uv run pre-commit install
# Run tests
uv run pytest
# Run examples
uv run python examples/basic_usage.py
```
### Code Quality
```bash
# Format code
uv run black src/ examples/
uv run isort src/ examples/
# Type checking
uv run mypy src/
# Linting
uv run flake8 src/ examples/
```
## 📄 License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## 🙏 Acknowledgments
- Built with [Redis](https://redis.io/) for high-performance queuing
- Inspired by production LLM processing challenges
- Thanks to the open-source community for tools and libraries
## 📞 Support
- **Issues**: [GitHub Issues](https://github.com/AryamanGurjar/llm-queue-task-manager/issues)
- **Documentation**: [README](https://github.com/AryamanGurjar/llm-queue-task-manager/blob/master/README.md)
- **Env Documentation**: [ENV_DOC](https://github.com/AryamanGurjar/llm-queue-task-manager/blob/master/ENV_DOC.md)
---
**Made with ❤️ for the LLM community**