fastkafka


- Name: fastkafka
- Version: 0.8.0
- Home page: https://github.com/airtai/fastkafka
- Summary: FastKafka is a powerful and easy-to-use Python library for building asynchronous web services that interact with Kafka topics. Built on top of FastAPI, Starlette, Pydantic, AIOKafka and AsyncAPI, FastKafka simplifies the process of writing producers and consumers for Kafka topics.
- Upload time: 2023-07-06 14:45:21
- Author: airt
- Requires Python: >=3.8
- License: Apache Software License 2.0
- Keywords: nbdev, jupyter, notebook, python, kafka

# FastKafka

<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->

<b>Effortless Kafka integration for your web services</b>

------------------------------------------------------------------------

![PyPI](https://img.shields.io/pypi/v/fastkafka.png) ![PyPI -
Downloads](https://img.shields.io/pypi/dm/fastkafka.png) ![PyPI - Python
Version](https://img.shields.io/pypi/pyversions/fastkafka.png)

![GitHub Workflow
Status](https://img.shields.io/github/actions/workflow/status/airtai/fastkafka/test.yaml)
![CodeQL](https://github.com/airtai/fastkafka//actions/workflows/codeql.yml/badge.svg)
![Dependency
Review](https://github.com/airtai/fastkafka//actions/workflows/dependency-review.yml/badge.svg)

![GitHub](https://img.shields.io/github/license/airtai/fastkafka.png)

------------------------------------------------------------------------

[FastKafka](https://fastkafka.airt.ai/) is a powerful and easy-to-use
Python library for building asynchronous services that interact with
Kafka topics. Built on top of [Pydantic](https://docs.pydantic.dev/),
[AIOKafka](https://github.com/aio-libs/aiokafka) and
[AsyncAPI](https://www.asyncapi.com/), FastKafka simplifies the process
of writing producers and consumers for Kafka topics, handling all the
parsing, networking, task scheduling and data generation automatically.
With FastKafka, you can quickly prototype and develop high-performance
Kafka-based services with minimal code, making it an ideal choice for
developers looking to streamline their workflow and accelerate their
projects.

------------------------------------------------------------------------

#### ⭐⭐⭐ Stay in touch ⭐⭐⭐

Please show your support and stay in touch by:

- giving our [GitHub repository](https://github.com/airtai/fastkafka/) a
  star, and

- joining our [Discord server](https://discord.gg/CJWmYpyFbc).

Your support helps us to stay in touch with you and encourages us to
continue developing and improving the library. Thank you for your
support!

------------------------------------------------------------------------

#### 🐝🐝🐝 We were busy lately 🐝🐝🐝

![Activity](https://repobeats.axiom.co/api/embed/21f36049093d5eb8e5fdad18c3c5d8df5428ca30.svg "Repobeats analytics image")

## Install

FastKafka works on Windows, macOS, Linux, and most Unix-style operating
systems. You can install the base version of FastKafka with `pip` as usual:

``` sh
pip install fastkafka
```

To install FastKafka with testing features, use (the quotes keep shells
such as zsh from interpreting the square brackets):

``` sh
pip install "fastkafka[test]"
```

To install FastKafka with AsyncAPI documentation support, use:

``` sh
pip install "fastkafka[docs]"
```

To install FastKafka with all the features, use:

``` sh
pip install "fastkafka[test,docs]"
```

## Tutorial

You can start an interactive tutorial in Google Colab by clicking the
button below:

<a href="https://colab.research.google.com/github/airtai/fastkafka/blob/main/nbs/index.ipynb" target="_blank">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open in Colab" />
</a>

## Writing server code

To demonstrate how simple it is to use FastKafka's `@produces` and
`@consumes` decorators, we will focus on a simple app.

The app will consume JSON messages containing non-negative floats from
one topic, log them, and then produce incremented values to another topic.

### Messages

FastKafka uses [Pydantic](https://docs.pydantic.dev/) to parse input
JSON-encoded data into Python objects, making it easy to work with
structured data in your Kafka-based applications. Pydantic’s
[`BaseModel`](https://docs.pydantic.dev/usage/models/) class allows you
to define messages using a declarative syntax, making it easy to specify
the fields and types of your messages.

This example defines one `Data` message class. This class models the
consumed and produced data in our demo app. It contains one
`NonNegativeFloat` field, `data`, that will be logged and “processed”
before being produced to another topic.

This message class will be used to parse and validate the data handled
by the Kafka consumers and producers.

``` python
from pydantic import BaseModel, Field, NonNegativeFloat


class Data(BaseModel):
    data: NonNegativeFloat = Field(
        ..., example=0.5, description="Float data example"
    )
```
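To make the rule expressed by `NonNegativeFloat` concrete, the following dependency-free sketch mimics what happens to an incoming payload: the bytes are decoded as JSON and the `data` field must be a float greater than or equal to zero. The `parse_data_payload` helper is hypothetical and only stands in for the validation that Pydantic performs on the `Data` model; it is not how Pydantic is implemented.

```python
import json


def parse_data_payload(raw: bytes) -> float:
    """Decode a JSON payload such as b'{"data": 0.5}' and validate it.

    Hypothetical stand-in for the check expressed by
    `data: NonNegativeFloat`; the real validation is done by Pydantic.
    """
    payload = json.loads(raw.decode("utf-8"))
    value = float(payload["data"])  # raises KeyError if "data" is missing
    if value < 0:
        raise ValueError(f"data must be >= 0, got {value}")
    return value


# A valid message passes through unchanged.
valid = parse_data_payload(b'{"data": 0.5}')

# A negative value is rejected, as it would be by the Data model.
try:
    parse_data_payload(b'{"data": -1.0}')
except ValueError as exc:
    print(f"rejected: {exc}")
```

In the real application this check comes for free from the type annotation on the model field, so no hand-written validation is needed.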

### Application

This example shows how to initialize a FastKafka application.

It starts by defining a dictionary called `kafka_brokers`, which
contains two entries: `"localhost"` and `"production"`, specifying local
development and production Kafka brokers. Each entry specifies the URL,
port, and other details of a Kafka broker. This dictionary is used both
to generate the documentation and, later, to run the actual server
against one of the given Kafka brokers.

Next, an object of the
[`FastKafka`](https://airtai.github.io/fastkafka/docs/api/fastkafka#fastkafka.FastKafka)
class is initialized with the minimum set of arguments:

- `kafka_brokers`: a dictionary used for generation of documentation

We will also import and create a logger so that we can log the incoming
data in our consuming function.

``` python
from logging import getLogger
from fastkafka import FastKafka

logger = getLogger("Demo Kafka app")

kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_app = FastKafka(
    title="Demo Kafka app",
    kafka_brokers=kafka_brokers,
)
```
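The log output shown later in this document reports `'bootstrap_servers': 'localhost:9092'` when the `localhost` broker is selected. As a rough illustration of how a broker entry corresponds to that address, the sketch below joins the `url` and `port` fields. The `bootstrap_servers` helper is hypothetical and written only for this example; FastKafka performs the equivalent step internally and its actual logic may differ.

```python
from typing import Any, Dict


def bootstrap_servers(brokers: Dict[str, Dict[str, Any]], name: str) -> str:
    """Return the 'host:port' string for the broker entry called `name`.

    Hypothetical helper for illustration; not part of the FastKafka API.
    """
    broker = brokers[name]  # raises KeyError if the name is not configured
    return f"{broker['url']}:{broker['port']}"


kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

local_address = bootstrap_servers(kafka_brokers, "localhost")
```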

### Function decorators

FastKafka provides the convenient function decorators
`@kafka_app.consumes` and `@kafka_app.produces`, which let you delegate
the actual process of

- consuming data from and producing data to Kafka, and

- decoding and encoding JSON-encoded messages

from user-defined functions to the framework. The FastKafka framework
in turn delegates these jobs to the AIOKafka and Pydantic libraries.

These decorators make it easy to specify the processing logic for your
Kafka consumers and producers, allowing you to focus on the core
business logic of your application without worrying about the underlying
Kafka integration.

The following example shows how to use the `@kafka_app.consumes` and
`@kafka_app.produces` decorators in a FastKafka application:

- The `@kafka_app.consumes` decorator is applied to the `on_input_data`
  function, which specifies that this function should be called whenever
  a message is received on the `input_data` Kafka topic. The
  `on_input_data` function takes a single argument, which is expected to
  be an instance of the `Data` message class. The type annotation on
  that argument instructs the framework to parse the consumed message
  with `Data.parse_raw()` before passing it to the user-defined function
  `on_input_data`.

- The `@kafka_app.produces` decorator is applied to the `to_output_data`
  function, which specifies that this function should produce a message
  to the `output_data` Kafka topic whenever it is called. The
  `to_output_data` function takes a single float argument `data`,
  increments it, and returns the result wrapped in a `Data` object. The
  framework calls `Data.json().encode("utf-8")` on the returned value
  and produces it to the specified topic.

``` python
@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: Data):
    logger.info(f"Got data: {msg.data}")
    await to_output_data(msg.data)


@kafka_app.produces(topic="output_data")
async def to_output_data(data: float) -> Data:
    processed_data = Data(data=data+1.0)
    return processed_data
```
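The decode, call, encode flow described above can be sketched with two toy decorators built only on the standard library. Everything in this block (`produces`, `consumes`, the `sent` list) is a hypothetical simplification written for illustration: plain dictionaries replace the `Data` model, a Python list replaces the Kafka topic, and none of it reflects FastKafka's actual implementation.

```python
import asyncio
import json
from functools import wraps
from typing import Any, Awaitable, Callable, Dict, List, Tuple

# Stand-in for Kafka: (topic, encoded message) pairs that were "produced".
sent: List[Tuple[str, bytes]] = []


def produces(topic: str) -> Callable:
    """Toy decorator: encode the wrapped function's return value and record it."""

    def decorator(func: Callable[..., Awaitable[Dict[str, Any]]]) -> Callable:
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Dict[str, Any]:
            result = await func(*args, **kwargs)
            # A real framework would hand these bytes to a Kafka producer.
            sent.append((topic, json.dumps(result).encode("utf-8")))
            return result

        return wrapper

    return decorator


def consumes(func: Callable[[Dict[str, Any]], Awaitable[None]]) -> Callable:
    """Toy decorator: decode raw bytes into a dict before calling the handler."""

    @wraps(func)
    async def wrapper(raw: bytes) -> None:
        await func(json.loads(raw.decode("utf-8")))

    return wrapper


@produces(topic="output_data")
async def to_output_data(data: float) -> Dict[str, float]:
    return {"data": data + 1.0}


@consumes
async def on_input_data(msg: Dict[str, float]) -> None:
    await to_output_data(msg["data"])


# Simulate one incoming message on the input topic.
asyncio.run(on_input_data(b'{"data": 0.5}'))
```

After the call, `sent` holds a single entry for the `output_data` topic containing the incremented value, which mirrors the hand-off between the two decorated functions in the real app.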

## Testing the service

The service can be tested using a
[`Tester`](https://airtai.github.io/fastkafka/docs/api/fastkafka/testing/Tester#fastkafka.testing.Tester)
instance, which internally starts an in-memory implementation of a Kafka
broker.

The Tester redirects your `consumes`- and `produces`-decorated functions
to the in-memory Kafka broker so that you can quickly test your app
without the need for a running Kafka broker and all its dependencies.

``` python
from fastkafka.testing import Tester

msg = Data(
    data=0.1,
)

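# Start Tester app and create InMemory Kafka broker for testing.
# Top-level `async with` works in a notebook; in a plain script, put this
# block in an `async def` function and run it with `asyncio.run()`.
async with Tester(kafka_app) as tester: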
    # Send Data message to input_data topic
    await tester.to_input_data(msg)

    # Assert that the kafka_app responded with incremented data in output_data topic
    await tester.awaited_mocks.on_output_data.assert_awaited_with(
        Data(data=1.1), timeout=2
    )
```

    [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._start() called
    [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!
    [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting
    [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
    [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100}
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['input_data']
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['output_data']
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
    [INFO] Demo Kafka app: Got data: 0.1
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called
    [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._stop() called
    [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping

### Recap

We have created a simple FastKafka application. The app consumes
`Data` messages from the `input_data` topic, logs them, and produces the
incremented data to the `output_data` topic.

To test the app we have:

1.  Created the app

2.  Started our Tester class, which mirrors the developed app's topics
    for testing purposes

3.  Sent a `Data` message to the `input_data` topic

4.  Asserted and checked that the developed service has reacted to the
    `Data` message
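The steps above can be sketched without any Kafka dependency to show the shape of such a test. The block below is a toy model under stated assumptions: `ToyInMemoryBroker` is a hypothetical class that keeps one `asyncio.Queue` per topic, and `increment_service` stands in for the app. It is not how FastKafka's `Tester` or its in-memory broker are implemented.

```python
import asyncio
import json
from collections import defaultdict
from typing import DefaultDict


class ToyInMemoryBroker:
    """Hypothetical in-memory stand-in for a broker: one queue per topic."""

    def __init__(self) -> None:
        self.topics: DefaultDict[str, asyncio.Queue] = defaultdict(asyncio.Queue)

    async def send(self, topic: str, value: bytes) -> None:
        await self.topics[topic].put(value)

    async def receive(self, topic: str, timeout: float = 2.0) -> bytes:
        # Fail with asyncio.TimeoutError instead of hanging if nothing arrives.
        return await asyncio.wait_for(self.topics[topic].get(), timeout)


async def increment_service(broker: ToyInMemoryBroker) -> None:
    """The 'app' under test: read one message, add 1.0, write the result."""
    raw = await broker.receive("input_data")
    data = json.loads(raw)["data"]
    reply = json.dumps({"data": data + 1.0}).encode("utf-8")
    await broker.send("output_data", reply)


async def run_test() -> dict:
    broker = ToyInMemoryBroker()  # steps 1-2: app plus in-memory broker
    service = asyncio.create_task(increment_service(broker))
    await broker.send("input_data", b'{"data": 0.5}')  # step 3: send a message
    reply = json.loads(await broker.receive("output_data"))  # step 4: check reaction
    await service
    return reply


result = asyncio.run(run_test())
```

The timeout on `receive` plays the same role as the `timeout=2` argument in the Tester example: a service that never responds makes the test fail quickly rather than block forever.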

## Running the service

The service can be started using the built-in `fastkafka run` CLI
command. Before we can do that, we will concatenate the code snippets
from above and save them in a file named `application.py`:

``` python
# content of the "application.py" file

from pydantic import BaseModel, Field, NonNegativeFloat

from fastkafka import FastKafka
from fastkafka._components.logger import get_logger

logger = get_logger(__name__)

class Data(BaseModel):
    data: NonNegativeFloat = Field(
        ..., example=0.5, description="Float data example"
    )

kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_app = FastKafka(
    title="Demo Kafka app",
    kafka_brokers=kafka_brokers,
)

@kafka_app.consumes(topic="input_data", auto_offset_reset="latest")
async def on_input_data(msg: Data):
    logger.info(f"Got data: {msg.data}")
    await to_output_data(msg.data)


@kafka_app.produces(topic="output_data")
async def to_output_data(data: float) -> Data:
    processed_data = Data(data=data+1.0)
    return processed_data
```

To run the service, use the FastKafka CLI command and pass the module
(in this case, the file where the app implementation is located) and the
app symbol to the command.

``` sh
fastkafka run --num-workers=1 --kafka-broker localhost application:kafka_app
```
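The `application:kafka_app` argument follows the common `module:symbol` convention. As an illustration of what such a target means, the sketch below resolves one with `importlib`. The `resolve_app` function is a hypothetical helper written for this example and is not FastKafka's actual loader.

```python
import importlib
from typing import Any


def resolve_app(target: str) -> Any:
    """Import `module` and return the attribute `symbol` from 'module:symbol'.

    Hypothetical helper for illustration; not part of the FastKafka API.
    """
    module_name, sep, symbol = target.partition(":")
    if not sep or not module_name or not symbol:
        raise ValueError(f"expected 'module:symbol', got {target!r}")
    module = importlib.import_module(module_name)
    return getattr(module, symbol)


# With application.py on the import path, the call would be
# resolve_app("application:kafka_app"). A standard-library module is used
# here so that the sketch runs anywhere.
dumps = resolve_app("json:dumps")
```

This also explains why the command must be run from a directory where `application.py` is importable: the module part is looked up like any other Python import.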

After running the command, you should see output similar to the
following in your command line:

    [1504]: 23-05-31 11:36:45.874 [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to 'localhost:9092'
    [1504]: 23-05-31 11:36:45.875 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
    [1504]: 23-05-31 11:36:45.937 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
    [1504]: 23-05-31 11:36:45.937 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100}
    [1504]: 23-05-31 11:36:45.956 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
    [1504]: 23-05-31 11:36:45.956 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
    [1504]: 23-05-31 11:36:45.956 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
    [1504]: 23-05-31 11:36:45.956 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
    [1506]: 23-05-31 11:36:45.993 [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to 'localhost:9092'
    [1506]: 23-05-31 11:36:45.994 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'
    [1506]: 23-05-31 11:36:46.014 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...
    [1506]: 23-05-31 11:36:46.015 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100}
    [1506]: 23-05-31 11:36:46.040 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.
    [1506]: 23-05-31 11:36:46.042 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})
    [1506]: 23-05-31 11:36:46.043 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}
    [1506]: 23-05-31 11:36:46.043 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.
    [1506]: 23-05-31 11:36:46.068 [ERROR] aiokafka.cluster: Topic input_data not found in cluster metadata
    [1506]: 23-05-31 11:36:46.070 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'input_data': 0}. 
    [1504]: 23-05-31 11:36:46.131 [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization
    [1504]: 23-05-31 11:36:46.132 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'input_data': 0}. 
    [1506]: 23-05-31 11:37:00.237 [ERROR] aiokafka: Unable connect to node with id 0: [Errno 111] Connect call failed ('172.28.0.12', 9092)
    [1506]: 23-05-31 11:37:00.237 [ERROR] aiokafka: Unable to update metadata from [0]
    [1504]: 23-05-31 11:37:00.238 [ERROR] aiokafka: Unable connect to node with id 0: [Errno 111] Connect call failed ('172.28.0.12', 9092)
    [1504]: 23-05-31 11:37:00.238 [ERROR] aiokafka: Unable to update metadata from [0]
    [1506]: 23-05-31 11:37:00.294 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
    [1506]: 23-05-31 11:37:00.294 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
    Starting process cleanup, this may take a few seconds...
    23-05-31 11:37:00.345 [INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 1504...
    23-05-31 11:37:00.345 [INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 1506...
    [1504]: 23-05-31 11:37:00.347 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.
    [1504]: 23-05-31 11:37:00.347 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.
    23-05-31 11:37:00.607 [INFO] fastkafka._server: terminate_asyncio_process(): Process 1506 was already terminated.
    23-05-31 11:37:00.822 [INFO] fastkafka._server: terminate_asyncio_process(): Process 1504 was already terminated.

## Documentation

The Kafka app comes with built-in documentation generation using the
[AsyncAPI HTML generator](https://www.asyncapi.com/tools/generator).

The AsyncAPI generator requires Node.js, and we provide the following
convenience command to install the required dependencies:

``` sh
fastkafka docs install_deps
```

    23-05-31 11:38:24.128 [INFO] fastkafka._components.docs_dependencies: AsyncAPI generator installed

To generate the documentation, run the following command:

``` sh
fastkafka docs generate application:kafka_app
```

    23-05-31 11:38:25.113 [INFO] fastkafka._components.asyncapi: Old async specifications at '/content/asyncapi/spec/asyncapi.yml' does not exist.
    23-05-31 11:38:25.118 [INFO] fastkafka._components.asyncapi: New async specifications generated at: '/content/asyncapi/spec/asyncapi.yml'
    23-05-31 11:38:43.455 [INFO] fastkafka._components.asyncapi: Async docs generated at 'asyncapi/docs'
    23-05-31 11:38:43.455 [INFO] fastkafka._components.asyncapi: Output of '$ npx -y -p @asyncapi/generator ag asyncapi/spec/asyncapi.yml @asyncapi/html-template -o asyncapi/docs --force-write'

    Done! ✨
    Check out your shiny new generated files at /content/asyncapi/docs.

This creates an *asyncapi* folder in the current working directory,
where all your documentation is saved. You can check its content with:

``` sh
ls -l asyncapi
```

    total 8
    drwxr-xr-x 4 root root 4096 May 31 11:38 docs
    drwxr-xr-x 2 root root 4096 May 31 11:38 spec

In the `docs` folder you will find the servable static HTML files of
your documentation. They can also be served using our
`fastkafka docs serve` CLI command (more on that in our guides).

In the `spec` folder you will find an `asyncapi.yml` file containing the
AsyncAPI specification of your application.
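Because the generated `docs` folder contains only static files, any static file server can preview it. The sketch below uses Python's standard `http.server` module as one such option. The `make_docs_server` helper is hypothetical, the default port 8000 simply mirrors the address that appears in the `fastkafka docs serve` output, and `fastkafka docs serve` remains the supported way to preview the documentation.

```python
import functools
from http.server import SimpleHTTPRequestHandler, ThreadingHTTPServer


def make_docs_server(directory: str, port: int = 8000) -> ThreadingHTTPServer:
    """Create (but do not start) a local server for a static docs folder.

    Hypothetical helper for illustration; not part of the FastKafka API.
    """
    # Bind the handler to `directory` instead of the current working directory.
    handler = functools.partial(SimpleHTTPRequestHandler, directory=directory)
    return ThreadingHTTPServer(("127.0.0.1", port), handler)


# Usage (blocks until interrupted with Ctrl+C):
#
#     server = make_docs_server("asyncapi/docs")
#     try:
#         server.serve_forever()
#     finally:
#         server.server_close()
```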

We can locally preview the generated documentation by running the
following command:

``` sh
fastkafka docs serve application:kafka_app
```

    23-05-31 11:38:45.250 [INFO] fastkafka._components.asyncapi: New async specifications generated at: '/content/asyncapi/spec/asyncapi.yml'
    23-05-31 11:39:04.410 [INFO] fastkafka._components.asyncapi: Async docs generated at 'asyncapi/docs'
    23-05-31 11:39:04.411 [INFO] fastkafka._components.asyncapi: Output of '$ npx -y -p @asyncapi/generator ag asyncapi/spec/asyncapi.yml @asyncapi/html-template -o asyncapi/docs --force-write'

    Done! ✨
    Check out your shiny new generated files at /content/asyncapi/docs.


    Serving documentation on http://127.0.0.1:8000
    127.0.0.1 - - [31/May/2023 11:39:14] "GET / HTTP/1.1" 200 -
    127.0.0.1 - - [31/May/2023 11:39:14] "GET /css/global.min.css HTTP/1.1" 200 -
    127.0.0.1 - - [31/May/2023 11:39:14] "GET /js/asyncapi-ui.min.js HTTP/1.1" 200 -
    127.0.0.1 - - [31/May/2023 11:39:14] "GET /css/asyncapi.min.css HTTP/1.1" 200 -
    Interupting serving of documentation and cleaning up...

From the parameters passed to the application constructor, we get the
documentation below:

``` python
from fastkafka import FastKafka

kafka_brokers = {
    "localhost": {
        "url": "localhost",
        "description": "local development kafka broker",
        "port": 9092,
    },
    "production": {
        "url": "kafka.airt.ai",
        "description": "production kafka broker",
        "port": 9092,
        "protocol": "kafka-secure",
        "security": {"type": "plain"},
    },
}

kafka_app = FastKafka(
    title="Demo Kafka app",
    kafka_brokers=kafka_brokers,
)
```

![Kafka_servers](https://raw.githubusercontent.com/airtai/fastkafka/main/nbs/images/screenshot-kafka-servers.png)

The following documentation snippet is for the consumer as specified in
the code above:

![Kafka_consumer](https://raw.githubusercontent.com/airtai/fastkafka/main/nbs/images/screenshot-kafka-consumer.png)

The following documentation snippet is for the producer as specified in
the code above:

![Kafka_producer](https://raw.githubusercontent.com/airtai/fastkafka/main/nbs/images/screenshot-kafka-producer.png)

Finally, all messages defined as subclasses of *BaseModel* are
documented as well:

![Kafka_messages](https://raw.githubusercontent.com/airtai/fastkafka/main/nbs/images/screenshot-kafka-messages.png)

## License

FastKafka is licensed under the Apache License 2.0.

A permissive license whose main conditions require preservation of
copyright and license notices. Contributors provide an express grant of
patent rights. Licensed works, modifications, and larger works may be
distributed under different terms and without source code.

The full text of the license can be found
[here](https://raw.githubusercontent.com/airtai/fastkafka/main/LICENSE).



            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/airtai/fastkafka",
    "name": "fastkafka",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": "",
    "keywords": "nbdev jupyter notebook python kafka",
    "author": "airt",
    "author_email": "info@airt.ai",
    "download_url": "https://files.pythonhosted.org/packages/10/40/64b445a7d9228b9d90c22f5d761eb535c8321ff7b438d3f2041c18578813/fastkafka-0.8.0.tar.gz",
    "platform": null,
    "description": "# FastKafka\n\n<!-- WARNING: THIS FILE WAS AUTOGENERATED! DO NOT EDIT! -->\n\n<b>Effortless Kafka integration for your web services</b>\n\n------------------------------------------------------------------------\n\n![PyPI](https://img.shields.io/pypi/v/fastkafka.png) ![PyPI -\nDownloads](https://img.shields.io/pypi/dm/fastkafka.png) ![PyPI - Python\nVersion](https://img.shields.io/pypi/pyversions/fastkafka.png)\n\n![GitHub Workflow\nStatus](https://img.shields.io/github/actions/workflow/status/airtai/fastkafka/test.yaml)\n![CodeQL](https://github.com/airtai/fastkafka//actions/workflows/codeql.yml/badge.svg)\n![Dependency\nReview](https://github.com/airtai/fastkafka//actions/workflows/dependency-review.yml/badge.svg)\n\n![GitHub](https://img.shields.io/github/license/airtai/fastkafka.png)\n\n------------------------------------------------------------------------\n\n[FastKafka](https://fastkafka.airt.ai/) is a powerful and easy-to-use\nPython library for building asynchronous services that interact with\nKafka topics. 
Built on top of [Pydantic](https://docs.pydantic.dev/),\n[AIOKafka](https://github.com/aio-libs/aiokafka) and\n[AsyncAPI](https://www.asyncapi.com/), FastKafka simplifies the process\nof writing producers and consumers for Kafka topics, handling all the\nparsing, networking, task scheduling and data generation automatically.\nWith FastKafka, you can quickly prototype and develop high-performance\nKafka-based services with minimal code, making it an ideal choice for\ndevelopers looking to streamline their workflow and accelerate their\nprojects.\n\n------------------------------------------------------------------------\n\n#### \u2b50\u2b50\u2b50 Stay in touch \u2b50\u2b50\u2b50\n\nPlease show your support and stay in touch by:\n\n- giving our [GitHub repository](https://github.com/airtai/fastkafka/) a\n  star, and\n\n- joining our [Discord server](https://discord.gg/CJWmYpyFbc).\n\nYour support helps us to stay in touch with you and encourages us to\ncontinue developing and improving the library. Thank you for your\nsupport!\n\n------------------------------------------------------------------------\n\n#### \ud83d\udc1d\ud83d\udc1d\ud83d\udc1d We were busy lately \ud83d\udc1d\ud83d\udc1d\ud83d\udc1d\n\n![Activity](https://repobeats.axiom.co/api/embed/21f36049093d5eb8e5fdad18c3c5d8df5428ca30.svg \"Repobeats analytics image\")\n\n## Install\n\nFastKafka works on Windows, macOS, Linux, and most Unix-style operating\nsystems. 
You can install base version of FastKafka with `pip` as usual:\n\n``` sh\npip install fastkafka\n```\n\nTo install FastKafka with testing features please use:\n\n``` sh\npip install fastkafka[test]\n```\n\nTo install FastKafka with asyncapi docs please use:\n\n``` sh\npip install fastkafka[docs]\n```\n\nTo install FastKafka with all the features please use:\n\n``` sh\npip install fastkafka[test,docs]\n```\n\n## Tutorial\n\nYou can start an interactive tutorial in Google Colab by clicking the\nbutton below:\n\n<a href=\"https://colab.research.google.com/github/airtai/fastkafka/blob/main/nbs/index.ipynb\" target=\u201d_blank\u201d>\n<img src=\"https://colab.research.google.com/assets/colab-badge.svg\" alt=\"Open in Colab\" />\n</a>\n\n## Writing server code\n\nTo demonstrate FastKafka simplicity of using `@produces` and `@consumes`\ndecorators, we will focus on a simple app.\n\nThe app will consume jsons containig positive floats from one topic, log\nthem and then produce incremented values to another topic.\n\n### Messages\n\nFastKafka uses [Pydantic](https://docs.pydantic.dev/) to parse input\nJSON-encoded data into Python objects, making it easy to work with\nstructured data in your Kafka-based applications. Pydantic\u2019s\n[`BaseModel`](https://docs.pydantic.dev/usage/models/) class allows you\nto define messages using a declarative syntax, making it easy to specify\nthe fields and types of your messages.\n\nThis example defines one `Data` mesage class. 
This Class will model the\nconsumed and produced data in our app demo, it contains one\n`NonNegativeFloat` field `data` that will be logged and \u201cprocessed\u201d\nbefore being produced to another topic.\n\nThese message class will be used to parse and validate incoming data in\nKafka consumers and producers.\n\n``` python\nfrom pydantic import BaseModel, Field, NonNegativeFloat\n\n\nclass Data(BaseModel):\n    data: NonNegativeFloat = Field(\n        ..., example=0.5, description=\"Float data example\"\n    )\n```\n\n### Application\n\nThis example shows how to initialize a FastKafka application.\n\nIt starts by defining a dictionary called `kafka_brokers`, which\ncontains two entries: `\"localhost\"` and `\"production\"`, specifying local\ndevelopment and production Kafka brokers. Each entry specifies the URL,\nport, and other details of a Kafka broker. This dictionary is used for\nboth generating the documentation and later to run the actual server\nagainst one of the given kafka broker.\n\nNext, an object of the\n[`FastKafka`](https://airtai.github.io/fastkafka/docs/api/fastkafka#fastkafka.FastKafka)\nclass is initialized with the minimum set of arguments:\n\n- `kafka_brokers`: a dictionary used for generation of documentation\n\nWe will also import and create a logger so that we can log the incoming\ndata in our consuming function.\n\n``` python\nfrom logging import getLogger\nfrom fastkafka import FastKafka\n\nlogger = getLogger(\"Demo Kafka app\")\n\nkafka_brokers = {\n    \"localhost\": {\n        \"url\": \"localhost\",\n        \"description\": \"local development kafka broker\",\n        \"port\": 9092,\n    },\n    \"production\": {\n        \"url\": \"kafka.airt.ai\",\n        \"description\": \"production kafka broker\",\n        \"port\": 9092,\n        \"protocol\": \"kafka-secure\",\n        \"security\": {\"type\": \"plain\"},\n    },\n}\n\nkafka_app = FastKafka(\n    title=\"Demo Kafka app\",\n    kafka_brokers=kafka_brokers,\n)\n```\n\n### 
Function decorators\n\nFastKafka provides convenient function decorators `@kafka_app.consumes`\nand `@kafka_app.produces` to allow you to delegate the actual process of\n\n- consuming and producing data to Kafka, and\n\n- decoding and encoding JSON encode messages\n\nfrom user defined functions to the framework. The FastKafka framework\ndelegates these jobs to AIOKafka and Pydantic libraries.\n\nThese decorators make it easy to specify the processing logic for your\nKafka consumers and producers, allowing you to focus on the core\nbusiness logic of your application without worrying about the underlying\nKafka integration.\n\nThis following example shows how to use the `@kafka_app.consumes` and\n`@kafka_app.produces` decorators in a FastKafka application:\n\n- The `@kafka_app.consumes` decorator is applied to the `on_input_data`\n  function, which specifies that this function should be called whenever\n  a message is received on the \u201cinput_data\u201d Kafka topic. The\n  `on_input_data` function takes a single argument which is expected to\n  be an instance of the `Data` message class. Specifying the type of the\n  single argument is instructing the Pydantic to use `Data.parse_raw()`\n  on the consumed message before passing it to the user defined function\n  `on_input_data`.\n\n- The `@produces` decorator is applied to the `to_output_data` function,\n  which specifies that this function should produce a message to the\n  \u201coutput_data\u201d Kafka topic whenever it is called. The `to_output_data`\n  function takes a single float argument `data`. It it increments the\n  data returns it wrapped in a `Data` object. 
The framework will call\n  the `Data.json().encode(\"utf-8\")` function on the returned value and\n  produce it to the specified topic.\n\n``` python\n@kafka_app.consumes(topic=\"input_data\", auto_offset_reset=\"latest\")\nasync def on_input_data(msg: Data):\n    logger.info(f\"Got data: {msg.data}\")\n    await to_output_data(msg.data)\n\n\n@kafka_app.produces(topic=\"output_data\")\nasync def to_output_data(data: float) -> Data:\n    processed_data = Data(data=data+1.0)\n    return processed_data\n```\n\n## Testing the service\n\nThe service can be tested using a\n[`Tester`](https://airtai.github.io/fastkafka/docs/api/fastkafka/testing/Tester#fastkafka.testing.Tester)\ninstance, which internally starts an in-memory implementation of a Kafka\nbroker.\n\nThe Tester redirects the functions decorated with `consumes` and `produces`\nto the in-memory Kafka broker so that you can quickly test your app\nwithout the need for a running Kafka broker and all its dependencies.\n\n``` python\nfrom fastkafka.testing import Tester\n\nmsg = Data(\n    data=0.1,\n)\n\n# Start Tester app and create InMemory Kafka broker for testing\nasync with Tester(kafka_app) as tester:\n    # Send Data message to input_data topic\n    await tester.to_input_data(msg)\n\n    # Assert that the kafka_app responded with incremented data in output_data topic\n    await tester.awaited_mocks.on_output_data.assert_awaited_with(\n        Data(data=1.1), timeout=2\n    )\n```\n\n    [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._start() called\n    [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._patch_consumers_and_producers(): Patching consumers and producers!\n    [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker starting\n    [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()\n    [INFO] 
fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched start() called()\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100}\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['input_data']\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'earliest', 'max_poll_records': 100}\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched start() called()\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched subscribe() called\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer.subscribe(), subscribing to: ['output_data']\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n    [INFO] Demo Kafka app: Got data: 0.1\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() 
called\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaConsumer patched stop() called\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n    [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n    [INFO] fastkafka._testing.in_memory_broker: AIOKafkaProducer patched stop() called\n    [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker._stop() called\n    [INFO] fastkafka._testing.in_memory_broker: InMemoryBroker stopping\n\n### Recap\n\nWe have created a simple FastKafka application. The app will consume the\n`Data` from the `input_data` topic, log it and produce the incremented\ndata to `output_data` topic.\n\nTo test the app we have:\n\n1.  Created the app\n\n2.  Started our Tester class which mirrors the developed app topics for\n    testing purposes\n\n3.  Sent Data message to `input_data` topic\n\n4.  
Asserted and checked that the developed service has reacted to the Data\n    message\n\n## Running the service\n\nThe service can be started using the built-in `fastkafka run` CLI command.\nBefore we can do that, we will concatenate the code snippets from above\nand save them in a file `\"application.py\"`.\n\n``` python\n# content of the \"application.py\" file\n\nfrom pydantic import BaseModel, Field, NonNegativeFloat\n\nfrom fastkafka import FastKafka\nfrom fastkafka._components.logger import get_logger\n\nlogger = get_logger(__name__)\n\nclass Data(BaseModel):\n    data: NonNegativeFloat = Field(\n        ..., example=0.5, description=\"Float data example\"\n    )\n\nkafka_brokers = {\n    \"localhost\": {\n        \"url\": \"localhost\",\n        \"description\": \"local development kafka broker\",\n        \"port\": 9092,\n    },\n    \"production\": {\n        \"url\": \"kafka.airt.ai\",\n        \"description\": \"production kafka broker\",\n        \"port\": 9092,\n        \"protocol\": \"kafka-secure\",\n        \"security\": {\"type\": \"plain\"},\n    },\n}\n\nkafka_app = FastKafka(\n    title=\"Demo Kafka app\",\n    kafka_brokers=kafka_brokers,\n)\n\n@kafka_app.consumes(topic=\"input_data\", auto_offset_reset=\"latest\")\nasync def on_input_data(msg: Data):\n    logger.info(f\"Got data: {msg.data}\")\n    await to_output_data(msg.data)\n\n\n@kafka_app.produces(topic=\"output_data\")\nasync def to_output_data(data: float) -> Data:\n    processed_data = Data(data=data+1.0)\n    return processed_data\n```\n\nTo run the service, use the FastKafka CLI command and pass the module\n(in this case, the file where the app implementation is located) and the\napp symbol to the command.\n\n``` sh\nfastkafka run --num-workers=1 --kafka-broker localhost application:kafka_app\n```\n\nAfter running the command, you should see the following output in your\ncommand line:\n\n    [1504]: 23-05-31 11:36:45.874 [INFO] fastkafka._application.app: set_kafka_broker() : Setting 
bootstrap_servers value to 'localhost:9092'\n    [1504]: 23-05-31 11:36:45.875 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'\n    [1504]: 23-05-31 11:36:45.937 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n    [1504]: 23-05-31 11:36:45.937 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100}\n    [1504]: 23-05-31 11:36:45.956 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n    [1504]: 23-05-31 11:36:45.956 [INFO] aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})\n    [1504]: 23-05-31 11:36:45.956 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}\n    [1504]: 23-05-31 11:36:45.956 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n    [1506]: 23-05-31 11:36:45.993 [INFO] fastkafka._application.app: set_kafka_broker() : Setting bootstrap_servers value to 'localhost:9092'\n    [1506]: 23-05-31 11:36:45.994 [INFO] fastkafka._application.app: _create_producer() : created producer using the config: '{'bootstrap_servers': 'localhost:9092'}'\n    [1506]: 23-05-31 11:36:46.014 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() starting...\n    [1506]: 23-05-31 11:36:46.015 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer created using the following parameters: {'bootstrap_servers': 'localhost:9092', 'auto_offset_reset': 'latest', 'max_poll_records': 100}\n    [1506]: 23-05-31 11:36:46.040 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer started.\n    [1506]: 23-05-31 11:36:46.042 [INFO] 
aiokafka.consumer.subscription_state: Updating subscribed topics to: frozenset({'input_data'})\n    [1506]: 23-05-31 11:36:46.043 [INFO] aiokafka.consumer.consumer: Subscribed to topic(s): {'input_data'}\n    [1506]: 23-05-31 11:36:46.043 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer subscribed.\n    [1506]: 23-05-31 11:36:46.068 [ERROR] aiokafka.cluster: Topic input_data not found in cluster metadata\n    [1506]: 23-05-31 11:36:46.070 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'input_data': 0}. \n    [1504]: 23-05-31 11:36:46.131 [WARNING] aiokafka.cluster: Topic input_data is not available during auto-create initialization\n    [1504]: 23-05-31 11:36:46.132 [INFO] aiokafka.consumer.group_coordinator: Metadata for topic has changed from {} to {'input_data': 0}. \n    [1506]: 23-05-31 11:37:00.237 [ERROR] aiokafka: Unable connect to node with id 0: [Errno 111] Connect call failed ('172.28.0.12', 9092)\n    [1506]: 23-05-31 11:37:00.237 [ERROR] aiokafka: Unable to update metadata from [0]\n    [1504]: 23-05-31 11:37:00.238 [ERROR] aiokafka: Unable connect to node with id 0: [Errno 111] Connect call failed ('172.28.0.12', 9092)\n    [1504]: 23-05-31 11:37:00.238 [ERROR] aiokafka: Unable to update metadata from [0]\n    [1506]: 23-05-31 11:37:00.294 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n    [1506]: 23-05-31 11:37:00.294 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n    Starting process cleanup, this may take a few seconds...\n    23-05-31 11:37:00.345 [INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 1504...\n    23-05-31 11:37:00.345 [INFO] fastkafka._server: terminate_asyncio_process(): Terminating the process 1506...\n    [1504]: 23-05-31 11:37:00.347 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop(): Consumer stopped.\n    
[1504]: 23-05-31 11:37:00.347 [INFO] fastkafka._components.aiokafka_consumer_loop: aiokafka_consumer_loop() finished.\n    23-05-31 11:37:00.607 [INFO] fastkafka._server: terminate_asyncio_process(): Process 1506 was already terminated.\n    23-05-31 11:37:00.822 [INFO] fastkafka._server: terminate_asyncio_process(): Process 1504 was already terminated.\n\n## Documentation\n\nThe Kafka app comes with built-in documentation generation using the\n[AsyncAPI HTML generator](https://www.asyncapi.com/tools/generator).\n\nAsyncAPI requires Node.js to be installed, and we provide the following\nconvenience command for it:\n\n``` sh\nfastkafka docs install_deps\n```\n\n    23-05-31 11:38:24.128 [INFO] fastkafka._components.docs_dependencies: AsyncAPI generator installed\n\nTo generate the documentation programmatically, you just need to call the\nfollowing command:\n\n``` sh\nfastkafka docs generate application:kafka_app\n```\n\n    23-05-31 11:38:25.113 [INFO] fastkafka._components.asyncapi: Old async specifications at '/content/asyncapi/spec/asyncapi.yml' does not exist.\n    23-05-31 11:38:25.118 [INFO] fastkafka._components.asyncapi: New async specifications generated at: '/content/asyncapi/spec/asyncapi.yml'\n    23-05-31 11:38:43.455 [INFO] fastkafka._components.asyncapi: Async docs generated at 'asyncapi/docs'\n    23-05-31 11:38:43.455 [INFO] fastkafka._components.asyncapi: Output of '$ npx -y -p @asyncapi/generator ag asyncapi/spec/asyncapi.yml @asyncapi/html-template -o asyncapi/docs --force-write'\n\n    Done! \u2728\n    Check out your shiny new generated files at /content/asyncapi/docs.\n\nThis will generate the *asyncapi* folder, relative to the current path, where all your\ndocumentation will be saved. You can check out the content of it with:\n\n``` sh\nls -l asyncapi\n```\n\n    total 8\n    drwxr-xr-x 4 root root 4096 May 31 11:38 docs\n    drwxr-xr-x 2 root root 4096 May 31 11:38 spec\n\nIn the docs folder you will find the servable static HTML file of your\ndocumentation. 
This can also be served using our `fastkafka docs serve`\nCLI command (more on that in our guides).\n\nIn the spec folder you will find an asyncapi.yml file containing the AsyncAPI\nspecification of your application.\n\nWe can locally preview the generated documentation by running the\nfollowing command:\n\n``` sh\nfastkafka docs serve application:kafka_app\n```\n\n    23-05-31 11:38:45.250 [INFO] fastkafka._components.asyncapi: New async specifications generated at: '/content/asyncapi/spec/asyncapi.yml'\n    23-05-31 11:39:04.410 [INFO] fastkafka._components.asyncapi: Async docs generated at 'asyncapi/docs'\n    23-05-31 11:39:04.411 [INFO] fastkafka._components.asyncapi: Output of '$ npx -y -p @asyncapi/generator ag asyncapi/spec/asyncapi.yml @asyncapi/html-template -o asyncapi/docs --force-write'\n\n    Done! \u2728\n    Check out your shiny new generated files at /content/asyncapi/docs.\n\n\n    Serving documentation on http://127.0.0.1:8000\n    127.0.0.1 - - [31/May/2023 11:39:14] \"GET / HTTP/1.1\" 200 -\n    127.0.0.1 - - [31/May/2023 11:39:14] \"GET /css/global.min.css HTTP/1.1\" 200 -\n    127.0.0.1 - - [31/May/2023 11:39:14] \"GET /js/asyncapi-ui.min.js HTTP/1.1\" 200 -\n    127.0.0.1 - - [31/May/2023 11:39:14] \"GET /css/asyncapi.min.css HTTP/1.1\" 200 -\n    Interupting serving of documentation and cleaning up...\n\nFrom the parameters passed to the application constructor, we get the\ndocumentation below:\n\n``` python\nfrom fastkafka import FastKafka\n\nkafka_brokers = {\n    \"localhost\": {\n        \"url\": \"localhost\",\n        \"description\": \"local development kafka broker\",\n        \"port\": 9092,\n    },\n    \"production\": {\n        \"url\": \"kafka.airt.ai\",\n        \"description\": \"production kafka broker\",\n        \"port\": 9092,\n        \"protocol\": \"kafka-secure\",\n        \"security\": {\"type\": \"plain\"},\n    },\n}\n\nkafka_app = FastKafka(\n    title=\"Demo Kafka app\",\n    
kafka_brokers=kafka_brokers,\n)\n```\n\n![Kafka_servers](https://raw.githubusercontent.com/airtai/fastkafka/main/nbs/images/screenshot-kafka-servers.png)\n\nThe following documentation snippet is for the consumer as specified in\nthe code above:\n\n![Kafka_consumer](https://raw.githubusercontent.com/airtai/fastkafka/main/nbs/images/screenshot-kafka-consumer.png)\n\nThe following documentation snippet is for the producer as specified in\nthe code above:\n\n![Kafka_producer](https://raw.githubusercontent.com/airtai/fastkafka/main/nbs/images/screenshot-kafka-producer.png)\n\nFinally, all messages defined as subclasses of *BaseModel* are\ndocumented as well:\n\n![Kafka_messages](https://raw.githubusercontent.com/airtai/fastkafka/main/nbs/images/screenshot-kafka-messages.png)\n\n## License\n\nFastKafka is licensed under the Apache License 2.0.\n\nA permissive license whose main conditions require preservation of\ncopyright and license notices. Contributors provide an express grant of\npatent rights. Licensed works, modifications, and larger works may be\ndistributed under different terms and without source code.\n\nThe full text of the license can be found\n[here](https://raw.githubusercontent.com/airtai/fastkafka/main/LICENSE).\n\n\n",
    "bugtrack_url": null,
    "license": "Apache Software License 2.0",
    "summary": "FastKafka is a powerful and easy-to-use Python library for building asynchronous web services that interact with Kafka topics. Built on top of FastAPI, Starlette, Pydantic, AIOKafka and AsyncAPI, FastKafka simplifies the process of writing producers and consumers for Kafka topics.",
    "version": "0.8.0",
    "project_urls": {
        "Bug Tracker": "https://github.com/airtai/fastkafka/issues",
        "CI": "https://github.com/airtai/fastkafka/actions",
        "Documentation": "https://fastkafka.airt.ai/",
        "Homepage": "https://github.com/airtai/fastkafka",
        "Tutorial": "https://colab.research.google.com/github/airtai/fastkafka/blob/main/nbs/guides/Guide_00_FastKafka_Demo.ipynb"
    },
    "split_keywords": [
        "nbdev",
        "jupyter",
        "notebook",
        "python",
        "kafka"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "fd208a9c96c5c30287411a6ed4b227ff746f655c7f27a72f628719785a2fb8e3",
                "md5": "d89c4ca72496d100c7dffeacd5f330d8",
                "sha256": "7e39ef32c7e4bb534a7f907aaa3baad3b758779f374d43b39c3072ca4aada0fa"
            },
            "downloads": -1,
            "filename": "fastkafka-0.8.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "d89c4ca72496d100c7dffeacd5f330d8",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 106843,
            "upload_time": "2023-07-06T14:45:19",
            "upload_time_iso_8601": "2023-07-06T14:45:19.038539Z",
            "url": "https://files.pythonhosted.org/packages/fd/20/8a9c96c5c30287411a6ed4b227ff746f655c7f27a72f628719785a2fb8e3/fastkafka-0.8.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "104064b445a7d9228b9d90c22f5d761eb535c8321ff7b438d3f2041c18578813",
                "md5": "10a253a956c98febb397683e7780b7f6",
                "sha256": "f41a44bb95a3e7bb0306355555dc4cab4929363796ed907d656aa67abee05bf0"
            },
            "downloads": -1,
            "filename": "fastkafka-0.8.0.tar.gz",
            "has_sig": false,
            "md5_digest": "10a253a956c98febb397683e7780b7f6",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 104633,
            "upload_time": "2023-07-06T14:45:21",
            "upload_time_iso_8601": "2023-07-06T14:45:21.445609Z",
            "url": "https://files.pythonhosted.org/packages/10/40/64b445a7d9228b9d90c22f5d761eb535c8321ff7b438d3f2041c18578813/fastkafka-0.8.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-07-06 14:45:21",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "airtai",
    "github_project": "fastkafka",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "fastkafka"
}
        