wunderkafka

- Name: wunderkafka
- Version: 0.16.0
- Home page: https://github.com/severstal-digital/wunderkafka
- Summary: librdkafka-powered client for Kafka for Python with a (hopefully) handier API
- Upload time: 2024-01-19 08:43:33
- Author: Kirill Tribunsky
- Requires Python: >=3.8
- License: Apache-2.0
- Keywords: kafka, cloudera, confluent
# Wunderkafka

> The power of librdkafka for <s>humans</s> pythons

Wunderkafka provides a handful of facades for the C-powered consumer/producer. It is built on top of [confluent-kafka](https://pypi.org/project/confluent-kafka/).

For a quick view of what is going on, please check the [Quickstart](https://wunderkafka.readthedocs.io/en/stable/pages/quickstart.html) and the [Documentation](https://wunderkafka.readthedocs.io/en/stable/).

The installation process is described [here](https://wunderkafka.readthedocs.io/en/stable/pages/install.html).

## Features

### #TypeSafe librdkafka config

Instead of passing a plain `dict` as the consumer/producer config, a [pydantic-powered](https://github.com/pydantic/pydantic) config model is used.
It is extracted directly from librdkafka's [CONFIGURATION.md](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) with some rough parsing.
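
A minimal sketch of what the typed config buys you (the values are placeholders; the field names are the same ones used in the examples below):

```python
from wunderkafka import ConsumerConfig

# librdkafka properties become typed, snake_case model fields.
config = ConsumerConfig(
    group_id='my-group',
    bootstrap_servers='localhost:9092',
    enable_auto_commit=False,
)

# A misspelled option or a wrong value type fails fast with a pydantic
# ValidationError instead of being silently passed through to librdkafka.
```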

### Confluent & Cloudera Schema Registry support

The Confluent client is used as-is, while the Hortonworks/Cloudera schema registry client and its (de)serialization protocol are implemented natively as well (no "admin" methods support).
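
Pointing the client at a Cloudera-style registry is a matter of configuration; a sketch, assuming the remaining `SRConfig` fields have workable defaults (the URL is a placeholder, with `/api/v1` following Cloudera's REST convention). The full `ClouderaSRClient` wiring is shown in the transport example below.

```python
from wunderkafka import SRConfig

# Illustrative endpoint only: a Cloudera-flavoured Schema Registry.
sr_config = SRConfig(url='http://schema-registry.example.com:7788/api/v1')
```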

### Building Kit

Wunderkafka lets you build your own transport for message (de)serialization relatively simply, eliminating boilerplate for typical cases.

#### Pre-defined config models
```python
import os
from functools import partial

from pydantic import field_validator, Field
from wunderkafka.time import now
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol, AvroConsumer


# If you are a fan of the 12-factor app, you may want to configure via environment variables
class OverridenSRConfig(SRConfig):
    url: str = Field(alias='SCHEMA_REGISTRY_URL')

    @field_validator('sasl_username')
    @classmethod
    def from_env(cls, v) -> str:
        # ...and to use the 'native' Kerberos environment variables (the incoming value is ignored)
        return '{0}@{1}'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))


# Or you want to override some defaults by default (pun intended)
class OverridenConfig(ConsumerConfig):
    # A consumer which does not commit messages automatically
    enable_auto_commit: bool = False
    # and which knows nothing about previous runs after a restart, thanks to a fresh group.id
    group_id: str = 'wunderkafka-{0}'.format(now())
    # More 12 factors: populated from the BOOTSTRAP_SERVER environment variable
    bootstrap_servers: str = Field(alias='BOOTSTRAP_SERVER')
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_kerberos_kinit_cmd: str = ''
    sr: SRConfig = OverridenSRConfig()

    @field_validator('sasl_kerberos_kinit_cmd')
    @classmethod
    def format_keytab(cls, v) -> str:
        if not v:
            return 'kinit {0}@{1} -k -t {0}.keytab'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))
        # Still allow setting it manually
        return str(v)


# After this, you can `partial` your own Producer/Consumer, something like...
MyConsumer = partial(AvroConsumer, config=OverridenConfig())
# OR
class MyConsumer(AvroConsumer):
    def __init__(self, config: ConsumerConfig = OverridenConfig()):
        super().__init__(config)
```
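
With either variant in place, constructing a consumer needs no arguments. A hypothetical usage sketch (the topic name is a placeholder, and `subscribe`/`consume` are assumed here to follow the confluent-kafka-style surface the facade wraps):

```python
consumer = MyConsumer()
consumer.subscribe(['some_topic'])  # hypothetical topic name

while True:
    # Messages are assumed to arrive already deserialized.
    for message in consumer.consume():
        print(message.value())
```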
#### Building your own transport

```python
from typing import Optional

from pydantic import Field

from wunderkafka.config.generated import enums
from wunderkafka.consumers.bytes import BytesConsumer
from wunderkafka.schema_registry import ClouderaSRClient
from wunderkafka.hotfixes.watchdog import check_watchdog
from wunderkafka.serdes.headers import ConfluentClouderaHeadersHandler
from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer
from wunderkafka.schema_registry.cache import SimpleCache
from wunderkafka.schema_registry.transport import KerberizableHTTPClient
from wunderkafka.serdes.avro.deserializers import FastAvroDeserializer
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol


class OverridenSRConfig(SRConfig):
    url: str = Field(alias="SCHEMA_REGISTRY_URL")
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_mechanism: str = "SCRAM-SHA-512"
    sasl_username: str = Field(alias="SASL_USERNAME")
    sasl_password: str = Field(alias="SASL_PASSWORD")


class OverridenConsumerConfig(ConsumerConfig):
    enable_auto_commit: bool = False
    auto_offset_reset: enums.AutoOffsetReset = enums.AutoOffsetReset.earliest
    bootstrap_servers: str = Field(alias="BOOTSTRAP_SERVERS")
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_mechanism: str = "SCRAM-SHA-512"
    sasl_username: str = Field(alias="SASL_USERNAME")
    sasl_password: str = Field(alias="SASL_PASSWORD")
    sr: SRConfig = Field(default_factory=OverridenSRConfig)


# Pydantic/FastAPI style, but you can inherit from `HighLevelDeserializingConsumer` directly
def MyAvroConsumer(
    config: Optional[ConsumerConfig] = None,
) -> HighLevelDeserializingConsumer:
    config = config or OverridenConsumerConfig()
    config, watchdog = check_watchdog(config)
    return HighLevelDeserializingConsumer(
        consumer=BytesConsumer(config, watchdog),
        schema_registry=ClouderaSRClient(KerberizableHTTPClient(config.sr), SimpleCache()),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        deserializer=FastAvroDeserializer(),
    )
```
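
Since the factory takes an optional config, per-job overrides stay one-liners; a sketch (the `group_id` value is a placeholder):

```python
# Default wiring:
consumer = MyAvroConsumer()

# Or hand in your own config instance:
consumer = MyAvroConsumer(OverridenConsumerConfig(group_id='replay-2024'))
```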

### Avro on-the-fly schema derivation

Wunderkafka supports `dataclasses` and `pydantic.BaseModel` for Avro serialization, powered by [dataclasses-avroschema](https://github.com/marcosschroh/dataclasses-avroschema) and some rough "metaprogramming":
```python
# dataclass to AVRO schema example
from dataclasses import dataclass
from dataclasses_avroschema import AvroModel

@dataclass
class SomeData(AvroModel):
    field1: int
    field2: str
```
for a topic `topic_name` will become
```json
{
    "type": "record",
    "name": "topic_name_value",
    "fields": [
        {
            "name": "field1",
            "type": "long"
        },
        {
            "name": "field2",
            "type": "string"
        }
    ]
}
```
and
```python
# pydantic.BaseModel to AVRO schema example

from typing import Optional
from pydantic import BaseModel 

class Event(BaseModel):
    id: Optional[int]
    ts: Optional[int] = None

    class Meta:
        namespace = "any.data"
```
for a topic `topic_name` will become
```json
{
    "type": "record",
    "name": "topic_name_value",
    "namespace": "any.data",
    "fields": [
        {
            "type": ["long", "null"],
            "name": "id"
        },
        {
            "type": ["null", "long"],
            "name": "ts",
            "default": null
        }
    ]
}
```
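
With the schema derived on the fly, producing model instances stays short. A hypothetical sketch: the `AvroModelProducer` entry point, its topic-to-model mapping argument, and `send_message` are assumptions drawn from the project's quickstart, and the config is left to defaults:

```python
from wunderkafka import AvroModelProducer, ProducerConfig

# Assumption: the producer derives the Avro schema from the model mapped to the topic.
producer = AvroModelProducer({'topic_name': SomeData}, ProducerConfig())
# Assumption: send_message() serializes the instance against the derived schema.
producer.send_message('topic_name', SomeData(field1=1, field2='one'))
```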

            
