kafka-smart-producer


- Name: kafka-smart-producer
- Version: 0.0.1
- Summary: Intelligent Kafka producer with real-time, lag-aware partition selection
- Upload time: 2025-07-13 06:25:49
- Requires Python: >=3.9
- Keywords: consumer-lag, kafka, partition-selection, producer, smart-routing
- Author: Pham Nam <namph.data@gmail.com>
- Repository: https://github.com/namphv/kafka-smart-producer

# Kafka Smart Producer

[![CI](https://github.com/namphv/kafka-smart-producer/workflows/CI/badge.svg)](https://github.com/namphv/kafka-smart-producer/actions/workflows/ci.yml)
[![PyPI version](https://badge.fury.io/py/kafka-smart-producer.svg)](https://badge.fury.io/py/kafka-smart-producer)
[![Python Version](https://img.shields.io/pypi/pyversions/kafka-smart-producer.svg)](https://pypi.org/project/kafka-smart-producer/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

**Kafka Smart Producer** is a Python library that extends `confluent-kafka-python` with intelligent, real-time, lag-aware partition selection. It solves the "hot partition" problem by monitoring consumer health and routing messages to the healthiest partitions.

## 🚀 Key Features

- **Intelligent Partition Selection**: Routes messages to healthy partitions based on real-time consumer lag monitoring
- **Performance Optimized**: Sub-millisecond overhead with smart caching strategies
- **Dual API Support**: Both synchronous and asynchronous producer implementations
- **Flexible Architecture**: Protocol-based design with extensible lag collection and health calculation
- **Graceful Degradation**: Falls back to the default partitioner when health data is unavailable
- **Simple Configuration**: Minimal setup with sensible defaults, advanced configuration when needed

## 📦 Installation

```bash
pip install kafka-smart-producer
```

### Optional Dependencies

For Redis-based distributed caching:

```bash
pip install kafka-smart-producer[redis]
```

For development:

```bash
pip install kafka-smart-producer[dev]
```

## 🔧 Quick Start

### Minimal Configuration

```python
from kafka_smart_producer import SmartProducer, SmartProducerConfig

# Simple setup - just specify Kafka, topics, and consumer group
config = SmartProducerConfig.from_dict({
    "bootstrap.servers": "localhost:9092",
    "topics": ["orders", "payments"],
    "consumer_group": "order-processors"  # Automatically enables health monitoring
})

# Create producer with automatic health monitoring
with SmartProducer(config) as producer:
    # Messages are automatically routed to healthy partitions
    producer.produce(
        topic="orders",
        key=b"customer-123",
        value=b"order-data"
    )

    # Block until all queued messages have been delivered or have failed
    producer.flush()
```

### Advanced Configuration

```python
from kafka_smart_producer import SmartProducer, SmartProducerConfig

# Full control over health monitoring and caching
config = SmartProducerConfig.from_dict({
    "bootstrap.servers": "localhost:9092",
    "topics": ["orders", "payments"],
    "health_manager": {
        "consumer_group": "order-processors",
        "health_threshold": 0.3,      # Minimum score to count as healthy (default 0.5)
        "refresh_interval": 3.0,      # Refresh lag data every 3 s (default 5.0)
        "max_lag_for_health": 500,    # Score reaches 0.0 at this lag (default 1000)
    },
    "cache": {
        "local_max_size": 2000,
        "local_ttl_seconds": 600,
        "remote_enabled": True,       # Redis caching
        "redis_host": "localhost",
        "redis_port": 6379
    }
})

with SmartProducer(config) as producer:
    # Get health information
    healthy_partitions = producer.health_manager.get_healthy_partitions("orders")
    health_summary = producer.health_manager.get_health_summary()

    # Produce with smart partitioning
    producer.produce(topic="orders", key=b"key", value=b"value")
```

### Async Producer

```python
import asyncio

from kafka_smart_producer import AsyncSmartProducer, SmartProducerConfig

async def main():
    config = SmartProducerConfig.from_dict({
        "bootstrap.servers": "localhost:9092",
        "topics": ["orders"],
        "consumer_group": "processors"
    })

    async with AsyncSmartProducer(config) as producer:
        await producer.produce(topic="orders", key=b"key", value=b"value")
        # Wait until all queued messages have been delivered or have failed
        await producer.flush()

if __name__ == "__main__":
    asyncio.run(main())
```

## 🏗️ Architecture

### Core Components

1. **LagDataCollector Protocol**: Fetches consumer lag data from various sources
   - `KafkaAdminLagCollector`: Uses Kafka AdminClient (default)
   - Extensible for custom data sources (Redis, Prometheus, etc.)

2. **HotPartitionCalculator Protocol**: Transforms lag data into health scores
   - `ThresholdHotPartitionCalculator`: Basic threshold-based scoring (default)
   - Extensible for custom health algorithms

3. **HealthManager**: Central coordinator for health monitoring
   - `PartitionHealthMonitor`: Sync implementation with threading
   - `AsyncPartitionHealthMonitor`: Async implementation with asyncio
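
The protocol-based design can be illustrated with a minimal sketch. The protocol names, method names (`get_lag_data`, `calculate_scores`), and toy classes below are assumptions made for illustration; they are not taken from the library's source, so check the actual protocol definitions before writing a real extension.

```python
from typing import Dict, Protocol


class LagCollector(Protocol):
    """Hypothetical shape of a lag source: partition id -> consumer lag."""

    def get_lag_data(self, topic: str) -> Dict[int, int]: ...


class HealthCalculator(Protocol):
    """Hypothetical shape of a scorer: lag per partition -> score in [0.0, 1.0]."""

    def calculate_scores(self, lag_data: Dict[int, int]) -> Dict[int, float]: ...


class StaticLagCollector:
    """Toy collector serving lag from memory (stand-in for Redis, Prometheus, etc.)."""

    def __init__(self, lag_by_topic: Dict[str, Dict[int, int]]) -> None:
        self._lag_by_topic = lag_by_topic

    def get_lag_data(self, topic: str) -> Dict[int, int]:
        return dict(self._lag_by_topic.get(topic, {}))


class StepCalculator:
    """Toy calculator: 1.0 below the lag limit, 0.0 at or above it."""

    def __init__(self, lag_limit: int) -> None:
        self._lag_limit = lag_limit

    def calculate_scores(self, lag_data: Dict[int, int]) -> Dict[int, float]:
        return {p: 1.0 if lag < self._lag_limit else 0.0 for p, lag in lag_data.items()}


def score_topic(collector: LagCollector, calculator: HealthCalculator, topic: str) -> Dict[int, float]:
    """Compose any collector with any calculator; only the method shapes matter."""
    return calculator.calculate_scores(collector.get_lag_data(topic))


collector = StaticLagCollector({"orders": {0: 10, 1: 5000, 2: 0}})
scores = score_topic(collector, StepCalculator(lag_limit=1000), "orders")
print(scores)
```

Because `typing.Protocol` uses structural typing, the toy classes never inherit from the protocols; matching the method signatures is enough for a type checker to accept them.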

### Caching Strategy

- **L1 Cache**: In-memory LRU cache for sub-millisecond lookups
- **L2 Cache**: Optional Redis-based distributed cache
- **Strategy**: Read-through pattern with TTL-based invalidation
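
A read-through lookup across the two tiers might look roughly like the sketch below. `TwoTierCache`, its parameters, and the plain dict standing in for Redis are all hypothetical; the library's real cache classes may be organized differently.

```python
import time
from collections import OrderedDict
from typing import Callable, Dict, Optional, Tuple


class TwoTierCache:
    """Sketch of an L1 (in-memory LRU + TTL) cache in front of an optional L2 store."""

    def __init__(self, max_size: int, ttl_seconds: float,
                 remote: Optional[Dict[str, int]] = None,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._local: "OrderedDict[str, Tuple[int, float]]" = OrderedDict()
        self._max_size = max_size
        self._ttl = ttl_seconds
        self._remote = remote  # plain dict standing in for Redis (assumption)
        self._clock = clock

    def get(self, key: str, loader: Callable[[str], int]) -> int:
        now = self._clock()
        entry = self._local.get(key)
        if entry is not None and entry[1] > now:
            self._local.move_to_end(key)  # mark as most recently used
            return entry[0]
        # L1 miss or expired entry: try L2 next, then fall through to the loader.
        # A real Redis tier would expire its own entries; this stand-in never does.
        value = self._remote.get(key) if self._remote is not None else None
        if value is None:
            value = loader(key)
            if self._remote is not None:
                self._remote[key] = value
        self._local[key] = (value, now + self._ttl)
        self._local.move_to_end(key)
        if len(self._local) > self._max_size:
            self._local.popitem(last=False)  # evict the least recently used entry
        return value


calls = []

def load(key: str) -> int:
    calls.append(key)  # record how often the slow path runs
    return 3

fake_time = [0.0]
cache = TwoTierCache(max_size=2, ttl_seconds=10.0, remote={}, clock=lambda: fake_time[0])
first = cache.get("orders:customer-123", load)   # miss in both tiers, loader runs
second = cache.get("orders:customer-123", load)  # served from L1, loader not called
print(first, second, len(calls))
```

Injecting the clock keeps the TTL logic testable without sleeping.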

## 🔄 How It Works

1. **Health Monitoring**: Background threads/tasks continuously monitor consumer lag for configured topics
2. **Health Scoring**: Lag data is converted to health scores (0.0-1.0) using configurable algorithms
3. **Partition Selection**: During message production, the producer selects partitions with health scores above the threshold
4. **Caching**: Health data is cached to minimize latency impact on message production
5. **Fallback**: If no healthy partitions are available, falls back to confluent-kafka's default partitioner
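
The scoring, selection, and fallback steps above can be sketched in a few lines. This is an illustration under stated assumptions, not the library's code: the linear scoring formula, the "at or above the threshold" comparison, the random choice among healthy partitions, and all function names are hypothetical.

```python
import random
from typing import Dict, List, Optional


def health_score(lag: int, max_lag: int) -> float:
    """Assumed linear scoring: 1.0 at zero lag, 0.0 at or beyond max_lag."""
    return min(1.0, max(0.0, 1.0 - lag / max_lag))


def healthy_partitions(lag_by_partition: Dict[int, int],
                       max_lag: int = 1000,
                       threshold: float = 0.5) -> List[int]:
    """Partitions whose score is at or above the threshold, in ascending order."""
    return sorted(
        p for p, lag in lag_by_partition.items()
        if health_score(lag, max_lag) >= threshold
    )


def select_partition(lag_by_partition: Dict[int, int],
                     max_lag: int = 1000,
                     threshold: float = 0.5) -> Optional[int]:
    """Pick a random healthy partition, or None to signal the fallback path."""
    candidates = healthy_partitions(lag_by_partition, max_lag, threshold)
    if not candidates:
        # No healthy partition: the caller would leave the partition unset
        # so that the default partitioner decides.
        return None
    return random.choice(candidates)


lag = {0: 50, 1: 900, 2: 400}        # consumer lag per partition (made-up numbers)
print(healthy_partitions(lag))       # partitions 0 and 2 pass the 0.5 threshold
print(select_partition({0: 5000}))   # every partition is lagging, so this is None
```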

## 📊 Performance

- **Overhead**: <1ms additional latency per message
- **Throughput**: Minimal impact on producer throughput
- **Memory**: Efficient caching with configurable TTL and size limits
- **Network**: Optional Redis caching for distributed deployments

## 🔧 Configuration Options

### SmartProducerConfig

| Parameter        | Type      | Default  | Description                                              |
| ---------------- | --------- | -------- | -------------------------------------------------------- |
| `kafka_config`   | dict      | Required | Standard confluent-kafka producer config                 |
| `topics`         | list[str] | Required | Topics for smart partitioning                            |
| `consumer_group` | str       | None     | Consumer group for health monitoring (simplified config) |
| `health_manager` | dict      | None     | Detailed health manager configuration                    |
| `cache`          | dict      | None     | Caching configuration                                    |
| `smart_enabled`  | bool      | True     | Enable/disable smart partitioning                        |
| `key_stickiness` | bool      | True     | Enable partition stickiness for keys                     |
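
To make the `key_stickiness` option concrete, here is one way key-to-partition stickiness could behave. This is a guess at the semantics, not the library's implementation: `StickyKeyRouter`, its `pick` callback, and the rule "keep the assignment until the partition turns unhealthy" are assumptions.

```python
from typing import Callable, Dict, Optional, Set


class StickyKeyRouter:
    """Remembers the partition chosen for each key; re-picks only if it turns unhealthy."""

    def __init__(self, pick: Callable[[Set[int]], int]) -> None:
        self._pick = pick                      # strategy for choosing among healthy partitions
        self._assigned: Dict[bytes, int] = {}  # key -> last chosen partition

    def route(self, key: bytes, healthy: Set[int]) -> Optional[int]:
        if not healthy:
            return None  # nothing healthy: defer to the default partitioner
        current = self._assigned.get(key)
        if current in healthy:
            return current  # stay on the same partition while it remains healthy
        chosen = self._pick(healthy)
        self._assigned[key] = chosen
        return chosen


router = StickyKeyRouter(pick=min)
first = router.route(b"customer-123", {0, 1, 2})  # first sighting of the key: pick a partition
again = router.route(b"customer-123", {0, 2})     # assigned partition still healthy: reuse it
moved = router.route(b"customer-123", {1, 2})     # assigned partition unhealthy: pick a new one
print(first, again, moved)
```

Note the trade-off this implies: moving a key to a new partition gives up per-key ordering across the switch, so applications that depend on strict ordering may prefer to disable smart routing for those topics.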

### Health Manager Configuration

| Parameter            | Type  | Default  | Description                                 |
| -------------------- | ----- | -------- | ------------------------------------------- |
| `consumer_group`     | str   | Required | Consumer group to monitor                   |
| `health_threshold`   | float | 0.5      | Minimum health score for healthy partitions |
| `refresh_interval`   | float | 5.0      | Seconds between health data refreshes       |
| `max_lag_for_health` | int   | 1000     | Lag at which the health score reaches 0.0   |
| `timeout_seconds`    | float | 5.0      | Timeout for lag collection operations       |
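
`health_threshold` and `max_lag_for_health` interact, and a small calculation shows how. The sketch assumes the score falls linearly from 1.0 at zero lag to 0.0 at `max_lag_for_health`; the document does not state the exact formula, and `lag_cutoff` is a hypothetical helper.

```python
def lag_cutoff(max_lag_for_health: int, health_threshold: float) -> float:
    """Largest lag that still counts as healthy under assumed linear scoring.

    With score = 1 - lag / max_lag_for_health, the condition
    score >= health_threshold rearranges to
    lag <= max_lag_for_health * (1 - health_threshold).
    """
    return max_lag_for_health * (1.0 - health_threshold)


default_cutoff = lag_cutoff(1000, 0.5)  # table defaults
tuned_cutoff = lag_cutoff(500, 0.3)     # values from the Advanced Configuration example
print(default_cutoff, tuned_cutoff)
```

Under this assumption the defaults treat a partition as healthy up to a lag of about 500 messages, while the values from the Advanced Configuration example lower that to about 350, even though the threshold itself is more lenient.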

## 🧪 Testing

```bash
# Install with dev dependencies
pip install kafka-smart-producer[dev]

# Run tests
pytest

# Run with coverage
pytest --cov=kafka_smart_producer

# Type checking
mypy src/

# Linting
ruff check .
```

## 🤝 Contributing

1. Fork the repository
2. Create a feature branch (`git checkout -b feature/amazing-feature`)
3. Commit your changes (`git commit -m 'Add amazing feature'`)
4. Push to the branch (`git push origin feature/amazing-feature`)
5. Open a Pull Request

## 📝 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## 🙋‍♂️ Support

- 📖 [Documentation](https://github.com/namphv/kafka-smart-producer)
- 🐛 [Issue Tracker](https://github.com/namphv/kafka-smart-producer/issues)
- 💬 [Discussions](https://github.com/namphv/kafka-smart-producer/discussions)

## 🔄 Version History

### 0.0.1 (Initial Release)

- Core smart partitioning functionality
- Sync and async producer implementations
- Health monitoring with threading/asyncio
- Flexible caching with local and Redis support
- Protocol-based extensible architecture
- Comprehensive test suite

            
