kaflow

Name: kaflow
Version: 0.2.1
Summary: Python Stream processing backed by Apache Kafka.
Upload time: 2023-04-20 15:39:43
Requires Python: >=3.8
Keywords: data, flow, kafka, pipeline, processing, stream
Requirements: No requirements were recorded.
<div align="center">
  <h1>kaflow</h1>
  <p>
    <em>Kafka streams topic processing in Python.</em>
  </p>
  <p>
    <a href="https://github.com/gabrielmbmb/kaflow/actions?query=workflow%3ATest+event%3Apush+branch%3Amain" target="_blank">
      <img src="https://github.com/gabrielmbmb/kaflow/workflows/Test/badge.svg?event=push&branch=main" alt="Test">
    </a>
    <a href="https://pypi.org/project/kaflow">
      <img src="https://img.shields.io/pypi/v/kaflow?color=#2cbe4e">
    </a>
    <a href="https://pypi.org/project/kaflow">
      <img src="https://img.shields.io/pypi/pyversions/kaflow?color=#2cbe4e">
    </a>
  </p>
</div>

---

`kaflow` is a simple framework that lets you build Kafka stream processing applications in Python with ease.

Some of the features offered by `kaflow`:

- Dependency Injection system inspired by [FastAPI](https://github.com/tiangolo/fastapi) and [xpresso](https://github.com/adriangb/xpresso), and backed by [di](https://github.com/adriangb/di).
- Automatic deserialization of incoming messages and serialization of outgoing messages. Supports popular formats such as `JSON`, `Avro`, and `Protobuf`.
- Message validation thanks to [pydantic](https://github.com/pydantic/pydantic) (see the standalone sketch after this list).
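
As a rough illustration of the validation step (plain pydantic only, not the `kaflow` API; the model mirrors the example below and the payload values are made up), a message value declared as `Json[UserClick]` is parsed and checked against the model roughly like this:

```python
# Standalone sketch: what pydantic-backed validation of a JSON-encoded message
# value amounts to. Uses only json and pydantic, not the kaflow API.
import json

from pydantic import BaseModel, ValidationError


class UserClick(BaseModel):
    user_id: int
    url: str
    timestamp: int


raw_value = b'{"user_id": 42, "url": "https://example.com", "timestamp": 1681990783}'

try:
    # Constructing the model validates field types; bad payloads raise ValidationError.
    click = UserClick(**json.loads(raw_value))
    print(click.user_id, click.url)
except ValidationError as exc:
    print("invalid message:", exc)
```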

## Requirements

Python 3.8+

## Installation

```shell
pip install kaflow
```

## Example

```python
from kaflow import (
    FromHeader,
    FromKey,
    FromValue,
    Json,
    Kaflow,
    Message,
    MessageOffset,
    MessagePartition,
    MessageTimestamp,
    String,
)
from pydantic import BaseModel


class UserClick(BaseModel):
    user_id: int
    url: str
    timestamp: int


class Key(BaseModel):
    environment: str


app = Kaflow(name="AwesomeKafkaApp", brokers="localhost:9092")


@app.consume(topic="user_clicks", sink_topics=["user_clicks_json"])
async def consume_user_clicks(
    message: FromValue[Json[UserClick]],
    key: FromKey[Json[Key]],
    x_correlation_id: FromHeader[String[str]],
    x_request_id: FromHeader[String[str]],
    partition: MessagePartition,
    offset: MessageOffset,
    timestamp: MessageTimestamp,
) -> Message:
    # Do something with the message
    ...

    # Publish to another topic
    return Message(value=b'{"user_clicked": "true"}')


app.run()

```
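
To exercise the app above, you can publish a matching message to the `user_clicks` topic with any Kafka client. The sketch below uses `kafka-python`, a separate client library that is not part of `kaflow`; the payload, key, and header values are made up, and it assumes the `x_correlation_id` / `x_request_id` parameters map to `x-correlation-id` / `x-request-id` headers, FastAPI-style.

```python
# Hedged sketch: produce a test message shaped like what consume_user_clicks
# expects (JSON value, JSON key, two string headers) using kafka-python.
import json

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")

producer.send(
    "user_clicks",
    key=json.dumps({"environment": "production"}).encode("utf-8"),
    value=json.dumps(
        {"user_id": 42, "url": "https://example.com", "timestamp": 1681990783}
    ).encode("utf-8"),
    headers=[  # kafka-python expects (str, bytes) tuples
        ("x-correlation-id", b"3f2b6a0e"),
        ("x-request-id", b"8c1d4e7a"),
    ],
)
producer.flush()
```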

            

Raw data

            {
    "_id": null,
    "home_page": "",
    "name": "kaflow",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": "",
    "keywords": "data,flow,kafka,pipeline,processing,stream",
    "author": "",
    "author_email": "Gabriel Martin Blazquez <gmartinbdev@gmail.com>",
    "download_url": "https://files.pythonhosted.org/packages/be/5e/5955a9cc70d2a5a4d6511c5fb6a1122414c99a5791e6036ccd118a30c737/kaflow-0.2.1.tar.gz",
    "platform": null,
    "description": "<div align=\"center\">\n  <h1>kaflow</h1>\n  <p>\n    <em>Kafka streams topic processing in Python.</em>\n  </p>\n  <p>\n    <a href=\"https://github.com/gabrielmbmb/kaflow/actions?query=workflow%3ATest+event%3Apush+branch%3Amain\" target=\"_blank\">\n      <img src=\"https://github.com/gabrielmbmb/kaflow/workflows/Test/badge.svg?event=push&branch=main\" alt=\"Test\">\n    </a>\n    <a href=\"https://pypi.org/project/kaflow\">\n      <img src=\"https://img.shields.io/pypi/v/kaflow?color=#2cbe4e\">\n    </a>\n    <a href=\"https://pypi.org/project/kaflow\">\n      <img src=\"https://img.shields.io/pypi/pyversions/kaflow?color=#2cbe4e\">\n    </a>\n  </p>\n</div>\n\n---\n\n`kaflow` is a simple framework that allows you to build Kafka streams processing aplications in Python with ease.\n\nSome of the features offered by `kaflow`:\n\n- Dependency Injection system inspired by [FastAPI](https://github.com/tiangolo/fastapi) and [xpresso](https://github.com/adriangb/xpresso), and backed by [di](https://github.com/adriangb/di).\n- Automatic deserialization of incoming messages and serialization of outgoing messages. Supports popular formats like `JSON`, `Avro` or `Protobuf`.\n- Message validation thanks to [pydantic](https://github.com/pydantic/pydantic).\n\n## Requirements\n\nPython 3.8+\n\n## Installation\n\n```shell\npip install kaflow\n```\n\n## Example\n\n```python\nfrom kaflow import (\n    FromHeader,\n    FromKey,\n    FromValue,\n    Json,\n    Kaflow,\n    Message,\n    MessageOffset,\n    MessagePartition,\n    MessageTimestamp,\n    String,\n)\nfrom pydantic import BaseModel\n\n\nclass UserClick(BaseModel):\n    user_id: int\n    url: str\n    timestamp: int\n\n\nclass Key(BaseModel):\n    environment: str\n\n\napp = Kaflow(name=\"AwesomeKakfaApp\", brokers=\"localhost:9092\")\n\n\n@app.consume(topic=\"user_clicks\", sink_topics=[\"user_clicks_json\"])\nasync def consume_user_clicks(\n    message: FromValue[Json[UserClick]],\n    key: FromKey[Json[Key]],\n    x_correlation_id: FromHeader[String[str]],\n    x_request_id: FromHeader[String[str]],\n    partition: MessagePartition,\n    offset: MessageOffset,\n    timestamp: MessageTimestamp,\n) -> Message:\n    # Do something with the message\n    ...\n\n    # Publish to another topic\n    return Message(value=b'{\"user_clicked\": \"true\"}')\n\n\napp.run()\n\n```\n",
    "bugtrack_url": null,
    "license": "",
    "summary": "Python Stream processing backed by Apache Kafka.",
    "version": "0.2.1",
    "split_keywords": [
        "data",
        "flow",
        "kafka",
        "pipeline",
        "processing",
        "stream"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "29bb91bd0610f736b55ed7b7d4931fc7edfc55d05cf79199e3e33f4c351d503c",
                "md5": "a5bed1a0fb25eb20bbdc554f21f6df0f",
                "sha256": "c7374c357ce42fce48353fd384fddf48d96dc982143f20d95be54eb75404951a"
            },
            "downloads": -1,
            "filename": "kaflow-0.2.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "a5bed1a0fb25eb20bbdc554f21f6df0f",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 22866,
            "upload_time": "2023-04-20T15:39:42",
            "upload_time_iso_8601": "2023-04-20T15:39:42.174363Z",
            "url": "https://files.pythonhosted.org/packages/29/bb/91bd0610f736b55ed7b7d4931fc7edfc55d05cf79199e3e33f4c351d503c/kaflow-0.2.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "be5e5955a9cc70d2a5a4d6511c5fb6a1122414c99a5791e6036ccd118a30c737",
                "md5": "d852d0b67a6084bf106d2f2259fd2d80",
                "sha256": "c3a614bd444f413b8c3933b4bf9ac9943e3ec473e4c54b401642b868fca81821"
            },
            "downloads": -1,
            "filename": "kaflow-0.2.1.tar.gz",
            "has_sig": false,
            "md5_digest": "d852d0b67a6084bf106d2f2259fd2d80",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 21064,
            "upload_time": "2023-04-20T15:39:43",
            "upload_time_iso_8601": "2023-04-20T15:39:43.619497Z",
            "url": "https://files.pythonhosted.org/packages/be/5e/5955a9cc70d2a5a4d6511c5fb6a1122414c99a5791e6036ccd118a30c737/kaflow-0.2.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-04-20 15:39:43",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "lcname": "kaflow"
}
        