# Python Rest Client Schema Registry

Python REST client to interact with the Confluent schema-registry server and manage Avro and JSON schema resources.

## Requirements

Python 3.8+

## Installation

pip install python-schema-registry-client

If you want the `Faust` functionality:

pip install python-schema-registry-client[faust]

Note that this will automatically add a dependency on the faust-streaming fork of faust. If you want to use the
old faust version, install it manually and then install `python-schema-registry-client` without the `faust` extra. The functionality will
be the same.

## Client API, Serializer, Faust Integration and Schema Server description

**Documentation**: [](

## Avro Schema Usage

from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)

schema_id = client.register("test-deployment", avro_schema)
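For context, registering a schema corresponds to a `POST /subjects/{subject}/versions` call in the Confluent Schema Registry REST API, where the request body carries the schema as a JSON-encoded string. The sketch below only builds such a request and sends nothing. `build_register_request` and the `http://localhost:8081` address are hypothetical names chosen for illustration; they are not part of this library, and the client may assemble its requests differently.

```python
import json


def build_register_request(base_url, subject, schema_dict, schema_type="AVRO"):
    """Return (url, headers, body) for a register call, without sending it."""
    url = f"{base_url.rstrip('/')}/subjects/{subject}/versions"
    headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
    # The registry expects the schema itself as a JSON *string* inside the body.
    payload = {"schema": json.dumps(schema_dict)}
    if schema_type != "AVRO":
        # AVRO is the registry default, so the type is only sent for other formats.
        payload["schemaType"] = schema_type
    return url, headers, json.dumps(payload)


deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

url, headers, body = build_register_request(
    "http://localhost:8081",  # assumed address, adjust to your registry
    "test-deployment",
    deployment_schema,
)
print(url)
print(body)
```

On success the registry answers with a JSON object holding the schema id, which is the value `client.register` returns.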

Or, using the async client:

from schema_registry.client import AsyncSchemaRegistryClient, schema

async_client = AsyncSchemaRegistryClient(url="")

deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)

schema_id = await async_client.register("test-deployment", avro_schema)
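Note that `await` is only valid inside a coroutine (or an async-aware REPL), so in a script the call above needs a wrapper. A minimal sketch of that wrapper follows. `FakeAsyncClient` is a hypothetical stand-in that lets the example run without a server; in real code you would construct `AsyncSchemaRegistryClient` instead.

```python
import asyncio


class FakeAsyncClient:
    """Hypothetical stand-in for AsyncSchemaRegistryClient (no network)."""

    def __init__(self):
        self._ids = {}

    async def register(self, subject, schema):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        # Hand out 1, 2, 3, ... per new subject; re-registering returns the same id.
        return self._ids.setdefault(subject, len(self._ids) + 1)


async def main():
    async_client = FakeAsyncClient()
    # `await` is legal here because main() is a coroutine.
    return await async_client.register("test-deployment", {"type": "string"})


schema_id = asyncio.run(main())
print(schema_id)
```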

## JSON Schema Usage

from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="")

deployment_schema = {
    "definitions" : {
        "JsonDeployment" : {
            "type" : "object",
            "required" : ["image", "replicas", "port"],
            "properties" : {
                "image" :       {"type" : "string"},
                "replicas" :    {"type" : "integer"},
                "port" :        {"type" : "integer"}
            }
        }
    },
    "$ref" : "#/definitions/JsonDeployment"
}

json_schema = schema.JsonSchema(deployment_schema)

schema_id = client.register("test-deployment", json_schema)
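In the schema above, the top-level `$ref` points at the `JsonDeployment` entry under `definitions`. The following sketch shows how such a local reference resolves and how the `required` list can be checked against a record. `resolve_local_ref` and `missing_required` are hypothetical helpers written for this example. They ignore JSON Pointer escaping and are in no way a JSON Schema validator.

```python
def resolve_local_ref(schema, ref):
    """Follow a local reference such as '#/definitions/Name' inside `schema`."""
    if not ref.startswith("#/"):
        raise ValueError("only local references are handled in this sketch")
    node = schema
    for part in ref[2:].split("/"):
        node = node[part]
    return node


def missing_required(schema, record):
    """Return the required property names that `record` lacks."""
    target = resolve_local_ref(schema, schema["$ref"])
    return [name for name in target.get("required", []) if name not in record]


deployment_schema = {
    "definitions": {
        "JsonDeployment": {
            "type": "object",
            "required": ["image", "replicas", "port"],
            "properties": {
                "image": {"type": "string"},
                "replicas": {"type": "integer"},
                "port": {"type": "integer"},
            },
        }
    },
    "$ref": "#/definitions/JsonDeployment",
}

print(missing_required(deployment_schema, {"image": "nginx", "replicas": 3}))
```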

Or, using the async client:

from schema_registry.client import AsyncSchemaRegistryClient, schema

async_client = AsyncSchemaRegistryClient(url="")

deployment_schema = {
    "definitions" : {
        "JsonDeployment" : {
            "type" : "object",
            "required" : ["image", "replicas", "port"],
            "properties" : {
                "image" :       {"type" : "string"},
                "replicas" :    {"type" : "integer"},
                "port" :        {"type" : "integer"}
            }
        }
    },
    "$ref" : "#/definitions/JsonDeployment"
}

json_schema = schema.JsonSchema(deployment_schema)

schema_id = await async_client.register("test-deployment", json_schema)

## Usage with dataclasses-avroschema for avro schemas

You can generate the `avro schema` directly from a python class using dataclasses-avroschema
and use it in the API to `register schemas`, `check versions` and `test compatibility`:

import dataclasses
import typing

from dataclasses_avroschema import AvroModel, types

from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="")

@dataclasses.dataclass
class UserAdvance(AvroModel):
    name: str
    age: int
    pets: typing.List[str] = dataclasses.field(default_factory=lambda: ["dog", "cat"])
    accounts: typing.Dict[str, int] = dataclasses.field(default_factory=lambda: {"key": 1})
    has_car: bool = False
    favorite_colors: types.Enum = types.Enum(["BLUE", "YELLOW", "GREEN"], default="BLUE")
    country: str = "Argentina"
    address: str = None

# register the schema (subject name taken from the example output below)
subject = "dataclasses-avroschema-subject-2"
schema_id = client.register(subject, UserAdvance.avro_schema())

# >>> 12

result = client.check_version(subject, UserAdvance.avro_schema())
# >>> SchemaVersion(subject='dataclasses-avroschema-subject-2', schema_id=12, schema=1, version={"type":"record" ...')

compatibility = client.test_compatibility(subject, UserAdvance.avro_schema())

# >>> True

### Usage with pydantic for json schemas

You can generate the json schema directly from a python class using pydantic and use it in the API to register schemas, check versions and test compatibility:

import typing

from enum import Enum

from pydantic import BaseModel

from schema_registry.client import SchemaRegistryClient

client = SchemaRegistryClient(url="")

class ColorEnum(str, Enum):
    BLUE = "BLUE"
    YELLOW = "YELLOW"
    GREEN = "GREEN"

class UserAdvance(BaseModel):
    name: str
    age: int
    pets: typing.List[str] = ["dog", "cat"]
    accounts: typing.Dict[str, int] = {"key": 1}
    has_car: bool = False
    favorite_colors: ColorEnum = ColorEnum.BLUE
    country: str = "Argentina"
    address: str = None

# register the schema (subject name taken from the example output below)
subject = "pydantic-jsonschema-subject"
schema_id = client.register(subject, UserAdvance.schema_json(), schema_type="JSON")

# >>> 12

result = client.check_version(subject, UserAdvance.schema_json(), schema_type="JSON")
# >>> SchemaVersion(subject='pydantic-jsonschema-subject', schema_id=12, schema=1, version=<schema_registry.client.schema.JsonSchema object at 0x7f40354550a0>)

compatibility = client.test_compatibility(subject, UserAdvance.schema_json(), schema_type="JSON")

# >>> True

## Serializers

You can use `AvroMessageSerializer` to encode/decode messages in `avro`:

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import AvroMessageSerializer

client = SchemaRegistryClient("")
avro_message_serializer = AvroMessageSerializer(client)

avro_user_schema = schema.AvroSchema({
    "type": "record",
    "namespace": "com.example",
    "name": "AvroUsers",
    "fields": [
        {"name": "first_name", "type": "string"},
        {"name": "last_name", "type": "string"},
        {"name": "age", "type": "int"},
    ],
})


# We want to encode the user_record with avro_user_schema
user_record = {
    "first_name": "my_first_name",
    "last_name": "my_last_name",
    "age": 20,
}

# Encode the record
message_encoded = avro_message_serializer.encode_record_with_schema(
    "user", avro_user_schema, user_record)

# >>> b'\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name('
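The bytes above can be read by hand, assuming the serializer follows the Confluent wire format, as the output suggests: one magic byte (`0`), a 4-byte big-endian schema id, then the Avro binary body. In Avro binary encoding, ints and string lengths are zigzag varints. The sketch below decodes the example message with the standard library only. `read_long` and `read_string` are hypothetical helpers for illustration, and in practice you would decode with the serializer itself.

```python
import struct


def read_long(buf, pos):
    """Decode one Avro int/long (zigzag varint) starting at `pos`."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not byte & 0x80:  # high bit clear means this was the last byte
            break
        shift += 7
    # Undo zigzag: even values are non-negative, odd values are negative.
    return (raw >> 1) ^ -(raw & 1), pos


def read_string(buf, pos):
    """Decode an Avro string: a length (long) followed by UTF-8 bytes."""
    length, pos = read_long(buf, pos)
    return buf[pos:pos + length].decode("utf-8"), pos + length


message_encoded = b"\x00\x00\x00\x00\x01\x1amy_first_name\x18my_last_name("

# Header: magic byte + schema id (big-endian unsigned int), 5 bytes in total.
magic, schema_id = struct.unpack(">bI", message_encoded[:5])

# Body: fields in schema order (first_name, last_name, age).
pos = 5
first_name, pos = read_string(message_encoded, pos)
last_name, pos = read_string(message_encoded, pos)
age, pos = read_long(message_encoded, pos)

print(magic, schema_id, first_name, last_name, age)
# prints: 0 1 my_first_name my_last_name 20
```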

Or with `json schemas`:

from schema_registry.client import SchemaRegistryClient, schema
from schema_registry.serializers import JsonMessageSerializer

client = SchemaRegistryClient("")
json_message_serializer = JsonMessageSerializer(client)

json_schema = schema.JsonSchema({
  "definitions" : {
    "record:python.test.basic.basic" : {
      "description" : "basic schema for tests",
      "type" : "object",
      "required" : [ "number", "name" ],
      "properties" : {
        "number" : {
          "oneOf" : [ {
            "type" : "integer"
          }, {
            "type" : "null"
          } ]
        },
        "name" : {
          "oneOf" : [ {
            "type" : "string"
          } ]
        }
      }
    }
  },
  "$ref" : "#/definitions/record:python.test.basic.basic"
})

# Encode the record
basic_record = {
    "number": 10,
    "name": "a_name",
}

message_encoded = json_message_serializer.encode_record_with_schema(
    "basic", json_schema, basic_record)

# >>> b'\x00\x00\x00\x00\x02{"number": 10, "name": "a_name"}'
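The JSON output above has a 5-byte header in front of the JSON text. Assuming it follows the Confluent wire format (a magic byte `0` plus a 4-byte big-endian schema id), the frame can be split with the standard library as sketched below. `split_frame` is a hypothetical helper used only for this illustration.

```python
import json
import struct

message_encoded = b'\x00\x00\x00\x00\x02{"number": 10, "name": "a_name"}'


def split_frame(message):
    """Split a framed message into (schema_id, payload_bytes)."""
    if len(message) < 5:
        raise ValueError("message too short to contain a header")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


schema_id, payload = split_frame(message_encoded)
record = json.loads(payload)
print(schema_id, record)
# prints: 2 {'number': 10, 'name': 'a_name'}
```

The schema id in the header is what a consumer uses to look up the matching schema in the registry before validating the payload.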

## When to use this library

Usually, we have a situation like this:

![Confluent Architecture](docs/img/confluent_architecture.png)

Producers and consumers therefore have to serialize and deserialize messages every time they send to or receive from Kafka topics. In this picture, we can imagine a `Faust` application receiving messages (encoded with an Avro schema) that we want to deserialize, so we can ask the `schema server` for the schema needed to do that. In this scenario, the `MessageSerializer` is a good fit.

Another use case is an application that only administers `Avro Schemas` (register, update compatibilities, delete old schemas, etc.). For that, the `SchemaRegistryClient` is a good fit.

## Development

Poetry is needed to install the dependencies and develop locally.

1. Install dependencies: `poetry install --all-extras`
2. Code linting: `./scripts/format`
3. Run tests: `./scripts/test`

For commit messages we use commitizen to standardize the commit format.

*Note*: The tests are run against the `Schema Server` using `docker compose`, so you will need
`Docker` and `Docker Compose` installed.

In a terminal, run `docker-compose up`. Then, in a different terminal, run the tests:

./scripts/test

All additional args will be passed to pytest, for example:

./scripts/test ./tests/client/

### Tests using the python shell

To perform tests using the python shell you can run the project using `docker-compose`.

1. Execute `docker-compose up`. The `schema registry server` will then run on ``, and you can interact with it using the `SchemaRegistryClient`.
1. Use the python interpreter (get a python shell by typing `python` in your command line).
1. Play with the `schema server`:

from schema_registry.client import SchemaRegistryClient, schema

client = SchemaRegistryClient(url="")

# do some operations with the client...
deployment_schema = {
    "type": "record",
    "namespace": "com.kubertenes",
    "name": "AvroDeployment",
    "fields": [
        {"name": "image", "type": "string"},
        {"name": "replicas", "type": "int"},
        {"name": "port", "type": "int"},
    ],
}

avro_schema = schema.AvroSchema(deployment_schema)
client.register("test-deployment", avro_schema)
# >>>> Out[5]: 1

Then you can check the schema by opening the url `` in your browser.


