dgkafka

Name: dgkafka
Version: 1.0.0
Home page: https://gitlab.com/gng-group/dgkafka.git
Summary: Kafka clients
Author: Malanris
Requires Python: >=3.10
License: MIT License
Keywords: kafka, client, confluent, avro, fastapi, logging
Upload time: 2025-07-15 03:03:07
# dgkafka

A Python package for working with Apache Kafka that supports multiple data formats.

## Installation

```bash
pip install dgkafka
```

For Avro support (requires additional dependencies):

```bash
pip install dgkafka[avro]
```

For JSON support (requires additional dependencies):

```bash
pip install dgkafka[json]
```

## Features

- Producers and consumers for different data formats:
  - Raw messages (bytes/strings)
  - JSON
  - Avro (with Schema Registry integration)
- Robust error handling
- Comprehensive operation logging
- Context manager support
- Flexible configuration

## Quick Start

### Basic Producer/Consumer

```python
from dgkafka import KafkaProducer, KafkaConsumer

# Producer
with KafkaProducer(bootstrap_servers='localhost:9092') as producer:
    producer.produce('test_topic', 'Hello, Kafka!')

# Consumer
with KafkaConsumer(bootstrap_servers='localhost:9092', group_id='test_group') as consumer:
    consumer.subscribe(['test_topic'])
    for msg in consumer.consume():
        print(msg.value())
```

### JSON Support

```python
from dgkafka import JsonKafkaProducer, JsonKafkaConsumer

# Producer
with JsonKafkaProducer(bootstrap_servers='localhost:9092') as producer:
    producer.produce('json_topic', {'key': 'value'})

# Consumer
with JsonKafkaConsumer(bootstrap_servers='localhost:9092', group_id='json_group') as consumer:
    consumer.subscribe(['json_topic'])
    for msg in consumer.consume():
        print(msg.value())  # Automatically deserialized JSON
```

### Avro Support

```python
from dgkafka import AvroKafkaProducer, AvroKafkaConsumer

# Producer
value_schema = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "age", "type": "int"}
    ]
}

with AvroKafkaProducer(
    schema_registry_url='http://localhost:8081',
    bootstrap_servers='localhost:9092',
    default_value_schema=value_schema
) as producer:
    producer.produce('avro_topic', {'name': 'Alice', 'age': 30})

# Consumer
with AvroKafkaConsumer(
    schema_registry_url='http://localhost:8081',
    bootstrap_servers='localhost:9092',
    group_id='avro_group'
) as consumer:
    consumer.subscribe(['avro_topic'])
    for msg in consumer.consume():
        print(msg.value())  # Automatically deserialized Avro object
```

## Classes

### Base Classes

- `KafkaProducer` - base message producer
- `KafkaConsumer` - base message consumer

### Specialized Classes

- `JsonKafkaProducer` - JSON message producer (inherits from `KafkaProducer`)
- `JsonKafkaConsumer` - JSON message consumer (inherits from `KafkaConsumer`)
- `AvroKafkaProducer` - Avro message producer (inherits from `KafkaProducer`)
- `AvroKafkaConsumer` - Avro message consumer (inherits from `KafkaConsumer`)

## Configuration

All classes accept standard Kafka configuration parameters:

```python
config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'auto.offset.reset': 'earliest'
}
```

Avro classes require an additional parameter:
- `schema_registry_url` - Schema Registry URL
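
A usage sketch, assuming the underscore-keyword convention from the Quick Start (`bootstrap_servers` for `bootstrap.servers`, `group_id` for `group.id`) extends to other librdkafka options; this mapping is an assumption, not a documented guarantee:

```python
from dgkafka import KafkaConsumer

# Assumes keyword arguments map onto the dotted keys shown above,
# following the bootstrap_servers/group_id convention from Quick Start.
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    group_id='my_group',
    auto_offset_reset='earliest',
)
```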

## Logging

All classes use `dglog.Logger` for logging. You can provide a custom logger:

```python
from dglog import Logger

logger = Logger()
producer = KafkaProducer(logger_=logger, ...)
```

## Best Practices

1. Always use context managers (`with`) for proper resource cleanup
2. Implement error handling and retry logic for production use
3. Pre-register Avro schemas in Schema Registry
4. Configure appropriate `acks` and `retries` parameters for producers (see the sketch after this list)
5. Monitor consumer lag and producer throughput
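
A minimal sketch of points 2 and 4, written against the underlying confluent-kafka client because dgkafka's handling of these options is not documented here; `acks`, `retries`, and `enable.idempotence` are standard librdkafka properties:

```python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',               # wait for all in-sync replicas (point 4)
    'retries': 5,                # retry transient send failures (points 2 and 4)
    'enable.idempotence': True,  # retries cannot introduce duplicates
})
producer.produce('events', b'payload')
producer.flush()  # block until the broker has acknowledged everything
```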

## Advanced Usage

### Custom Serialization

```python
from dgkafka import AvroKafkaProducer

# Custom Avro serializer: override the value-serialization hook
class CustomAvroProducer(AvroKafkaProducer):
    def _serialize_value(self, value):
        # Apply custom serialization logic here, then defer to the base class
        return super()._serialize_value(value)
```

### Message Headers

```python
# Adding headers to messages; header values may be str, bytes, or None
headers = {
    'correlation_id': '12345',
    'message_type': 'user_update'
}

producer.produce(
    topic='events',
    value=message_data,  # message_data: whatever payload this producer expects
    headers=headers
)
```

### Error Handling

```python
from confluent_kafka import KafkaException

try:
    with AvroKafkaProducer(...) as producer:
        producer.produce(...)
except KafkaException as e:
    print(f"Kafka error occurred: {e}")
```
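
Because `produce()` is asynchronous in confluent-kafka, many failures surface through a delivery callback rather than as exceptions. The sketch below uses the underlying client directly; whether dgkafka's own `produce()` accepts a callback is an assumption:

```python
from confluent_kafka import Producer

def on_delivery(err, msg):
    # Served from poll()/flush(); err is None once the broker has
    # acknowledged the message.
    if err is not None:
        print(f"Delivery failed for {msg.topic()}: {err}")

producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('events', b'payload', callback=on_delivery)
producer.flush()  # serve callbacks and wait for outstanding messages
```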

## Performance Tips

1. Batch messages when possible (`batch.num.messages` config)
2. Adjust `linger.ms` for better batching
3. Use `compression.type` (lz4, snappy, or gzip)
4. Tune `fetch.max.bytes` and `max.partition.fetch.bytes` for consumers (see the combined sketch below)
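
The sketch below collects these tips as standard librdkafka properties; the values are illustrative starting points, not tuned recommendations:

```python
producer_config = {
    'bootstrap.servers': 'localhost:9092',
    'batch.num.messages': 10000,  # messages per batch (tip 1)
    'linger.ms': 50,              # wait briefly to fill batches (tip 2)
    'compression.type': 'lz4',    # or 'snappy' / 'gzip' (tip 3)
}

consumer_config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my_group',
    'fetch.max.bytes': 52428800,           # 50 MiB per fetch (tip 4)
    'max.partition.fetch.bytes': 1048576,  # 1 MiB per partition (tip 4)
}
```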

## License

MIT

            
