# Wunderkafka
>The power of librdkafka for ~~humans~~ pythons
Wunderkafka provides a handful of facades for a C-powered consumer/producer. It is built on top of [confluent-kafka](https://pypi.org/project/confluent-kafka/).
For a quick look at what is going on, please check the [Quickstart](https://wunderkafka.readthedocs.io/en/stable/pages/quickstart.html) and the [Documentation](https://wunderkafka.readthedocs.io/en/stable/).
The installation process is described [here](https://wunderkafka.readthedocs.io/en/stable/pages/install.html).
## Features
### Type-safe librdkafka config
Instead of passing a plain `dict` as the consumer/producer config, a [pydantic-powered](https://github.com/pydantic/pydantic) config is used.
It is extracted directly from librdkafka's [CONFIGURATION.md](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md) with some rough parsing.
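In practice, a typed config fails fast on invalid values instead of surfacing errors deep inside librdkafka. A minimal sketch (the broker address and group id are placeholders; the exact field set comes from the generated models):
```python
from wunderkafka import ConsumerConfig

# Field names mirror librdkafka options with dots replaced by underscores,
# e.g. `bootstrap.servers` -> `bootstrap_servers`.
config = ConsumerConfig(
    bootstrap_servers='localhost:9092',
    group_id='my-group',
)
# An invalid value (e.g. a non-boolean `enable_auto_commit`) raises a
# pydantic validation error here, at construction time.
```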
### Confluent & Cloudera Schema Registry support
Confluent is used as-is, but a Hortonworks/Cloudera schema registry client and its (de)serialization protocol are implemented as well (without support for the "admin" methods).
### Building Kit
Wunderkafka allows you to build your own transport for message (de)serialization relatively simply and to eliminate boilerplate for typical cases.
#### Pre-defined config models
```python
import os
from functools import partial

from pydantic import field_validator, Field
from wunderkafka.time import now
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol, AvroConsumer


# If you are a fan of the 12-factor app, you may want to configure via env variables
class OverridenSRConfig(SRConfig):
    url: str = Field(alias='SCHEMA_REGISTRY_URL')

    @field_validator('sasl_username')
    @classmethod
    def from_env(cls, v: str) -> str:
        # And to use 'native' kerberos envs
        return '{0}@{1}'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))


# Or you may want to override some defaults by default (pun intended)
class OverridenConfig(ConsumerConfig):
    # A consumer which does not commit messages automatically
    enable_auto_commit: bool = False
    # And knows nothing after a restart due to a new group id.
    group_id: str = 'wunderkafka-{0}'.format(now())
    # More 12 factors (note: `alias`, not the pydantic v1 `env` kwarg)
    bootstrap_servers: str = Field(alias='BOOTSTRAP_SERVER')
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_kerberos_kinit_cmd: str = ''
    sr: SRConfig = OverridenSRConfig()

    @field_validator('sasl_kerberos_kinit_cmd')
    @classmethod
    def format_keytab(cls, v: str) -> str:
        if not v:
            return 'kinit {0}@{1} -k -t {0}.keytab'.format(os.environ.get('KRB5_USER'), os.environ.get('KRB5_REALM'))
        # Still allowing to set it manually
        return str(v)


# After this, you can `partial` your own Producer/Consumer, something like...
MyConsumer = partial(AvroConsumer, config=OverridenConfig())
# OR
class MyConsumer(AvroConsumer):
    def __init__(self, config: ConsumerConfig = OverridenConfig()):
        super().__init__(config)
```
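Usage of such a consumer might then look like the sketch below. The `subscribe()`/`consume()` calls are assumptions modeled on the confluent-kafka-style API, not verified signatures; see the Quickstart for the real ones.
```python
# Hypothetical usage: the topic name is a placeholder and the method
# signatures are assumed from the underlying confluent-kafka-style API.
consumer = MyConsumer()
consumer.subscribe(['some_topic'])
while True:
    for message in consumer.consume():
        print(message.value())
```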
#### Building your own transport
```python
from typing import Optional

from pydantic import Field

from wunderkafka.config.generated import enums
from wunderkafka.consumers.bytes import BytesConsumer
from wunderkafka.schema_registry import ClouderaSRClient
from wunderkafka.hotfixes.watchdog import check_watchdog
from wunderkafka.serdes.headers import ConfluentClouderaHeadersHandler
from wunderkafka.consumers.constructor import HighLevelDeserializingConsumer
from wunderkafka.schema_registry.cache import SimpleCache
from wunderkafka.schema_registry.transport import KerberizableHTTPClient
from wunderkafka.serdes.avro.deserializers import FastAvroDeserializer
from wunderkafka import SRConfig, ConsumerConfig, SecurityProtocol


# Named to avoid shadowing the imported `SRConfig`
class OverridenSRConfig(SRConfig):
    url: str = Field(alias="SCHEMA_REGISTRY_URL")
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_mechanism: str = "SCRAM-SHA-512"
    sasl_username: str = Field(alias="SASL_USERNAME")
    sasl_password: str = Field(alias="SASL_PASSWORD")


class OverridenConsumerConfig(ConsumerConfig):
    enable_auto_commit: bool = False
    auto_offset_reset: enums.AutoOffsetReset = enums.AutoOffsetReset.earliest
    bootstrap_servers: str = Field(alias="BOOTSTRAP_SERVERS")
    security_protocol: SecurityProtocol = SecurityProtocol.sasl_ssl
    sasl_mechanism: str = "SCRAM-SHA-512"
    sasl_username: str = Field(alias="SASL_USERNAME")
    sasl_password: str = Field(alias="SASL_PASSWORD")
    sr: SRConfig = Field(default_factory=OverridenSRConfig)


# Pydantic/FastAPI style, but you can inherit from `HighLevelDeserializingConsumer` directly
def MyAvroConsumer(
    config: Optional[ConsumerConfig] = None,
) -> HighLevelDeserializingConsumer:
    config = config or OverridenConsumerConfig()
    config, watchdog = check_watchdog(config)
    return HighLevelDeserializingConsumer(
        consumer=BytesConsumer(config, watchdog),
        schema_registry=ClouderaSRClient(KerberizableHTTPClient(config.sr), SimpleCache()),
        headers_handler=ConfluentClouderaHeadersHandler().parse,
        deserializer=FastAvroDeserializer(),
    )
```
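Note the composition here: a raw `BytesConsumer` handles transport, the schema registry client fetches schemas, the headers handler parses the wire-format prefix, and the deserializer decodes the payload. Each piece can be swapped independently, e.g. a Confluent client in place of the Cloudera one, or a different deserializer.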
### Avro on-the-fly schema derivation
Supports `dataclasses` and `pydantic.BaseModel` for Avro serialization, powered by [dataclasses-avroschema](https://github.com/marcosschroh/dataclasses-avroschema) and some rough "metaprogramming":
```python
# dataclass to AVRO schema example
from dataclasses import dataclass

from dataclasses_avroschema import AvroModel


@dataclass
class SomeData(AvroModel):
    field1: int
    field2: str
```
For a topic `topic_name`, the derived schema will be:
```json
{
  "type": "record",
  "name": "topic_name_value",
  "fields": [
    {
      "name": "field1",
      "type": "long"
    },
    {
      "name": "field2",
      "type": "string"
    }
  ]
}
```
and
```python
# pydantic.BaseModel to AVRO schema example
from typing import Optional

from pydantic import BaseModel


class Event(BaseModel):
    id: Optional[int]
    ts: Optional[int] = None

    class Meta:
        namespace = "any.data"
```
For a topic `topic_name`, the derived schema will be:
```json
{
  "type": "record",
  "name": "topic_name_value",
  "namespace": "any.data",
  "fields": [
    {
      "type": ["long", "null"],
      "name": "id"
    },
    {
      "type": ["null", "long"],
      "name": "ts",
      "default": null
    }
  ]
}
```
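The derived schema is what gets registered and used when producing instances of such models. A hypothetical sketch of that flow; `AvroModelProducer` and `send_message` are assumed names for illustration, so consult the Quickstart for the actual producer API:
```python
# Hypothetical sketch: the producer class and method names are assumptions,
# illustrating how a model instance maps onto the derived schema.
from wunderkafka import AvroModelProducer  # assumed import

producer = AvroModelProducer(config=OverridenConfig())
# The schema is derived from the model's type hints and registered for
# the target topic before the message is serialized and sent.
producer.send_message('topic_name', SomeData(field1=1, field2='a'))
```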