# Kafka Smart Producer
[CI](https://github.com/namphv/kafka-smart-producer/actions/workflows/ci.yml)
[PyPI version](https://badge.fury.io/py/kafka-smart-producer)
[Python versions](https://pypi.org/project/kafka-smart-producer/)
[License: MIT](https://opensource.org/licenses/MIT)
**Kafka Smart Producer** is a Python library that extends `confluent-kafka-python` with intelligent, real-time, lag-aware partition selection. It solves the "hot partition" problem by monitoring consumer health and routing messages to the healthiest partitions.
## 🚀 Key Features
- **Intelligent Partition Selection**: Routes messages to healthy partitions based on real-time consumer lag monitoring
- **Performance Optimized**: Sub-millisecond overhead with smart caching strategies
- **Dual API Support**: Both synchronous and asynchronous producer implementations
- **Flexible Architecture**: Protocol-based design with extensible lag collection and health calculation
- **Graceful Degradation**: Falls back to default partitioner when health data is unavailable
- **Simple Configuration**: Minimal setup with sensible defaults, advanced configuration when needed
## 📦 Installation
Requires Python 3.9 or later.
```bash
pip install kafka-smart-producer
```
### Optional Dependencies
For Redis-based distributed caching:
```bash
pip install "kafka-smart-producer[redis]"
```
For development:
```bash
pip install "kafka-smart-producer[dev]"
```
## 🔧 Quick Start
### Minimal Configuration
```python
from kafka_smart_producer import SmartProducer, SmartProducerConfig

# Simple setup: just specify Kafka, topics, and the consumer group
config = SmartProducerConfig.from_dict({
    "bootstrap.servers": "localhost:9092",
    "topics": ["orders", "payments"],
    "consumer_group": "order-processors",  # Automatically enables health monitoring
})

# Create a producer with automatic health monitoring
with SmartProducer(config) as producer:
    # Messages are automatically routed to healthy partitions
    producer.produce(
        topic="orders",
        key=b"customer-123",
        value=b"order-data",
    )

    # Block until all queued messages are delivered (or fail)
    producer.flush()
```
### Advanced Configuration
```python
from kafka_smart_producer import SmartProducer, SmartProducerConfig

# Full control over health monitoring and caching
config = SmartProducerConfig.from_dict({
    "bootstrap.servers": "localhost:9092",
    "topics": ["orders", "payments"],
    "health_manager": {
        "consumer_group": "order-processors",
        "health_threshold": 0.3,    # Minimum score to count as healthy (default 0.5)
        "refresh_interval": 3.0,    # Refresh health data every 3 s (default 5.0)
        "max_lag_for_health": 500,  # Lag that maps to a 0.0 score (default 1000)
    },
    "cache": {
        "local_max_size": 2000,
        "local_ttl_seconds": 600,
        "remote_enabled": True,     # Redis (L2) cache; requires the [redis] extra
        "redis_host": "localhost",
        "redis_port": 6379,
    },
})

with SmartProducer(config) as producer:
    # Inspect current health information
    healthy_partitions = producer.health_manager.get_healthy_partitions("orders")
    health_summary = producer.health_manager.get_health_summary()

    # Produce with smart partitioning
    producer.produce(topic="orders", key=b"key", value=b"value")
```
### Async Producer
```python
import asyncio

from kafka_smart_producer import AsyncSmartProducer, SmartProducerConfig


async def main():
    config = SmartProducerConfig.from_dict({
        "bootstrap.servers": "localhost:9092",
        "topics": ["orders"],
        "consumer_group": "processors",
    })

    async with AsyncSmartProducer(config) as producer:
        await producer.produce(topic="orders", key=b"key", value=b"value")
        await producer.flush()


asyncio.run(main())
```
## 🏗️ Architecture
### Core Components
1. **LagDataCollector Protocol**: Fetches consumer lag data from various sources
   - `KafkaAdminLagCollector`: Uses Kafka AdminClient (default)
   - Extensible for custom data sources (Redis, Prometheus, etc.)
2. **HotPartitionCalculator Protocol**: Transforms lag data into health scores
   - `ThresholdHotPartitionCalculator`: Basic threshold-based scoring (default)
   - Extensible for custom health algorithms
3. **HealthManager**: Central coordinator for health monitoring
   - `PartitionHealthMonitor`: Sync implementation with threading
   - `AsyncPartitionHealthMonitor`: Async implementation with asyncio
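
To make the protocol-based design concrete, here is a minimal sketch of how a collector, a calculator, and a threaded monitor could fit together using `typing.Protocol`. The method names `get_lag_data` and `calculate_scores`, the classes `StaticLagCollector`, `LinearCalculator`, and `HealthMonitor`, and the linear scoring rule are all hypothetical illustrations, not the library's actual API; the monitor only mirrors the role described for `PartitionHealthMonitor`.

```python
import threading
from typing import Dict, Iterable, Protocol


class LagDataCollector(Protocol):
    # Assumed shape (hypothetical): {partition: lag in messages} for one topic.
    def get_lag_data(self, topic: str) -> Dict[int, int]: ...


class HotPartitionCalculator(Protocol):
    # Assumed shape (hypothetical): {partition: lag} -> {partition: score in [0.0, 1.0]}.
    def calculate_scores(self, lag_data: Dict[int, int]) -> Dict[int, float]: ...


class StaticLagCollector:
    """Toy collector standing in for a custom source such as Prometheus."""

    def __init__(self, lags: Dict[str, Dict[int, int]]) -> None:
        self._lags = lags

    def get_lag_data(self, topic: str) -> Dict[int, int]:
        return dict(self._lags.get(topic, {}))


class LinearCalculator:
    """Assumed rule: 1.0 at zero lag, falling linearly to 0.0 at max_lag."""

    def __init__(self, max_lag: int = 1000) -> None:
        self.max_lag = max_lag

    def calculate_scores(self, lag_data: Dict[int, int]) -> Dict[int, float]:
        return {p: max(0.0, 1.0 - lag / self.max_lag) for p, lag in lag_data.items()}


class HealthMonitor:
    """Refreshes scores for each topic on a daemon thread."""

    def __init__(
        self,
        collector: LagDataCollector,
        calculator: HotPartitionCalculator,
        topics: Iterable[str],
        refresh_interval: float = 5.0,
    ) -> None:
        self._collector = collector
        self._calculator = calculator
        self._topics = list(topics)
        self._interval = refresh_interval
        self._scores: Dict[str, Dict[int, float]] = {}
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def refresh(self) -> None:
        for topic in self._topics:
            scores = self._calculator.calculate_scores(self._collector.get_lag_data(topic))
            with self._lock:
                self._scores[topic] = scores

    def _run(self) -> None:
        # Event.wait acts as an interruptible sleep between refreshes.
        while not self._stop.is_set():
            self.refresh()
            self._stop.wait(self._interval)

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop.set()
        self._thread.join()

    def scores(self, topic: str) -> Dict[int, float]:
        # Return a copy so callers cannot mutate the monitor's internal state.
        with self._lock:
            return dict(self._scores.get(topic, {}))
```

Because `Protocol` uses structural typing, any object with matching methods (a Redis or Prometheus reader, for example) can be passed in without inheriting from a base class.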
### Caching Strategy
- **L1 Cache**: In-memory LRU cache for sub-millisecond lookups
- **L2 Cache**: Optional Redis-based distributed cache
- **Strategy**: Read-through pattern with TTL-based invalidation
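
The sketch below illustrates the read-through pattern with a TTL-bounded LRU as the L1 tier. It is a minimal sketch under stated assumptions: `TTLLRUCache` and `read_through` are hypothetical names, a plain `dict` stands in for the Redis L2 cache, `loader` stands in for whatever actually fetches fresh health data, and the default size and TTL are placeholders rather than the library's defaults.

```python
import time
from collections import OrderedDict
from typing import Any, Callable, Dict, Optional, Tuple


class TTLLRUCache:
    """Hypothetical in-memory LRU cache whose entries expire after ttl_seconds."""

    def __init__(self, max_size: int = 1000, ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_size = max_size
        self.ttl = ttl_seconds
        self._clock = clock
        self._data: "OrderedDict[str, Tuple[float, Any]]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]  # TTL-based invalidation
            return None
        self._data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key: str, value: Any) -> None:
        self._data[key] = (self._clock() + self.ttl, value)
        self._data.move_to_end(key)
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)  # evict the least recently used entry


def read_through(
    key: str,
    l1: TTLLRUCache,
    loader: Callable[[str], Any],
    l2: Optional[Dict[str, Any]] = None,
) -> Any:
    """Check L1, then the optional L2, then the loader; fill caches on the way back.

    A stored value of None is treated as a miss.
    """
    value = l1.get(key)
    if value is not None:
        return value
    if l2 is not None:
        value = l2.get(key)
    if value is None:
        value = loader(key)
        if l2 is not None:
            l2[key] = value
    l1.set(key, value)
    return value
```

Injecting the clock makes expiry testable without sleeping. A real Redis tier would typically set its own expiry (for example with `SET ... EX`) instead of keeping entries forever like the `dict` here.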
## 🔄 How It Works
1. **Health Monitoring**: Background threads (sync) or asyncio tasks (async) poll consumer lag for the configured topics every `refresh_interval` seconds
2. **Health Scoring**: Lag data is converted to health scores between 0.0 and 1.0 by a pluggable `HotPartitionCalculator`
3. **Partition Selection**: When a message is produced, the producer chooses among partitions whose health score is at or above `health_threshold`
4. **Caching**: Health data is cached so that partition selection adds minimal latency to message production
5. **Fallback**: If health data is unavailable or no partition is healthy, the producer falls back to confluent-kafka's default partitioner
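
The README does not spell out the scoring formula or how one partition is picked among the healthy ones. The sketch below assumes a linear score that reaches 0.0 at `max_lag_for_health` and a uniformly random choice among partitions at or above `health_threshold`; the function names are hypothetical. Returning `None` models the fallback: the caller then omits the partition and lets confluent-kafka's default partitioner decide.

```python
import random
from typing import Dict, List, Optional


def health_score(lag: int, max_lag_for_health: int = 1000) -> float:
    """Assumed linear rule: 1.0 at zero lag, 0.0 at or beyond max_lag_for_health."""
    return max(0.0, 1.0 - lag / max_lag_for_health)


def healthy_partitions(
    lags: Dict[int, int], health_threshold: float = 0.5, max_lag_for_health: int = 1000
) -> List[int]:
    """Partitions whose score is at or above the threshold, in ascending order."""
    return sorted(
        p for p, lag in lags.items()
        if health_score(lag, max_lag_for_health) >= health_threshold
    )


def choose_partition(
    lags: Optional[Dict[int, int]],
    health_threshold: float = 0.5,
    max_lag_for_health: int = 1000,
    rng: Optional[random.Random] = None,
) -> Optional[int]:
    """Pick a healthy partition, or return None to defer to the default partitioner."""
    if not lags:  # no health data available: degrade gracefully
        return None
    candidates = healthy_partitions(lags, health_threshold, max_lag_for_health)
    if not candidates:  # nothing is healthy: fall back as well
        return None
    if rng is None:
        return random.choice(candidates)
    return rng.choice(candidates)
```

For example, with the defaults (`health_threshold=0.5`, `max_lag_for_health=1000`), lags of 0, 400, and 900 score 1.0, 0.6, and 0.1 under this assumption, so only the first two partitions are candidates.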
## 📊 Performance
- **Overhead**: <1ms additional latency per message
- **Throughput**: Minimal impact on producer throughput
- **Memory**: Efficient caching with configurable TTL and size limits
- **Network**: Optional Redis caching for distributed deployments
## 🔧 Configuration Options
### SmartProducerConfig
| Parameter | Type | Default | Description |
| ---------------- | --------- | -------- | -------------------------------------------------------- |
| `kafka_config` | dict | Required | Standard confluent-kafka producer config |
| `topics` | list[str] | Required | Topics for smart partitioning |
| `consumer_group` | str | None | Consumer group for health monitoring (simplified config) |
| `health_manager` | dict | None | Detailed health manager configuration |
| `cache` | dict | None | Caching configuration |
| `smart_enabled` | bool | True | Enable/disable smart partitioning |
| `key_stickiness` | bool | True | Enable partition stickiness for keys |
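
The table does not define exactly what `key_stickiness` does. One plausible reading, sketched below purely as an assumption, is: keep sending a key to the partition it was last routed to for as long as that partition stays healthy, and reassign it only when it does not. `StickyKeyRouter` is a hypothetical name, not part of the library.

```python
import random
from typing import Callable, Dict, List, Optional


class StickyKeyRouter:
    """Hypothetical key stickiness: reuse a key's last partition while it stays healthy."""

    def __init__(self, choose: Callable[[List[int]], int] = random.choice) -> None:
        self._choose = choose
        self._assignments: Dict[bytes, int] = {}  # key -> last partition used

    def route(self, key: Optional[bytes], healthy: List[int]) -> Optional[int]:
        if not healthy:
            return None  # defer to the default partitioner
        if key is None:
            return self._choose(healthy)  # unkeyed messages need no stickiness
        current = self._assignments.get(key)
        if current is not None and current in healthy:
            return current  # still healthy: keep the key where it was
        new = self._choose(healthy)  # first sighting, or old partition turned unhealthy
        self._assignments[key] = new
        return new
```

Stickiness matters because Kafka only guarantees ordering within a partition: moving a key to a different partition can let its later messages be consumed before earlier ones. The `_assignments` map here grows without bound; a real implementation would likely cap it, for example with an LRU cache.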
### Health Manager Configuration
| Parameter            | Type  | Default  | Description                                              |
| -------------------- | ----- | -------- | -------------------------------------------------------- |
| `consumer_group`     | str   | Required | Consumer group to monitor                                |
| `health_threshold`   | float | 0.5      | Minimum health score for a partition to count as healthy |
| `refresh_interval`   | float | 5.0      | Seconds between health data refreshes                    |
| `max_lag_for_health` | int   | 1000     | Lag (in messages) that maps to a health score of 0.0     |
| `timeout_seconds`    | float | 5.0      | Timeout in seconds for lag collection operations         |
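
As a hedged illustration of the table, the sketch below encodes the documented defaults in a hypothetical `HealthManagerSettings` dataclass with basic validation (health scores range from 0.0 to 1.0, so the threshold must as well). The `max_healthy_lag` helper assumes a linear score of `1 - lag / max_lag_for_health`, which the README does not confirm; under that assumption the defaults treat a partition as healthy up to a lag of 500 messages, while the advanced example (0.3 and 500) tightens that to 350.

```python
from dataclasses import dataclass, fields
from typing import Any, Dict


@dataclass(frozen=True)
class HealthManagerSettings:
    """Hypothetical container mirroring the Health Manager Configuration table."""

    consumer_group: str
    health_threshold: float = 0.5
    refresh_interval: float = 5.0
    max_lag_for_health: int = 1000
    timeout_seconds: float = 5.0

    def __post_init__(self) -> None:
        if not self.consumer_group:
            raise ValueError("consumer_group is required")
        if not 0.0 <= self.health_threshold <= 1.0:
            raise ValueError("health_threshold must be within [0.0, 1.0]")
        if self.refresh_interval <= 0 or self.timeout_seconds <= 0:
            raise ValueError("refresh_interval and timeout_seconds must be positive")
        if self.max_lag_for_health <= 0:
            raise ValueError("max_lag_for_health must be positive")

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> "HealthManagerSettings":
        # Reject typos such as "health_treshold" instead of silently ignoring them.
        unknown = set(raw) - {f.name for f in fields(cls)}
        if unknown:
            raise ValueError(f"unknown keys: {sorted(unknown)}")
        return cls(**raw)

    def max_healthy_lag(self) -> float:
        """Assumes score = 1 - lag / max_lag_for_health; returns the lag where score == threshold."""
        return (1.0 - self.health_threshold) * self.max_lag_for_health
```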
## 🧪 Testing
```bash
# Install with dev dependencies
pip install "kafka-smart-producer[dev]"
# Run tests
pytest
# Run with coverage
pytest --cov=kafka_smart_producer
# Type checking
mypy src/
# Linting
ruff check .
```
## 🤝 Contributing
1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request
## 📝 License
This project is licensed under the MIT License; see the [LICENSE](LICENSE) file for details.
## 🙋‍♂️ Support
- 📖 [Documentation](https://github.com/namphv/kafka-smart-producer)
- 🐛 [Issue Tracker](https://github.com/namphv/kafka-smart-producer/issues)
- 💬 [Discussions](https://github.com/namphv/kafka-smart-producer/discussions)
## 🔄 Version History
### 0.1.0 (Initial Release)
- Core smart partitioning functionality
- Sync and async producer implementations
- Health monitoring with threading/asyncio
- Flexible caching with local and Redis support
- Protocol-based extensible architecture
- Comprehensive test suite