muffin-kafka

- Name: muffin-kafka
- Version: 0.7.4
- Home page: https://github.com/klen/muffin-kafka
- Summary: Kafka Integration for Muffin framework
- Upload time: 2025-07-15 11:52:00
- Author: Kirill Klenov
- Requires Python: <4.0,>=3.10
- License: MIT
- Keywords: kafka, asyncio, asgi, muffin

# Muffin-Kafka

**Muffin-Kafka** is an [Apache Kafka](https://kafka.apache.org) integration plugin for the [Muffin](https://klen.github.io/muffin) web framework, built on top of `aiokafka`.

[![Tests Status](https://github.com/klen/muffin-kafka/workflows/tests/badge.svg)](https://github.com/klen/muffin-kafka/actions)
[![PYPI Version](https://img.shields.io/pypi/v/muffin-kafka)](https://pypi.org/project/muffin-kafka/)
[![Python Versions](https://img.shields.io/pypi/pyversions/muffin-kafka)](https://pypi.org/project/muffin-kafka/)

---

## 🚀 Features

- **Async Kafka integration** using `aiokafka`
- **Per-topic task model** — each topic is consumed in an isolated asyncio task
- **Simple handler registration** using `@plugin.handle_topics(...)`
- **Manual or auto-commit support**, custom group IDs
- **Producer support** (`send` / `send_and_wait`)
- **Built-in monitoring** with offsets, lag, and poll delay
- **Healthcheck support** for liveness probes and observability
- **Optional error handler** via `@plugin.handle_error(...)`
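
The per-topic task model can be pictured with plain `asyncio`. The sketch below is not the plugin's code: `asyncio.Queue` objects stand in for Kafka topics, and `consume_topic`, `run_consumers`, and `demo` are hypothetical names. It only illustrates why one task per topic keeps a failing message in one topic from blocking the others.

```python
import asyncio


async def consume_topic(topic, queue, handler, results):
    """Drain one stand-in topic; errors stay inside this task."""
    while True:
        message = await queue.get()
        if message is None:  # sentinel: stop consuming this topic
            break
        try:
            results.append(await handler(topic, message))
        except Exception as exc:  # record the failure and keep consuming
            results.append(f"{topic}: error {exc}")


async def run_consumers(topics, handler):
    """Start one isolated asyncio task per topic and wait for all of them."""
    results = []
    tasks = [
        asyncio.create_task(consume_topic(name, queue, handler, results))
        for name, queue in topics.items()
    ]
    await asyncio.gather(*tasks)
    return results


async def demo():
    async def handler(topic, message):
        if message == "bad":
            raise ValueError("cannot process")
        return f"{topic}: {message}"

    # Queues are stand-ins for topics; None marks the end of each stream.
    topics = {"events.user": asyncio.Queue(), "events.auth": asyncio.Queue()}
    for item in ("signup", "bad", None):
        topics["events.user"].put_nowait(item)
    for item in ("login", None):
        topics["events.auth"].put_nowait(item)
    return await run_consumers(topics, handler)
```

Running `asyncio.run(demo())` processes both stand-in topics concurrently; the failing `"bad"` message is recorded as an error for `events.user` without interrupting `events.auth`.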

---

## ✅ Requirements

- Python ≥ 3.10
- Muffin ≥ 0.71
- Kafka cluster or broker (local or cloud)

---

## 📦 Installation

```bash
pip install muffin-kafka
```

## ⚙️ Usage

```python
from muffin import Application
from muffin_kafka import Kafka

app = Application("example")

# Initialize the plugin with config options
kafka = Kafka(app, bootstrap_servers="localhost:9092", produce=True, listen=True)
```

### 🧩 Registering Handlers

Use `@kafka.handle_topics(...)` to register a handler for specific Kafka topics:

```python
@kafka.handle_topics("events.user", "events.auth")
async def handle_event(message):
    data = message.value.decode()
    print("Received:", data)
```
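
Because one handler can be registered for several topics, it often needs to decode the payload and branch on where the message came from. The following is a minimal sketch under two assumptions that the README does not confirm: payloads are UTF-8 encoded JSON objects, and the message exposes a `topic` attribute in the way aiokafka's `ConsumerRecord` does. `FakeRecord` is a hypothetical stand-in that lets the handler run without a broker.

```python
import asyncio
import json
from dataclasses import dataclass


@dataclass
class FakeRecord:
    """Hypothetical stand-in for the message object passed to a handler."""
    topic: str
    value: bytes


async def handle_event(message):
    # Assumption: producers publish UTF-8 encoded JSON objects.
    try:
        payload = json.loads(message.value.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return ("skipped", message.topic)  # malformed payload
    if not isinstance(payload, dict):
        return ("skipped", message.topic)  # valid JSON, but not an object

    # Branch on the source topic.
    if message.topic == "events.auth":
        return ("auth", payload.get("action"))
    return ("user", payload.get("action"))
```

For example, `asyncio.run(handle_event(FakeRecord("events.auth", b'{"action": "login"}')))` takes the `events.auth` branch, while a payload that is not valid JSON is skipped instead of raising.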

You can also register a global error handler:

```python
@kafka.handle_error
async def on_error(exc):
    print("Kafka error:", exc)
```

### 📤 Sending Messages

You can send messages to Kafka topics using the `send` or `send_and_wait` methods:

```python
async def publish_events():
    # Send a message without waiting for acknowledgment
    await kafka.send("events.user", {"action": "signup"}, key="user123")

    # Or wait for broker acknowledgment
    result = await kafka.send_and_wait("events.user", {"action": "login"})
    return result
```
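
Kafka stores keys and values as bytes. The example above passes a `dict` and a `str` key, and this README does not say how the plugin serializes them. The sketch below shows one common choice, compact UTF-8 JSON, as an assumption only; `encode_value` and `encode_key` are hypothetical helpers, not part of the plugin.

```python
import json


def encode_value(value):
    """Serialize a payload to compact UTF-8 JSON bytes (assumed wire format)."""
    if isinstance(value, bytes):
        return value  # already serialized
    return json.dumps(value, separators=(",", ":"), sort_keys=True).encode("utf-8")


def encode_key(key):
    """Encode an optional message key; Kafka allows messages without a key."""
    if key is None or isinstance(key, bytes):
        return key
    return str(key).encode("utf-8")
```

With Kafka's default partitioner, messages that share a key are routed to the same partition, so a stable key such as a user ID preserves per-user ordering.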

### 🔄 Healthcheck

You can monitor consumer health by checking lag across partitions:

```python
async def check_kafka():
    # Check if Kafka lag is within acceptable limits
    ok = await kafka.healthcheck(max_lag=1000)
    if not ok:
        raise RuntimeError("Kafka lag too high")
```
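
A lag-based check usually compares, per partition, the latest offset in the log with the offset the consumer group has committed. The sketch below assumes that definition of lag; `partition_lag` and `lag_within_limit` are hypothetical names and do not describe the plugin's internals.

```python
def partition_lag(latest_offset, committed_offset):
    """Messages not yet committed in one partition; never negative."""
    if committed_offset is None:
        # Assumption: with no commit yet, treat the whole log as unread.
        return latest_offset
    return max(latest_offset - committed_offset, 0)


def lag_within_limit(latest, committed, max_lag):
    """Return True when every partition's lag is at most max_lag."""
    return all(
        partition_lag(offset, committed.get(partition)) <= max_lag
        for partition, offset in latest.items()
    )
```

With latest offsets `{("events.user", 0): 1500}` and committed offsets `{("events.user", 0): 1200}`, the lag is 300, so the check passes for `max_lag=1000` and fails for `max_lag=100`.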

## 📊 Monitoring

If `monitor=True` is passed, the plugin logs the following every `monitor_interval` seconds:

- Committed offsets
- Latest offsets
- Poll timestamps
- Per-partition lag and delay

This data can be exported as Prometheus/Grafana metrics or used for alerting.
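
As one way to expose lag figures to Prometheus, the sketch below renders a mapping of `(topic, partition)` to lag in the Prometheus text exposition format. The function name `render_lag_metrics` and the metric name `kafka_consumer_lag` are hypothetical, and the lag values are supplied by the caller; how to read them from the plugin is not covered here.

```python
def render_lag_metrics(lag_by_partition, metric="kafka_consumer_lag"):
    """Render {(topic, partition): lag} as Prometheus text exposition lines."""
    lines = [f"# TYPE {metric} gauge"]
    for (topic, partition), lag in sorted(lag_by_partition.items()):
        # One sample per partition, labelled with its topic and partition.
        lines.append(f'{metric}{{topic="{topic}",partition="{partition}"}} {lag}')
    return "\n".join(lines) + "\n"
```

The returned text can be served from an HTTP endpoint that a Prometheus server scrapes.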

## ⚙️ Configuration Options

You can pass configuration options either as keyword arguments to the plugin:

```python
kafka = Kafka(app, bootstrap_servers="localhost:9092", produce=True)
```

Or set them via Muffin's config system (e.g. `.env`, YAML), using the upper-case option name with a `KAFKA_` prefix:

```python
"KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
"KAFKA_PRODUCE": True,
```
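
The naming convention in the example above (`KAFKA_` plus the upper-case option name) can be illustrated with a small helper. This is a sketch of the convention only, not Muffin's actual config loader; `options_from_config` is a hypothetical name.

```python
def options_from_config(config, prefix="KAFKA_"):
    """Collect prefixed config keys into lower-case plugin option names."""
    return {
        key[len(prefix):].lower(): value
        for key, value in config.items()
        if key.startswith(prefix)
    }
```

Keys without the prefix, such as an application-wide `DEBUG` flag, are ignored, so plugin settings can share one config namespace with everything else.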

### Supported Options

| Option                | Type   | Default            | Description                                 |
| --------------------- | ------ | ------------------ | ------------------------------------------- |
| `bootstrap_servers`   | `str`  | `"localhost:9092"` | Kafka broker connection string              |
| `group_id`            | `str`  | `None`             | Kafka consumer group ID                     |
| `client_id`           | `str`  | `"muffin"`         | Kafka client ID                             |
| `produce`             | `bool` | `False`            | Enable Kafka producer                       |
| `listen`              | `bool` | `True`             | Enable consumers (message listening)        |
| `monitor`             | `bool` | `False`            | Enable internal consumer monitor            |
| `monitor_interval`    | `int`  | `60`               | Monitor frequency in seconds                |
| `auto_offset_reset`   | `str`  | `"earliest"`       | Where to start if no committed offset       |
| `enable_auto_commit`  | `bool` | `False`            | Automatically commit offsets                |
| `max_poll_records`    | `int`  | `None`             | Max records to poll in one batch            |
| `request_timeout_ms`  | `int`  | `30000`            | Request timeout in milliseconds             |
| `retry_backoff_ms`    | `int`  | `1000`             | Retry interval on failure in milliseconds   |
| `security_protocol`   | `str`  | `"PLAINTEXT"`      | Kafka protocol (`SSL`, `SASL_PLAINTEXT`, …) |
| `sasl_mechanism`      | `str`  | `"PLAIN"`          | SASL auth mechanism                         |
| `sasl_plain_username` | `str`  | `None`             | SASL auth user                              |
| `sasl_plain_password` | `str`  | `None`             | SASL auth password                          |
| `ssl_cafile`          | `str`  | `None`             | Path to trusted CA certs                    |
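
As an illustration of how the security options in the table fit together, the fragment below collects them for a SASL-over-TLS connection. Every value is a placeholder, the `KAFKA_PASSWORD` environment variable is a hypothetical name, and `SASL_SSL` is assumed to be accepted alongside the protocols listed above.

```python
import os

# Placeholder values throughout; replace with your cluster's settings.
secure_kafka_options = {
    "bootstrap_servers": "broker.example.com:9093",
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "PLAIN",
    "sasl_plain_username": "app-user",
    # Read the secret from the environment instead of hard-coding it.
    "sasl_plain_password": os.environ.get("KAFKA_PASSWORD", ""),
    "ssl_cafile": "/etc/ssl/certs/kafka-ca.pem",
}

# Intended use, assuming the plugin accepts these as keyword arguments:
# kafka = Kafka(app, **secure_kafka_options)
```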

---

## 🐞 Bug Tracker

Found a bug or have a feature request? Please open an issue at:
[https://github.com/klen/muffin-kafka/issues](https://github.com/klen/muffin-kafka/issues)

---

## 🤝 Contributing

Pull requests are welcome! Development happens here:
[https://github.com/klen/muffin-kafka](https://github.com/klen/muffin-kafka)

---

## 🪪 License

**MIT** – See [LICENSE](./LICENSE) for full details.

            
