# Generic Consumer
## What is it?
Qurix Kafka Generic Consumer is a Python package that simplifies interaction with Kafka for data engineers and data scientists. It offers a versatile way to consume messages from Confluent Kafka within Python applications. At its core is the `GenericConsumer` class, which streamlines fetching messages from a specified Kafka topic. The package provides essential features such as offset management, allowing users to set positions within a topic, and seamless integration with the popular pandas library, making it easy to structure and manipulate Kafka messages as DataFrames.
## Main Features
Key features of the package include:
1. Flexible Kafka Consumer:
The GenericConsumer class provides a flexible and reusable Kafka consumer for fetching messages from a subscribed topic.
2. Offset Management:
Supports setting offsets for different partitions based on options like earliest, latest, last, or a specific timestamp.
3. Message Reading:
Fetches messages from the subscribed topic, allowing users to specify the number of messages to read.
4. Extended DataFrame:
Provides functionality to extend the DataFrame with header information, making it easier to analyze and work with the data.
5. JSON Data Processing:
Includes a method for processing JSON data within the messages, allowing users to extract and normalize the JSON values into a DataFrame.
6. Clean Resource Management:
Implements proper resource management by closing the Kafka consumer when done, ensuring efficient resource utilization.
7. Configurability:
Users can configure the Kafka consumer by providing a ConsumerConfig object, allowing customization of consumer settings.
8. Logging Support:
Includes logging support at different levels (e.g., INFO, DEBUG) to facilitate debugging and monitoring.
9. PyPI Package Ready:
Designed for easy packaging and distribution on PyPI, making it accessible for others to install and use in their projects.
10. Compatibility:
Compatible with popular Python libraries such as pandas and confluent-kafka.
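To make the DataFrame-related features above concrete, here is a minimal sketch of what "structuring messages into a DataFrame and extending it with header information" can look like in plain pandas. The message fields and column names below are illustrative assumptions, not the package's actual schema:

```python
import pandas as pd

# Hypothetical raw messages as a generic consumer might surface them;
# the field names here are assumptions, not the package's real output.
messages = [
    {"partition": 0, "offset": 12, "value": '{"id": 1}', "headers": {"source": "app-a"}},
    {"partition": 0, "offset": 13, "value": '{"id": 2}', "headers": {"source": "app-b"}},
]

df = pd.DataFrame(messages)

# "Extending with header information": promote each header key to its own column.
headers_df = pd.json_normalize(df.pop("headers").tolist())
df = pd.concat([df, headers_df], axis=1)

print(df.columns.tolist())  # ['partition', 'offset', 'value', 'source']
```

Once messages are in this tabular shape, the usual pandas tooling (filtering, grouping, joins) applies directly.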
## Usage Scenarios
1. Data Ingestion:
Suitable for scenarios where real-time or batch data needs to be ingested from Kafka topics into a DataFrame for analysis.
2. Streaming Data Processing:
Ideal for applications dealing with streaming data, enabling efficient processing and analysis of messages from Kafka.
3. Event-driven Applications:
Useful in event-driven architectures, where consuming messages from Kafka is a fundamental part of the application's workflow.
4. Data Exploration and Analysis:
Facilitates easy exploration and analysis of Kafka messages using the power of pandas DataFrames.
5. Customization:
Easily adaptable to specific project requirements, allowing users to extend or modify the class according to their needs.
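Scenario 4 leans on the JSON-processing feature. As a hedged sketch of what extracting and normalizing JSON values into a DataFrame involves (independent of the package's actual `extract_json` implementation), nested JSON strings in a column can be flattened into dotted columns:

```python
import json

import pandas as pd

# Hypothetical payloads: the 'value' column holds JSON strings.
df = pd.DataFrame({
    "value": [
        '{"user": {"id": 1}, "event": "click"}',
        '{"user": {"id": 2}, "event": "view"}',
    ]
})

# Parse each JSON string, then flatten nested fields into dotted columns.
parsed = pd.json_normalize(df["value"].map(json.loads).tolist())

print(parsed["user.id"].tolist())  # [1, 2]
```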
## Requirements
- `confluent-kafka`
- `openpyxl`
You can install these dependencies manually or use the provided `requirements.txt` file in the repository.
## Installation
1. Create a New Virtual Environment (named `venv` in this case):
```bash
python3 -m venv venv
```
2. Activate the Virtual Environment:
```bash
source venv/bin/activate
```
3. Install the Package:
To install the `qurix-kafka-generic-consumer` package, use `pip`:
```bash
pip install qurix-kafka-generic-consumer
```
## Example
### Generic Consumer
To use the `GenericConsumer` class, follow these steps:
```python
from qurix.kafka.consumer import GenericConsumer
from qurix.kafka.entities.config import ConsumerConfig, Offset

# Create a ConsumerConfig object with your Kafka settings
BOOTSTRAP_SERVERS = "localhost:9092"
consumer_config = ConsumerConfig(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    group_id="my_consumer_group",
    auto_offset_reset=Offset.EARLIEST.value,
    # ...
)

# Initialize a GenericConsumer
consumer = GenericConsumer(topic="my_topic", consumer_config=consumer_config)

# Set the offset option to consume messages from a specific point
consumer.set_offset(partition=0, offset_option=Offset.EARLIEST)

# Consume messages from the topic into a DataFrame
df = consumer.read_messages()

# Extend the DataFrame with header information
consumer.extend_df_with_header(df=df)

# Extract and normalize JSON values from a DataFrame column
consumer.extract_json(df, "column_name")
```
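Feature 6 above promises that the Kafka consumer is closed when done. Assuming the consumer object exposes a `close()` method (an API assumption, illustrated here with a stand-in class rather than the real `GenericConsumer`), `contextlib.closing` guarantees cleanup even if reading raises:

```python
from contextlib import closing

class DemoConsumer:
    """Stand-in for any consumer exposing close(); illustrative only."""
    def __init__(self):
        self.closed = False

    def read_messages(self):
        return []  # a real consumer would return the fetched messages

    def close(self):
        self.closed = True

with closing(DemoConsumer()) as consumer:
    consumer.read_messages()

print(consumer.closed)  # True
```

The same pattern applies to the real consumer: wrap it in `closing(...)` so the underlying Kafka connection is released deterministically.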
## Contact
For any inquiries or questions, feel free to [reach out](https://qurix.tech/about_us.html).