A wrapper around a Kafka producer and consumer that can be used as a decorator for a function, so the function keeps consuming data, processes it, and broadcasts the result to the next topics/queues.
This package uses the [confluent-kafka](https://pypi.org/project/confluent-kafka/) Python package to create the producer and consumer, and then wraps them. So, big thanks to them!
## Installation
```
$ pip install kafka-client-decorator
```
## Usage
Define a function that processes the data however you want, then decorate it.
```
from kafka_client_decorator.kafka_client import KafkaClient

@KafkaClient(bootstrap_servers, security_protocol, sasl_username, sasl_password).consumer_producer(
    consumer_from_topic='my-topic-1', group_id='pdf', produce_to_topic=['my-topic-2']
)
def process_data(data=None):
    # Call your driver modules here to process the data
    result = Driver(data)
    return result
```
> **_NOTE:_** If you want your driver's result to be pushed to the next topic/queue, simply pass `produce_to_topic` as an argument to the decorator's `consumer_producer` method.
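For example, a consume-only worker might omit `produce_to_topic` entirely. A minimal sketch, assuming the argument is optional (the broker address and credentials below are placeholders, not real values):
```
from kafka_client_decorator.kafka_client import KafkaClient

# Placeholder connection settings -- substitute your own broker details.
@KafkaClient('localhost:9092', 'SASL_SSL', 'my-user', 'my-password').consumer_producer(
    consumer_from_topic='my-topic-1', group_id='pdf'
)
def inspect_data(data=None):
    # No produce_to_topic given, so the result is not forwarded anywhere.
    print(f'Received payload: {data}')
```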
To only produce to topic(s):
```
from kafka_client_decorator.client_producer import ClientProducer
producer = ClientProducer(bootstrap_servers, security_protocol, sasl_username, sasl_password)
producer.produce_to_broker(data, topics_list)
```
> **_NOTE:_** If your Kafka broker does not use the SASL or SSL protocol, there is no need to pass `sasl_username` and `sasl_password`.
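For instance, against a local unauthenticated broker the setup could look like this (a minimal sketch; the `PLAINTEXT` protocol value, broker address, and payload are assumptions, not taken from the package docs):
```
from kafka_client_decorator.client_producer import ClientProducer

# No SASL credentials needed for an unauthenticated local broker.
producer = ClientProducer('localhost:9092', 'PLAINTEXT')
producer.produce_to_broker({'doc_id': 42, 'status': 'done'}, ['my-topic-2'])
```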
> **_NOTE:_** If you want to work with multiple partitions in Kafka, you can use the method below to produce (it sends a custom unique key with each message):
```
producer.produce_to_broker_with_key(data, topics_list)
```
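For example (a hypothetical sketch; the broker settings and payloads are illustrative), producing several messages this way lets Kafka spread them across the topic's partitions, since each message carries its own unique key:
```
from kafka_client_decorator.client_producer import ClientProducer

producer = ClientProducer('localhost:9092', 'PLAINTEXT')

for page in range(3):
    # Each call sends a unique key with the message, so messages are
    # distributed across the topic's partitions instead of piling up in one.
    producer.produce_to_broker_with_key({'page': page}, ['my-topic-2'])
```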