# KafkaBoost 🚀
**KafkaBoost** is an enhanced Apache Kafka library that extends standard Kafka functionality with priority-based message processing, automatic topic management, and intelligent consumer orchestration.
## 🌟 Key Features
### 🎯 Priority-Based Message Processing
- **Automatic Priority Detection**: Detects priority by topic name, message content rules, or manual specification
- **Priority Boost Mode**: Routes messages to priority-specific topics and serves highest priority first
- **Standard Mode**: Sorts messages by priority field within batches
- **Dynamic Consumer Management**: Automatically pauses/resumes consumers based on priority
### 🔧 Automatic Topic Management
- **Smart Topic Creation**: Automatically creates priority-specific topics with configurable partitions
- **S3 Configuration Integration**: Manages topic configurations through S3
- **Dynamic Configuration Updates**: Supports runtime configuration changes
### ⚡ Enhanced Consumer Experience
- **Intelligent Partitioning**: Configurable partition counts per priority level
- **Consumer Group Management**: Unique group IDs for each priority level
- **Priority-First Consumption**: Always serves highest priority messages first
## 🏗️ Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ KafkaBoost Wrapper │
│ │
│ ┌─────────────────┐ ┌──────────────────┐ ┌─────────────┐ │
│ │ Kafkaboost │ │ Kafkaboost │ │ Kafkaboost │ │
│ │ Producer │ │ Consumer │ │ Config │ │
│ │ │ │ │ │ Manager │ │
│ │ • Priority │ │ • Priority │ │ │ │
│ │ Routing │ │ Queues │ │ • Auto │ │
│ │ • S3 Config │ │ • Smart Polling │ │ Topic │ │
│ │ • Enhanced │ │ • Consumer │ │ Creation │ │
│ │ Kafka Client │ │ Management │ │ • S3 │ │
│ └─────────────────┘ └──────────────────┘ │ Config │ │
│ │ │ │ Manager │ │
│ ▼ ▼ └─────────────┘ │
│ ┌─────────────────────────────────────────────────────────────┐│
│ │ Apache Kafka (Black Box) ││
│ │ ││
│ │ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ││
│ │ │ base_topic │ │base_topic_5 │ │base_topic_7 │ ... ││
│ │ └─────────────┘ └─────────────┘ └─────────────┘ ││
│ │ ││
│ │ • Message Storage • Partitioning • Replication ││
│ │ • Consumer Groups • Offset Management • Ordering ││
│ └─────────────────────────────────────────────────────────────┘│
└─────────────────────────────────────────────────────────────────┘
```
**KafkaBoost enhances Kafka by:**
- 🎯 **Automatic priority detection** by topic, rules, or manual specification
- 🔧 **Priority-based routing** to topic variants
- 📊 **Automatic topic creation** with configurable partitions
- ⚙️ **Smart consumer management** with priority queues
- 🚀 **S3 configuration integration** for dynamic settings
- 📈 **Priority-first consumption** for optimal message processing
## 🚀 Quick Start
### Installation
```bash
pip install kafkaboost
```
### Basic Usage
#### Step 1: Configure Your Settings
1. **Visit the KafkaBoost Configuration Website**:https://master.d1disovd4gm7yp.amplifyapp.com
2. **Login** to your account
3. **Select your required configuration** (topics, priorities, partitions, etc.)
4. **Copy your User ID** from the dashboard
#### Step 2: Use KafkaBoost (Just like Kafka + User ID)
```python
from kafkaboost.consumer import KafkaboostConsumer
from kafkaboost.producer import KafkaboostProducer
# Producer with priority routing (just add user_id to your existing Kafka code)
producer = KafkaboostProducer(
bootstrap_servers=['localhost:9092'],
user_id='your-user-id-from-website' # Copy from configuration website
)
# Send messages with different priorities
producer.send('orders', {'order_id': 1, 'priority': 5})
producer.send('orders', {'order_id': 2, 'priority': 10}) # Higher priority
# Consumer with priority boost (just add user_id to your existing Kafka code)
consumer = KafkaboostConsumer(
bootstrap_servers=['localhost:9092'],
topics=['orders'],
group_id='priority_group',
user_id='your-user-id-from-website' # Copy from configuration website
)
# Messages are automatically served by priority (10 first, then 5)
messages = consumer.poll(timeout_ms=1000)
```
#### Step 3: That's It!
- ✅ **No configuration files needed** - everything is managed on the website
- ✅ **Automatic topic creation** - topics are created based on your configuration
- ✅ **Priority routing** - messages are automatically routed to priority topics
- ✅ **Smart consumption** - highest priority messages are served first
## 📋 Configuration
### S3 Configuration Structure
```json
{
"user_id": "user123",
"max_priority": 10,
"default_priority": 0,
"Priority_boost": [
{
"topic_name": "orders",
"priority_boost_min_value": 5,
"number_of_partitions": 9
}
],
"Topics_priority": [
{
"topic": "notifications",
"priority": 8
}
],
"Rule_Base_priority": [
{
"role_name": "admin",
"value": "high",
"priority": 9
}
]
}
```
### Configuration Parameters
| Parameter | Description | Default |
|-----------|-------------|---------|
| `topic_name` | Base topic name for priority routing | Required |
| `priority_boost_min_value` | Minimum priority level for boost mode | 0 |
| `number_of_partitions` | Number of partitions for priority topics | 1 |
| `max_priority` | Maximum priority level supported | 10 |
## 🎯 Automatic Priority Detection
KafkaBoost automatically detects message priority using three methods:
### 1. **Topic-Based Priority** (`Topics_priority`)
Messages sent to specific topics automatically get assigned priority:
```json
"Topics_priority": [
{
"topic": "urgent_orders",
"priority": 9
},
{
"topic": "notifications",
"priority": 7
},
{
"topic": "reports",
"priority": 3
}
]
```
**Usage:**
```python
# Messages to 'urgent_orders' automatically get priority 9
producer.send('urgent_orders', {'order_id': 123, 'customer': 'VIP'})
# Messages to 'notifications' automatically get priority 7
producer.send('notifications', {'message': 'Order shipped'})
# Messages to 'reports' automatically get priority 3
producer.send('reports', {'report_type': 'daily_summary'})
```
### 2. **Rule-Based Priority** (`Rule_Base_priority`)
Messages are prioritized based on content rules:
```json
"Rule_Base_priority": [
{
"role_name": "user_role",
"value": "admin",
"priority": 9
},
{
"role_name": "user_role",
"value": "premium",
"priority": 7
},
{
"role_name": "order_type",
"value": "express",
"priority": 8
}
]
```
**Usage:**
```python
# Message with admin role gets priority 9
producer.send('orders', {
'order_id': 123,
'user_role': 'admin', # Matches rule: priority 9
'amount': 100
})
# Message with premium user gets priority 7
producer.send('orders', {
'order_id': 124,
'user_role': 'premium', # Matches rule: priority 7
'amount': 50
})
# Message with express order gets priority 8
producer.send('orders', {
'order_id': 125,
'order_type': 'express', # Matches rule: priority 8
'amount': 75
})
```
### 3. **Manual Priority** (Fallback)
If no automatic rules match, you can still specify priority manually:
```python
# Manual priority override
producer.send('orders', {
'order_id': 126,
'amount': 200
}, priority=10) # Explicit priority 10
```
### **Priority Resolution Order:**
1. **Manual priority** (if specified) - Highest precedence
2. **Rule-based priority** (if message matches rules)
3. **Topic-based priority** (if topic has priority configured)
4. **Default priority** (from configuration)
## 🔄 Priority Boost Mode
### How It Works
1. **Topic Discovery**: Automatically finds priority-specific topics (e.g., `orders_5`, `orders_7`, `orders_10`)
2. **Consumer Creation**: Creates separate consumers for each priority level
3. **Smart Polling**: Serves messages from highest priority first
4. **Dynamic Management**: Pauses lower priority consumers when higher priority has messages
### Topic Naming Convention
Priority topics follow the pattern: `{base_topic}_{priority_level}`
Examples:
- `orders_0` - Lowest priority orders
- `orders_5` - Medium priority orders
- `orders_10` - Highest priority orders
### Consumer Group Management
Each priority level gets its own consumer group:
- `group_id_base` - For base topic
- `group_id_priority_5` - For priority 5 topics
- `group_id_priority_10` - For priority 10 topics
## 🛠️ Advanced Usage
### Producer with Priority Routing
```python
from kafkaboost.producer import KafkaboostProducer
producer = KafkaboostProducer(
bootstrap_servers=['localhost:9092'],
user_id='user123'
)
# Messages are automatically routed to priority topics
producer.send('orders', {
'order_id': 123,
'customer_id': 'cust_456',
'amount': 99.99
}, priority=10) # Goes to orders_10 topic
producer.send('orders', {
'order_id': 124,
'customer_id': 'cust_789',
'amount': 49.99
}, priority=5) # Goes to orders_5 topic
```
### Continuous Message Processing
```python
from kafkaboost.consumer import KafkaboostConsumer
# Create consumer with your user ID from the website
consumer = KafkaboostConsumer(
bootstrap_servers=['localhost:9092'],
topics=['orders'],
group_id='order_processing_group',
user_id='your-user-id-from-website'
)
try:
while True:
# Poll for messages (highest priority first)
messages = consumer.poll(timeout_ms=1000)
for msg in messages:
# Get message data
order_data = msg.value
priority = order_data.get('priority', 0)
order_id = order_data.get('order_id')
print(f"Processing order {order_id} with priority {priority}")
# Process the order based on priority
if priority >= 8:
print(f"🚨 URGENT: Processing high-priority order {order_id}")
elif priority >= 5:
print(f"⚡ Processing medium-priority order {order_id}")
else:
print(f"📋 Processing standard order {order_id}")
except KeyboardInterrupt:
print("Stopping consumer...")
finally:
consumer.close()
```
### Working with Multiple Topics
```python
from kafkaboost.consumer import KafkaboostConsumer
# Consumer can handle multiple topics
consumer = KafkaboostConsumer(
bootstrap_servers=['localhost:9092'],
topics=['orders', 'notifications', 'payments'],
group_id='multi_topic_group',
user_id='your-user-id-from-website'
)
try:
while True:
messages = consumer.poll(timeout_ms=1000)
for msg in messages:
topic = msg.topic
data = msg.value
# Handle different message types
if 'orders' in topic:
print(f"📦 Order message: {data}")
elif 'notifications' in topic:
print(f"🔔 Notification: {data}")
elif 'payments' in topic:
print(f"💳 Payment: {data}")
except KeyboardInterrupt:
print("Stopping consumer...")
finally:
consumer.close()
```
### Configuration Management
```python
from kafkaboost.kafka_utils import KafkaConfigManager
# Initialize config manager
config_manager = KafkaConfigManager(
bootstrap_servers='localhost:9092',
user_id='user123'
)
# Ensure priority topics exist
config_manager.check_and_create_priority_topics()
# Get configuration summary
summary = config_manager.get_config_summary()
print(f"Max priority: {summary['max_priority']}")
print(f"Topics count: {summary['topics_count']}")
```
## 🔧 Automatic Topic Creation
### Features
- **Configurable Partitions**: Each priority topic can have different partition counts
- **Idempotent Creation**: Won't create topics that already exist
- **Error Handling**: Graceful handling of creation failures
- **S3 Integration**: Uses S3 configuration for topic specifications
### Example
```python
# Topics are automatically created based on configuration
# For config: {"topic_name": "orders", "priority_boost_min_value": 5, "number_of_partitions": 9}
# Creates:
# - orders_5 (9 partitions)
# - orders_6 (9 partitions)
# - orders_7 (9 partitions)
# - orders_8 (9 partitions)
# - orders_9 (9 partitions)
# - orders_10 (9 partitions)
```
## 📊 Monitoring and Debugging
### Configuration Summary
```python
# Get detailed configuration information
summary = consumer.get_config_summary()
print(f"Priority boost enabled: {summary['priority_boost_enabled']}")
print(f"Current subscription: {summary['current_subscription']}")
print(f"Max priority: {summary['max_priority']}")
```
### Consumer State
```python
# Check consumer status
print(f"Priority boost enabled: {consumer.priority_boost_enabled}")
print(f"Active consumers: {len(consumer.priority_consumer_manager.consumers)}")
print(f"Current subscription: {consumer.current_subscription}")
```
## 🚨 Troubleshooting
### Debug Mode
```python
import logging
logging.basicConfig(level=logging.DEBUG)
# Enable debug logging for detailed information
consumer = KafkaboostConsumer(
bootstrap_servers=['localhost:9092'],
topics=['orders'],
user_id='user123'
)
```
## 📚 API Reference
### KafkaboostConsumer
#### Constructor Parameters
- `bootstrap_servers`: Kafka server address(es)
- `topics`: Topic(s) to consume from
- `group_id`: Consumer group ID
- `user_id`: User ID for S3 config lookup (enables priority boost)
- `auto_offset_reset`: Offset reset strategy ('earliest', 'latest', 'none')
- `**kwargs`: Additional KafkaConsumer parameters
#### Key Methods
- `poll(timeout_ms=1000, max_records=None)`: Poll for messages (highest priority first)
- `refresh_config()`: Refresh configuration from S3
- `get_config_summary()`: Get configuration summary
- `close()`: Close consumer and cleanup
### KafkaboostProducer
#### Constructor Parameters
- `bootstrap_servers`: Kafka server address(es)
- `user_id`: User ID for S3 config lookup
- `**kwargs`: Additional KafkaProducer parameters
#### Key Methods
- `send(topic, value, priority=None)`: Send message with optional priority
- `close()`: Close producer
### KafkaConfigManager
#### Constructor Parameters
- `bootstrap_servers`: Kafka server address(es)
- `user_id`: User ID for S3 config lookup
#### Key Methods
- `get_config_summary()`: Get configuration summary
- `find_matching_topics(base_topics)`: Find priority topic variants
## 🔄 Migration Guide
### Step 1: Configure on Website
1. **Visit**: [https://master.d1hgz5clxamnqf.amplifyapp.com/](https://master.d1hgz5clxamnqf.amplifyapp.com/)
2. **Login** and configure your topics, priorities, and settings
3. **Copy your User ID** from the dashboard
### Step 2: Update Your Code
#### From Standard Kafka Consumer
```python
# Before
from kafka import KafkaConsumer
consumer = KafkaConsumer('orders', bootstrap_servers=['localhost:9092'])
# After (just add user_id!)
from kafkaboost.consumer import KafkaboostConsumer
consumer = KafkaboostConsumer(
bootstrap_servers=['localhost:9092'],
topics=['orders'],
user_id='your-user-id-from-website' # Copy from configuration website
)
```
#### From Standard Kafka Producer
```python
# Before
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
# After (just add user_id!)
from kafkaboost.producer import KafkaboostProducer
producer = KafkaboostProducer(
bootstrap_servers=['localhost:9092'],
user_id='your-user-id-from-website' # Copy from configuration website
)
```
## 🏆 Best Practices
### Topic Design
- Use descriptive base topic names
- Keep priority levels manageable (0-10 recommended)
- Ensure consistent naming across environments
### Consumer Groups
- Use different group IDs for different priority requirements
- Consider separate consumers for different priority ranges
- Monitor consumer group rebalancing
### Performance
- Priority boost mode is most effective with high message volumes
- Consider batch sizes for optimal throughput
- Monitor partition assignment and rebalancing
### Error Handling
- Always close consumers in finally blocks
- Handle configuration refresh errors gracefully
- Monitor partition pausing/resuming for performance
## 📦 Dependencies
- `kafka-python` - Core Kafka functionality
- `boto3` - S3 configuration management
## 📄 License
This project extends the existing kafkaboost library with priority-aware features while maintaining backward compatibility.
## 🤝 Contributing
1. Fork the repository
2. Create a feature branch
3. Add tests for new functionality
4. Ensure all tests pass
5. Submit a pull request
---
**KafkaBoost** - Making Kafka priority-aware and production-ready! 🚀
Raw data
{
"_id": null,
"home_page": "https://github.com/your-org/kafkaboost",
"name": "kafkaboost",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.7",
"maintainer_email": "KafkaBoost Team <support@kafkaboost.com>",
"keywords": "kafka, priority, messaging, distributed, streaming, async",
"author": "KafkaBoost Team",
"author_email": "KafkaBoost Team <support@kafkaboost.com>",
"download_url": "https://files.pythonhosted.org/packages/ad/ad/28961bec882eca4379391442093409e89bde6f1c996860e2c1e0edb64f7a/kafkaboost-0.2.0.tar.gz",
"platform": null,
"description": "# KafkaBoost \ud83d\ude80\n\n**KafkaBoost** is an enhanced Apache Kafka library that extends standard Kafka functionality with priority-based message processing, automatic topic management, and intelligent consumer orchestration.\n\n## \ud83c\udf1f Key Features\n\n### \ud83c\udfaf Priority-Based Message Processing\n- **Automatic Priority Detection**: Detects priority by topic name, message content rules, or manual specification\n- **Priority Boost Mode**: Routes messages to priority-specific topics and serves highest priority first\n- **Standard Mode**: Sorts messages by priority field within batches\n- **Dynamic Consumer Management**: Automatically pauses/resumes consumers based on priority\n\n### \ud83d\udd27 Automatic Topic Management\n- **Smart Topic Creation**: Automatically creates priority-specific topics with configurable partitions\n- **S3 Configuration Integration**: Manages topic configurations through S3\n- **Dynamic Configuration Updates**: Supports runtime configuration changes\n\n### \u26a1 Enhanced Consumer Experience\n- **Intelligent Partitioning**: Configurable partition counts per priority level\n- **Consumer Group Management**: Unique group IDs for each priority level\n- **Priority-First Consumption**: Always serves highest priority messages first\n\n## \ud83c\udfd7\ufe0f Architecture\n\n```\n\u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510\n\u2502 KafkaBoost Wrapper \u2502\n\u2502 \u2502\n\u2502 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u2502\n\u2502 \u2502 Kafkaboost \u2502 \u2502 Kafkaboost \u2502 \u2502 Kafkaboost \u2502 \u2502\n\u2502 \u2502 Producer \u2502 \u2502 Consumer \u2502 \u2502 Config \u2502 \u2502\n\u2502 \u2502 \u2502 \u2502 \u2502 \u2502 Manager \u2502 \u2502\n\u2502 \u2502 \u2022 Priority \u2502 \u2502 \u2022 Priority \u2502 \u2502 \u2502 \u2502\n\u2502 \u2502 Routing \u2502 \u2502 Queues \u2502 \u2502 \u2022 Auto \u2502 \u2502\n\u2502 \u2502 \u2022 S3 Config \u2502 \u2502 \u2022 Smart Polling \u2502 \u2502 Topic \u2502 \u2502\n\u2502 \u2502 \u2022 Enhanced \u2502 \u2502 \u2022 Consumer \u2502 \u2502 Creation \u2502 \u2502\n\u2502 \u2502 Kafka Client \u2502 \u2502 Management \u2502 \u2502 \u2022 S3 \u2502 \u2502\n\u2502 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2502 Config \u2502 \u2502\n\u2502 \u2502 \u2502 \u2502 Manager \u2502 \u2502\n\u2502 \u25bc \u25bc \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2502\n\u2502 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510\u2502\n\u2502 \u2502 Apache Kafka (Black Box) \u2502\u2502\n\u2502 \u2502 \u2502\u2502\n\u2502 \u2502 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u250c\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2510 \u2502\u2502\n\u2502 \u2502 \u2502 base_topic \u2502 \u2502base_topic_5 \u2502 \u2502base_topic_7 \u2502 ... \u2502\u2502\n\u2502 \u2502 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518 \u2502\u2502\n\u2502 \u2502 \u2502\u2502\n\u2502 \u2502 \u2022 Message Storage \u2022 Partitioning \u2022 Replication \u2502\u2502\n\u2502 \u2502 \u2022 Consumer Groups \u2022 Offset Management \u2022 Ordering \u2502\u2502\n\u2502 \u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518\u2502\n\u2514\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2500\u2518\n```\n\n**KafkaBoost enhances Kafka by:**\n- \ud83c\udfaf **Automatic priority detection** by topic, rules, or manual specification\n- \ud83d\udd27 **Priority-based routing** to topic variants\n- \ud83d\udcca **Automatic topic creation** with configurable partitions \n- \u2699\ufe0f **Smart consumer management** with priority queues\n- \ud83d\ude80 **S3 configuration integration** for dynamic settings\n- \ud83d\udcc8 **Priority-first consumption** for optimal message processing\n\n## \ud83d\ude80 Quick Start\n\n### Installation\n\n```bash\npip install kafkaboost\n```\n\n### Basic Usage\n\n#### Step 1: Configure Your Settings\n1. **Visit the KafkaBoost Configuration Website**:https://master.d1disovd4gm7yp.amplifyapp.com\n2. **Login** to your account\n3. **Select your required configuration** (topics, priorities, partitions, etc.)\n4. **Copy your User ID** from the dashboard\n\n#### Step 2: Use KafkaBoost (Just like Kafka + User ID)\n\n```python\nfrom kafkaboost.consumer import KafkaboostConsumer\nfrom kafkaboost.producer import KafkaboostProducer\n\n# Producer with priority routing (just add user_id to your existing Kafka code)\nproducer = KafkaboostProducer(\n bootstrap_servers=['localhost:9092'],\n user_id='your-user-id-from-website' # Copy from configuration website\n)\n\n# Send messages with different priorities\nproducer.send('orders', {'order_id': 1, 'priority': 5})\nproducer.send('orders', {'order_id': 2, 'priority': 10}) # Higher priority\n\n# Consumer with priority boost (just add user_id to your existing Kafka code)\nconsumer = KafkaboostConsumer(\n bootstrap_servers=['localhost:9092'],\n topics=['orders'],\n group_id='priority_group',\n user_id='your-user-id-from-website' # Copy from configuration website\n)\n\n# Messages are automatically served by priority (10 first, then 5)\nmessages = consumer.poll(timeout_ms=1000)\n```\n\n#### Step 3: That's It!\n- \u2705 **No configuration files needed** - everything is managed on the website\n- \u2705 **Automatic topic creation** - topics are created based on your configuration\n- \u2705 **Priority routing** - messages are automatically routed to priority topics\n- \u2705 **Smart consumption** - highest priority messages are served first\n\n## \ud83d\udccb Configuration\n\n### S3 Configuration Structure\n\n```json\n{\n \"user_id\": \"user123\",\n \"max_priority\": 10,\n \"default_priority\": 0,\n \"Priority_boost\": [\n {\n \"topic_name\": \"orders\",\n \"priority_boost_min_value\": 5,\n \"number_of_partitions\": 9\n }\n ],\n \"Topics_priority\": [\n {\n \"topic\": \"notifications\",\n \"priority\": 8\n }\n ],\n \"Rule_Base_priority\": [\n {\n \"role_name\": \"admin\",\n \"value\": \"high\",\n \"priority\": 9\n }\n ]\n}\n```\n\n### Configuration Parameters\n\n| Parameter | Description | Default |\n|-----------|-------------|---------|\n| `topic_name` | Base topic name for priority routing | Required |\n| `priority_boost_min_value` | Minimum priority level for boost mode | 0 |\n| `number_of_partitions` | Number of partitions for priority topics | 1 |\n| `max_priority` | Maximum priority level supported | 10 |\n\n## \ud83c\udfaf Automatic Priority Detection\n\nKafkaBoost automatically detects message priority using three methods:\n\n### 1. **Topic-Based Priority** (`Topics_priority`)\nMessages sent to specific topics automatically get assigned priority:\n\n```json\n\"Topics_priority\": [\n {\n \"topic\": \"urgent_orders\",\n \"priority\": 9\n },\n {\n \"topic\": \"notifications\", \n \"priority\": 7\n },\n {\n \"topic\": \"reports\",\n \"priority\": 3\n }\n]\n```\n\n**Usage:**\n```python\n# Messages to 'urgent_orders' automatically get priority 9\nproducer.send('urgent_orders', {'order_id': 123, 'customer': 'VIP'})\n\n# Messages to 'notifications' automatically get priority 7 \nproducer.send('notifications', {'message': 'Order shipped'})\n\n# Messages to 'reports' automatically get priority 3\nproducer.send('reports', {'report_type': 'daily_summary'})\n```\n\n### 2. **Rule-Based Priority** (`Rule_Base_priority`)\nMessages are prioritized based on content rules:\n\n```json\n\"Rule_Base_priority\": [\n {\n \"role_name\": \"user_role\",\n \"value\": \"admin\", \n \"priority\": 9\n },\n {\n \"role_name\": \"user_role\",\n \"value\": \"premium\",\n \"priority\": 7\n },\n {\n \"role_name\": \"order_type\",\n \"value\": \"express\",\n \"priority\": 8\n }\n]\n```\n\n**Usage:**\n```python\n# Message with admin role gets priority 9\nproducer.send('orders', {\n 'order_id': 123,\n 'user_role': 'admin', # Matches rule: priority 9\n 'amount': 100\n})\n\n# Message with premium user gets priority 7\nproducer.send('orders', {\n 'order_id': 124, \n 'user_role': 'premium', # Matches rule: priority 7\n 'amount': 50\n})\n\n# Message with express order gets priority 8\nproducer.send('orders', {\n 'order_id': 125,\n 'order_type': 'express', # Matches rule: priority 8\n 'amount': 75\n})\n```\n\n### 3. **Manual Priority** (Fallback)\nIf no automatic rules match, you can still specify priority manually:\n\n```python\n# Manual priority override\nproducer.send('orders', {\n 'order_id': 126,\n 'amount': 200\n}, priority=10) # Explicit priority 10\n```\n\n### **Priority Resolution Order:**\n1. **Manual priority** (if specified) - Highest precedence\n2. **Rule-based priority** (if message matches rules)\n3. **Topic-based priority** (if topic has priority configured)\n4. **Default priority** (from configuration)\n\n## \ud83d\udd04 Priority Boost Mode\n\n### How It Works\n\n1. **Topic Discovery**: Automatically finds priority-specific topics (e.g., `orders_5`, `orders_7`, `orders_10`)\n2. **Consumer Creation**: Creates separate consumers for each priority level\n3. **Smart Polling**: Serves messages from highest priority first\n4. **Dynamic Management**: Pauses lower priority consumers when higher priority has messages\n\n### Topic Naming Convention\n\nPriority topics follow the pattern: `{base_topic}_{priority_level}`\n\nExamples:\n- `orders_0` - Lowest priority orders\n- `orders_5` - Medium priority orders \n- `orders_10` - Highest priority orders\n\n### Consumer Group Management\n\nEach priority level gets its own consumer group:\n- `group_id_base` - For base topic\n- `group_id_priority_5` - For priority 5 topics\n- `group_id_priority_10` - For priority 10 topics\n\n## \ud83d\udee0\ufe0f Advanced Usage\n\n### Producer with Priority Routing\n\n```python\nfrom kafkaboost.producer import KafkaboostProducer\n\nproducer = KafkaboostProducer(\n bootstrap_servers=['localhost:9092'],\n user_id='user123'\n)\n\n# Messages are automatically routed to priority topics\nproducer.send('orders', {\n 'order_id': 123,\n 'customer_id': 'cust_456',\n 'amount': 99.99\n}, priority=10) # Goes to orders_10 topic\n\nproducer.send('orders', {\n 'order_id': 124,\n 'customer_id': 'cust_789',\n 'amount': 49.99\n}, priority=5) # Goes to orders_5 topic\n```\n\n### Continuous Message Processing\n\n```python\nfrom kafkaboost.consumer import KafkaboostConsumer\n\n# Create consumer with your user ID from the website\nconsumer = KafkaboostConsumer(\n bootstrap_servers=['localhost:9092'],\n topics=['orders'],\n group_id='order_processing_group',\n user_id='your-user-id-from-website'\n)\n\ntry:\n while True:\n # Poll for messages (highest priority first)\n messages = consumer.poll(timeout_ms=1000)\n \n for msg in messages:\n # Get message data\n order_data = msg.value\n priority = order_data.get('priority', 0)\n order_id = order_data.get('order_id')\n \n print(f\"Processing order {order_id} with priority {priority}\")\n \n # Process the order based on priority\n if priority >= 8:\n print(f\"\ud83d\udea8 URGENT: Processing high-priority order {order_id}\")\n elif priority >= 5:\n print(f\"\u26a1 Processing medium-priority order {order_id}\")\n else:\n print(f\"\ud83d\udccb Processing standard order {order_id}\")\n \nexcept KeyboardInterrupt:\n print(\"Stopping consumer...\")\nfinally:\n consumer.close()\n```\n\n### Working with Multiple Topics\n\n```python\nfrom kafkaboost.consumer import KafkaboostConsumer\n\n# Consumer can handle multiple topics\nconsumer = KafkaboostConsumer(\n bootstrap_servers=['localhost:9092'],\n topics=['orders', 'notifications', 'payments'],\n group_id='multi_topic_group',\n user_id='your-user-id-from-website'\n)\n\ntry:\n while True:\n messages = consumer.poll(timeout_ms=1000)\n \n for msg in messages:\n topic = msg.topic\n data = msg.value\n \n # Handle different message types\n if 'orders' in topic:\n print(f\"\ud83d\udce6 Order message: {data}\")\n elif 'notifications' in topic:\n print(f\"\ud83d\udd14 Notification: {data}\")\n elif 'payments' in topic:\n print(f\"\ud83d\udcb3 Payment: {data}\")\n \nexcept KeyboardInterrupt:\n print(\"Stopping consumer...\")\nfinally:\n consumer.close()\n```\n\n### Configuration Management\n\n```python\nfrom kafkaboost.kafka_utils import KafkaConfigManager\n\n# Initialize config manager\nconfig_manager = KafkaConfigManager(\n bootstrap_servers='localhost:9092',\n user_id='user123'\n)\n\n# Ensure priority topics exist\nconfig_manager.check_and_create_priority_topics()\n\n# Get configuration summary\nsummary = config_manager.get_config_summary()\nprint(f\"Max priority: {summary['max_priority']}\")\nprint(f\"Topics count: {summary['topics_count']}\")\n```\n\n## \ud83d\udd27 Automatic Topic Creation\n\n### Features\n\n- **Configurable Partitions**: Each priority topic can have different partition counts\n- **Idempotent Creation**: Won't create topics that already exist\n- **Error Handling**: Graceful handling of creation failures\n- **S3 Integration**: Uses S3 configuration for topic specifications\n\n### Example\n\n```python\n# Topics are automatically created based on configuration\n# For config: {\"topic_name\": \"orders\", \"priority_boost_min_value\": 5, \"number_of_partitions\": 9}\n\n# Creates:\n# - orders_5 (9 partitions)\n# - orders_6 (9 partitions) \n# - orders_7 (9 partitions)\n# - orders_8 (9 partitions)\n# - orders_9 (9 partitions)\n# - orders_10 (9 partitions)\n```\n\n## \ud83d\udcca Monitoring and Debugging\n\n### Configuration Summary\n\n```python\n# Get detailed configuration information\nsummary = consumer.get_config_summary()\nprint(f\"Priority boost enabled: {summary['priority_boost_enabled']}\")\nprint(f\"Current subscription: {summary['current_subscription']}\")\nprint(f\"Max priority: {summary['max_priority']}\")\n```\n\n### Consumer State\n\n```python\n# Check consumer status\nprint(f\"Priority boost enabled: {consumer.priority_boost_enabled}\")\nprint(f\"Active consumers: {len(consumer.priority_consumer_manager.consumers)}\")\nprint(f\"Current subscription: {consumer.current_subscription}\")\n```\n\n## \ud83d\udea8 Troubleshooting\n\n### Debug Mode\n\n```python\nimport logging\nlogging.basicConfig(level=logging.DEBUG)\n\n# Enable debug logging for detailed information\nconsumer = KafkaboostConsumer(\n bootstrap_servers=['localhost:9092'],\n topics=['orders'],\n user_id='user123'\n)\n```\n\n## \ud83d\udcda API Reference\n\n### KafkaboostConsumer\n\n#### Constructor Parameters\n- `bootstrap_servers`: Kafka server address(es)\n- `topics`: Topic(s) to consume from\n- `group_id`: Consumer group ID\n- `user_id`: User ID for S3 config lookup (enables priority boost)\n- `auto_offset_reset`: Offset reset strategy ('earliest', 'latest', 'none')\n- `**kwargs`: Additional KafkaConsumer parameters\n\n#### Key Methods\n- `poll(timeout_ms=1000, max_records=None)`: Poll for messages (highest priority first)\n- `refresh_config()`: Refresh configuration from S3\n- `get_config_summary()`: Get configuration summary\n- `close()`: Close consumer and cleanup\n\n### KafkaboostProducer\n\n#### Constructor Parameters\n- `bootstrap_servers`: Kafka server address(es)\n- `user_id`: User ID for S3 config lookup\n- `**kwargs`: Additional KafkaProducer parameters\n\n#### Key Methods\n- `send(topic, value, priority=None)`: Send message with optional priority\n- `close()`: Close producer\n\n### KafkaConfigManager\n\n#### Constructor Parameters\n- `bootstrap_servers`: Kafka server address(es)\n- `user_id`: User ID for S3 config lookup\n\n#### Key Methods\n- `get_config_summary()`: Get configuration summary\n- `find_matching_topics(base_topics)`: Find priority topic variants\n\n## \ud83d\udd04 Migration Guide\n\n### Step 1: Configure on Website\n1. **Visit**: [https://master.d1hgz5clxamnqf.amplifyapp.com/](https://master.d1hgz5clxamnqf.amplifyapp.com/)\n2. **Login** and configure your topics, priorities, and settings\n3. **Copy your User ID** from the dashboard\n\n### Step 2: Update Your Code\n\n#### From Standard Kafka Consumer\n\n```python\n# Before\nfrom kafka import KafkaConsumer\nconsumer = KafkaConsumer('orders', bootstrap_servers=['localhost:9092'])\n\n# After (just add user_id!)\nfrom kafkaboost.consumer import KafkaboostConsumer\nconsumer = KafkaboostConsumer(\n bootstrap_servers=['localhost:9092'],\n topics=['orders'],\n user_id='your-user-id-from-website' # Copy from configuration website\n)\n```\n\n#### From Standard Kafka Producer\n\n```python\n# Before\nfrom kafka import KafkaProducer\nproducer = KafkaProducer(bootstrap_servers=['localhost:9092'])\n\n# After (just add user_id!)\nfrom kafkaboost.producer import KafkaboostProducer\nproducer = KafkaboostProducer(\n bootstrap_servers=['localhost:9092'],\n user_id='your-user-id-from-website' # Copy from configuration website\n)\n```\n\n## \ud83c\udfc6 Best Practices\n\n### Topic Design\n- Use descriptive base topic names\n- Keep priority levels manageable (0-10 recommended)\n- Ensure consistent naming across environments\n\n### Consumer Groups\n- Use different group IDs for different priority requirements\n- Consider separate consumers for different priority ranges\n- Monitor consumer group rebalancing\n\n### Performance\n- Priority boost mode is most effective with high message volumes\n- Consider batch sizes for optimal throughput\n- Monitor partition assignment and rebalancing\n\n### Error Handling\n- Always close consumers in finally blocks\n- Handle configuration refresh errors gracefully\n- Monitor partition pausing/resuming for performance\n\n## \ud83d\udce6 Dependencies\n\n- `kafka-python` - Core Kafka functionality\n- `boto3` - S3 configuration management\n\n## \ud83d\udcc4 License\n\nThis project extends the existing kafkaboost library with priority-aware features while maintaining backward compatibility.\n\n## \ud83e\udd1d Contributing\n\n1. Fork the repository\n2. Create a feature branch\n3. Add tests for new functionality\n4. Ensure all tests pass\n5. Submit a pull request\n\n---\n\n**KafkaBoost** - Making Kafka priority-aware and production-ready! \ud83d\ude80\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Enhanced Apache Kafka library with priority-based message processing",
"version": "0.2.0",
"project_urls": {
"Bug Tracker": "https://github.com/your-org/kafkaboost/issues",
"Documentation": "https://kafkaboost.readthedocs.io/",
"Homepage": "https://github.com/your-org/kafkaboost",
"Repository": "https://github.com/your-org/kafkaboost"
},
"split_keywords": [
"kafka",
" priority",
" messaging",
" distributed",
" streaming",
" async"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "0ad8c9620e2354719a3dca938e3b1514aaf6e99488745d9a7d196d953d893e98",
"md5": "81f50280f5ff3891a08ec3c723b82ce7",
"sha256": "c5eda599213d0102414537049dff45e44a74db5a8574bd6845a17e008a922550"
},
"downloads": -1,
"filename": "kafkaboost-0.2.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "81f50280f5ff3891a08ec3c723b82ce7",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.7",
"size": 90175,
"upload_time": "2025-09-08T10:31:09",
"upload_time_iso_8601": "2025-09-08T10:31:09.712206Z",
"url": "https://files.pythonhosted.org/packages/0a/d8/c9620e2354719a3dca938e3b1514aaf6e99488745d9a7d196d953d893e98/kafkaboost-0.2.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "adad28961bec882eca4379391442093409e89bde6f1c996860e2c1e0edb64f7a",
"md5": "a62d33c438533dca3f2284e51f19dac8",
"sha256": "1e6fe830d5b5c33d9a8e4996c7b6600b925f20bbfa8e9f5b6138abb004d6c41c"
},
"downloads": -1,
"filename": "kafkaboost-0.2.0.tar.gz",
"has_sig": false,
"md5_digest": "a62d33c438533dca3f2284e51f19dac8",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.7",
"size": 17055953,
"upload_time": "2025-09-08T10:31:12",
"upload_time_iso_8601": "2025-09-08T10:31:12.874050Z",
"url": "https://files.pythonhosted.org/packages/ad/ad/28961bec882eca4379391442093409e89bde6f1c996860e2c1e0edb64f7a/kafkaboost-0.2.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-09-08 10:31:12",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "your-org",
"github_project": "kafkaboost",
"github_not_found": true,
"lcname": "kafkaboost"
}