# Flask And Kafka
Easily write your Kafka producers and consumers in Flask.

This plugin is built on confluent-kafka to help you use your producers and consumers alongside your Flask project as easily as possible. flask-and-kafka also logs all produced and consumed messages.
## Installation
Install it with the following command:
```
pip install flask-and-kafka
```
## Usage
Using consumer:
```
from flask import Flask
from flask_and_kafka import FlaskKafkaConsumer
from flask_and_kafka import close_kafka


app = Flask(__name__)
app.config['KAFKA_CONSUMER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}
app.config['KAFKA_CONSUMER_LOG_PATH'] = "logs/kafka_consumer.log"

kafka_consumer = FlaskKafkaConsumer(app)


@kafka_consumer.handle_message(topic='test-topic', group_id='group1')
def handle_logistic_message(msg):
    print(msg.value())


if __name__ == '__main__':
    kafka_consumer.start()
    close_kafka(consumer=kafka_consumer)
    app.run()
```
You can also write your consumers in separate modules and register them with the `register_consumers` method:
consumers/test_consumer.py
```
@kafka_consumer.handle_message(topic='test-topic', group_id='group1')
def handle_logistic_message(msg):
    print(msg.value())
```
app.py
```
from flask import Flask
from flask_and_kafka import FlaskKafkaConsumer
from flask_and_kafka import close_kafka


app = Flask(__name__)
app.config['KAFKA_CONSUMER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}
app.config['KAFKA_CONSUMER_LOG_PATH'] = "logs/kafka_consumer.log"

kafka_consumer = FlaskKafkaConsumer(app)
kafka_consumer.register_consumers(['consumers.test_consumer'])

if __name__ == '__main__':
    kafka_consumer.start()
    close_kafka(consumer=kafka_consumer)
    app.run()
```
<hr style="border:2px solid gray">
Using producer:
```
from flask import Flask
from flask_and_kafka import FlaskKafkaProducer
from flask_and_kafka import close_kafka


app = Flask(__name__)
app.config['KAFKA_PRODUCER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}
app.config['KAFKA_PRODUCER_LOG_PATH'] = "logs/kafka_producer.log"

kafka_producer = FlaskKafkaProducer(app)

kafka_producer.send_message(topic='test-topic', value="Hello, World!")
kafka_producer.send_message(topic='test-topic', value="Hello, World!")
kafka_producer.send_message(topic='test-topic', value="Hello, World!")
kafka_producer.send_message(topic='test-topic', value="Hello, World!")

if __name__ == '__main__':
    close_kafka(producer=kafka_producer)
    app.run()
```
**Note that you must use `close_kafka` to close the consumer and producer.**
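If an application uses both a consumer and a producer, you can call `close_kafka` once per object, mirroring the examples above (a minimal sketch; the config values are illustrative):
```
from flask import Flask
from flask_and_kafka import FlaskKafkaConsumer, FlaskKafkaProducer, close_kafka


app = Flask(__name__)
app.config['KAFKA_CONSUMER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}
app.config['KAFKA_PRODUCER_CONFIGS'] = {"bootstrap.servers": 'localhost:9092'}

kafka_consumer = FlaskKafkaConsumer(app)
kafka_producer = FlaskKafkaProducer(app)

if __name__ == '__main__':
    kafka_consumer.start()
    # One close_kafka call per Kafka object, as in the examples above.
    close_kafka(consumer=kafka_consumer)
    close_kafka(producer=kafka_producer)
    app.run()
```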
<hr style="border:2px solid gray">
#### FlaskKafkaConsumer - handle_message decorator:
A decorator that registers a message handler function for the given topic and group ID.
Args:
+ topic (str): The Kafka topic to subscribe to.
+ group_id (str): The Kafka consumer group ID to use.
+ num_consumers (int, optional): The number of Kafka consumer threads to spawn (default is 1).
+ app_context (bool, optional): Whether to run the message handler function inside a Flask application context (default is False).
+ `**kwargs`: Additional arguments to pass to the Kafka consumer constructor.
Returns: Callable: A decorator function that wraps the message handler function.
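For example, a handler that spawns two consumer threads and runs inside the Flask application context might look like the following sketch (the topic and group names are illustrative):
```
# Illustrative handler using the optional arguments documented above.
@kafka_consumer.handle_message(
    topic='orders-topic',      # example topic name
    group_id='orders-group',   # example consumer group
    num_consumers=2,           # spawn two consumer threads for this topic
    app_context=True,          # run the handler inside the Flask app context
)
def handle_order_message(msg):
    # msg is a confluent-kafka Message; value() returns the raw payload
    print(msg.value())
```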
#### FlaskKafkaProducer - send_message method:
Send a message to the specified Kafka topic with the given key and value.
Args:
+ topic (str): The Kafka topic to send the message to.
+ value (any): The message value to send.
+ key (str, optional): The message key to use (default: None).
+ flush (bool, optional): Whether to flush the producer's message buffer immediately after sending the message (default: False).
+ poll (bool, optional): Whether to wait for any outstanding messages to be sent before returning (default: True).
+ poll_timeout (float, optional): The maximum amount of time to wait for outstanding messages to be sent, in seconds (default: 1).
+ `**kwargs`: Additional keyword arguments to pass to the underlying Kafka producer.
Returns: None
Raises: KafkaError: If there is an error producing the message.
Note:
+ If `flush` is True, any outstanding messages in the producer's buffer will be sent immediately after the current message is sent.
+ If `poll` is True, the producer will wait for any outstanding messages to be sent before returning, up to the specified `poll_timeout`.
+ The `poll` argument is only relevant if `flush` is False, since the producer always waits for outstanding messages to be sent before flushing.
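As a rough illustration of these arguments (a sketch assuming the signature documented above; the topic, key, and payload are made up):
```
import json

# Keyed message, relying on the default poll behaviour (poll=True, poll_timeout=1).
kafka_producer.send_message(
    topic='test-topic',
    key='user-42',
    value=json.dumps({"event": "signup"}),
)

# Flush immediately so buffered messages are delivered before continuing.
kafka_producer.send_message(topic='test-topic', value="Hello, World!", flush=True)
```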