confluent_avro


- Name: confluent_avro
- Version: 1.8.0
- Home page: https://github.com/DhiaTN/confluent-avro-py
- Summary: An Avro SerDe implementation that integrates with the confluent
- Upload time: 2020-05-21 11:43:39
- Author: Dhia Abbassi
- Requires Python: >=3.6
- Keywords: avro, kafka, confluent, schema registry

[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/confluent_avro?label=Python)](https://pypi.org/project/confluent-avro/)
[![Build Status](https://travis-ci.com/DhiaTN/confluent-avro-py.svg?branch=master)](https://travis-ci.com/DhiaTN/avrokafka-py)
[![Maintainability](https://api.codeclimate.com/v1/badges/fd596527a28dcaea1a2d/maintainability)](https://codeclimate.com/github/DhiaTN/confluent-avro-py/maintainability)
[![codecov](https://codecov.io/gh/DhiaTN/confluent-avro-py/branch/master/graph/badge.svg)](https://codecov.io/gh/DhiaTN/confluent-avro-py)
[![PyPI version](https://badge.fury.io/py/confluent_avro.svg)](https://badge.fury.io/py/confluent_avro)
[![PyPI - License](https://img.shields.io/pypi/l/confluent_avro?color=ff69b4&label=License)](https://opensource.org/licenses/Apache-2.0)

<br />
<p align="center">
  <h1 align="center">ConfluentAvro</h1>

  <p align="center">
    An Avro SerDe implementation that integrates with the <a href="https://www.confluent.io/confluent-schema-registry/">Confluent Schema Registry</a> and serializes and deserializes data according to the <a href="http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format">Confluent wire format</a>.
    <br />
    <br />
    <a href="examples">View Demo</a>
    ·
    <a href="https://github.com/DhiaTN/confluent-avro-py/issues">Report Bug</a>
    ·
    <a href="https://github.com/DhiaTN/confluent-avro-py/issues">Request Feature</a>
  </p>
</p>

## Getting Started

### Background

To solve [schema management](https://docs.confluent.io/current/schema-registry/index.html) issues and ensure compatibility across Kafka-based applications, the Confluent team introduced the Schema Registry, which stores schemas, shares them between applications, and applies compatibility checks to each newly registered schema. To make schema sharing cheap, they extended the Avro binary format: instead of embedding the full schema, each message carries the schema ID in front of the Avro-encoded record.

» You can find more about Confluent and the Schema Registry in the [Confluent documentation](https://docs.confluent.io/current/schema-registry/index.html).
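
As a rough illustration of that framing: the Confluent wire format places a magic byte (`0`) and a 4-byte big-endian schema ID in front of the Avro-encoded body. The sketch below uses only the standard library. `frame` and `unframe` are hypothetical helper names, not part of ConfluentAvro's API, and the Avro payload is treated as opaque bytes.

```python
import struct

MAGIC_BYTE = 0
# ">bI": big-endian, 1 signed byte (magic) followed by a 4-byte unsigned schema ID
HEADER = struct.Struct(">bI")


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the 5-byte wire-format header to an Avro-encoded payload."""
    return HEADER.pack(MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes):
    """Split a framed message into (schema_id, avro_payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message too short for the wire-format header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER.size:]


framed = frame(42, b"\x02\x06foo")
print(framed[:5].hex())   # 000000002a
print(unframe(framed))    # (42, b'\x02\x06foo')
```

Because only five bytes of header are added per message, consumers need the registry to turn the schema ID back into a schema before they can decode the body.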

### Implementation 

ConfluentAvro is implemented according to the above specification. Before publishing to a Kafka topic, the library prepends the schema ID to the generated Avro binary. When consuming from Kafka, it reads the schema ID and fetches the schema from the registry before deserializing the actual data.

The underlying API automatically registers new schemas used for serialization and fetches the corresponding schema when deserializing. Newly registered schemas and fetched schemas are both cached locally to speed up processing of future records.
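
The local caching described above can be pictured as a lookup table in front of the registry call. The following is a minimal sketch under assumptions, not ConfluentAvro's actual code: `CachedSchemaLookup` and `fake_fetch` are hypothetical names, and the fetch function stands in for the HTTP round trip to the registry.

```python
from typing import Callable, Dict


class CachedSchemaLookup:
    """Memoize schema lookups by ID so the registry is hit once per schema."""

    def __init__(self, fetch: Callable[[int], str]) -> None:
        self._fetch = fetch  # e.g. an HTTP call to the registry
        self._by_id: Dict[int, str] = {}

    def get(self, schema_id: int) -> str:
        if schema_id not in self._by_id:
            self._by_id[schema_id] = self._fetch(schema_id)
        return self._by_id[schema_id]


calls = []


def fake_fetch(schema_id: int) -> str:
    # Stand-in for the registry round trip; records each call it receives.
    calls.append(schema_id)
    return '{"type": "string"}'


lookup = CachedSchemaLookup(fake_fetch)
lookup.get(1)
lookup.get(1)  # served from the cache, no second fetch
lookup.get(2)
print(calls)  # [1, 2]
```

Since a registered schema ID always refers to the same schema, a cache like this never needs invalidation for ID lookups.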

» ConfluentAvro's key features:

- Supports the Confluent wire format
- Integrates with the Confluent Schema Registry
- Retries with exponential backoff if the connection to the registry fails
- Implements caching at the schema registry level
- The underlying decoder/encoder is built once per schema and reused for all subsequent records
- Can be integrated with different Kafka clients
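
To make the retry behaviour in the list above concrete, here is a generic exponential backoff sketch. The retry count, delays, and exception type are assumptions chosen for illustration; the values ConfluentAvro actually uses are not stated here. `retry_with_backoff` and `flaky_registry_call` are hypothetical names.

```python
import time


def retry_with_backoff(func, retries=3, base_delay=0.5, factor=2.0,
                       exceptions=(ConnectionError,), sleep=time.sleep):
    """Call func(); on failure wait base_delay * factor**attempt, then retry."""
    for attempt in range(retries + 1):
        try:
            return func()
        except exceptions:
            if attempt == retries:
                raise  # out of retries: surface the last error
            sleep(base_delay * factor ** attempt)


attempts = {"n": 0}
delays = []


def flaky_registry_call():
    # Fails twice, then succeeds, to exercise the retry path.
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("registry unreachable")
    return "ok"


# Passing delays.append as `sleep` records the waits instead of sleeping.
result = retry_with_backoff(flaky_registry_call, sleep=delays.append)
print(result, delays)  # ok [0.5, 1.0]
```

Doubling the delay after each failure gives a briefly unavailable registry time to recover without the client hammering it.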


### Built With

- [fastavro](https://fastavro.readthedocs.io/en/latest/) (check [fastavro benchmark](https://github.com/DhiaTN/avro-benchmarking-py3))
- [requests](https://requests.readthedocs.io)

### Installation

```shell
pip install confluent-avro
```

### Usage

> Check [examples](examples) for a fully working demo.

##### Consumer App Example:

```python
from kafka import KafkaConsumer

from confluent_avro import AvroKeyValueSerde, SchemaRegistry
from confluent_avro.schema_registry import HTTPBasicAuth

KAFKA_TOPIC = "confluent_avro-example-topic"

# Client for the Schema Registry, here with HTTP basic auth.
registry_client = SchemaRegistry(
    "https://myschemaregistry.com",
    HTTPBasicAuth("username", "password"),
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)
# Bundles a key serde and a value serde for the topic.
avroSerde = AvroKeyValueSerde(registry_client, KAFKA_TOPIC)

consumer = KafkaConsumer(
    KAFKA_TOPIC,
    group_id="random_group_id",
    bootstrap_servers=["localhost:9092"],
)

for msg in consumer:
    # Each payload carries its schema ID; the schema is fetched from the registry.
    v = avroSerde.value.deserialize(msg.value)
    k = avroSerde.key.deserialize(msg.key)
    print(msg.offset, msg.partition, k, v)
```
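
Kafka messages may have no key (and tombstone records have no value), in which case `msg.key` or `msg.value` is `None`. Whether `deserialize` accepts `None` is not stated here, so a small guard can be useful. The helper below is a hypothetical sketch, not part of ConfluentAvro, and `bytes.decode` is only a stand-in deserializer for the demonstration.

```python
from typing import Any, Callable, Optional


def deserialize_or_none(deserialize: Callable[[bytes], Any],
                        raw: Optional[bytes]) -> Any:
    """Return None for missing payloads, otherwise delegate to deserialize."""
    if raw is None:
        return None
    return deserialize(raw)


# Stand-in deserializer for demonstration only.
print(deserialize_or_none(bytes.decode, None))      # None
print(deserialize_or_none(bytes.decode, b"key-1"))  # key-1
```

In the consumer loop above, this would be used as `k = deserialize_or_none(avroSerde.key.deserialize, msg.key)`.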

##### Producer App Example:

```python
from kafka import KafkaProducer

from confluent_avro import AvroKeyValueSerde, SchemaRegistry
from confluent_avro.schema_registry import HTTPBasicAuth

KAFKA_TOPIC = "confluent_avro-example-topic"

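# Client for the Schema Registry, here with HTTP basic auth.
registry_client = SchemaRegistry(
    "https://myschemaregistry.com",
    HTTPBasicAuth("username", "password"),
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
)

avroSerde = AvroKeyValueSerde(registry_client, KAFKA_TOPIC)

producer = KafkaProducer(bootstrap_servers=["localhost:9092"])
# `{...}` stands for the record to send; `key_schema` and `value_schema`
# are the Avro schemas for the key and value and must be defined beforehand.
producer.send(
    KAFKA_TOPIC,
    key=avroSerde.key.serialize({...}, key_schema),
    value=avroSerde.value.serialize({...}, value_schema),
)
# send() is asynchronous; flush so the message is delivered before the script exits.
producer.flush()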
```
            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/DhiaTN/confluent-avro-py",
    "name": "confluent_avro",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.6",
    "maintainer_email": "",
    "keywords": "avro,kafka,confluent,schema registry",
    "author": "Dhia Abbassi",
    "author_email": "dhia.absi@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/f9/d4/c125703c80b1afeeec8310e872cd21b4f743ee9bd642f349c843db479a89/confluent_avro-1.8.0.tar.gz",
    "platform": "",
    "bugtrack_url": null,
    "license": "",
    "summary": "An Avro SerDe implementation that integrates with the confluent",
    "version": "1.8.0",
    "project_urls": {
        "Homepage": "https://github.com/DhiaTN/confluent-avro-py"
    },
    "split_keywords": [
        "avro",
        "kafka",
        "confluent",
        "schema registry"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "b9d2d6fb2595c6997f1e4c289de457f8f12582a60789e366c3b3c43d7ae97ee4",
                "md5": "4f836ef862783dd2c1a493a390a4898e",
                "sha256": "37d7eafb975219a655c41ee2b95cb2fab030ff11d5af6773a6b9166e84591bfd"
            },
            "downloads": -1,
            "filename": "confluent_avro-1.8.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "4f836ef862783dd2c1a493a390a4898e",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.6",
            "size": 14275,
            "upload_time": "2020-05-21T11:43:26",
            "upload_time_iso_8601": "2020-05-21T11:43:26.741976Z",
            "url": "https://files.pythonhosted.org/packages/b9/d2/d6fb2595c6997f1e4c289de457f8f12582a60789e366c3b3c43d7ae97ee4/confluent_avro-1.8.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "f9d4c125703c80b1afeeec8310e872cd21b4f743ee9bd642f349c843db479a89",
                "md5": "81b17f0b7b01599708ed00214599fcbd",
                "sha256": "2928da95009cebc9532c8e7af0c4b16add314e1e5542daeb297932bd2338ef86"
            },
            "downloads": -1,
            "filename": "confluent_avro-1.8.0.tar.gz",
            "has_sig": false,
            "md5_digest": "81b17f0b7b01599708ed00214599fcbd",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.6",
            "size": 18803,
            "upload_time": "2020-05-21T11:43:39",
            "upload_time_iso_8601": "2020-05-21T11:43:39.016083Z",
            "url": "https://files.pythonhosted.org/packages/f9/d4/c125703c80b1afeeec8310e872cd21b4f743ee9bd642f349c843db479a89/confluent_avro-1.8.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2020-05-21 11:43:39",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "DhiaTN",
    "github_project": "confluent-avro-py",
    "travis_ci": true,
    "coveralls": false,
    "github_actions": false,
    "lcname": "confluent_avro"
}