# KafkaUtils
# What is it?
`qurix-kafka-utils` is a Python package that provides a class for reading messages of unknown structure from a Kafka topic. It also provides functions for retrieving important topic metrics, such as offsets. Finally, it provides a tool for anonymizing DataFrames.
# Main Features
Key features of the package include:
- Consuming: consume messages using the Confluent Kafka platform (`confluent-kafka`).
- Offset options: start consuming from a specific offset using the Earliest, Latest, Last, Explicit, or Timestamp option.
- Logging: the package sets up logging with Python's `logging` module to support monitoring and error handling during execution.
- Observer: the Kafka observer retrieves important topic metrics such as offsets.
- Anonymization: DataFrames can be anonymized before or after reading.
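Because the package logs through Python's standard `logging` module, its messages can usually be surfaced or silenced from your own application. The following is a minimal sketch, assuming the package's loggers propagate to the root logger (the logger names it uses are not documented here). Note that `force=True` replaces any handlers already attached to the root logger.

```python
import logging


def configure_logging(level: int = logging.INFO) -> logging.Logger:
    """Send records from the root logger, and from library loggers that
    propagate to it, to stderr at the given level."""
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)s %(levelname)s: %(message)s",
        force=True,  # replace handlers that may already be installed
    )
    return logging.getLogger()


# Show debug output while investigating consumer behaviour
root = configure_logging(logging.DEBUG)
root.debug("logging configured")
```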
# Requirements
- Python 3.10 or later
- `confluent-kafka`

You can install the Python dependencies manually or use the `requirements.txt` file provided in the repository.
# Installation
## Create a new virtual environment
`python -m virtualenv .venv --python="python3.11"`
## Activate
`source .venv/bin/activate`
## Install
To install the `qurix-kafka-utils` package, you can use `pip`:
`pip install qurix-kafka-utils`
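To confirm that the installation worked, you can query the installed distributions with the standard library. This is a minimal sketch; the Python 3.10 threshold comes from the package metadata, and the helper name `check_installation` is hypothetical.

```python
import sys
from importlib import metadata


def check_installation(package: str = "qurix-kafka-utils") -> dict:
    """Report whether the Python version is supported and which version of
    the package and of confluent-kafka is installed (None if missing)."""
    report = {"python_ok": sys.version_info >= (3, 10)}
    for name in (package, "confluent-kafka"):
        try:
            report[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            report[name] = None
    return report


print(check_installation())
```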
# Usage
## Generic Consumer
Import the `GenericConsumer` class from `qurix.kafka.utils.generic_consumer`, as shown in the following example.
## Example: using the consumer
```python
from datetime import datetime

from qurix.kafka.utils.generic_consumer import GenericConsumer
from qurix.kafka.utils.offset_enum import Offset

conf = {
    'bootstrap.servers': 'localhost:9092',  # Kafka bootstrap server
    'group.id': 'my_consumer_group',  # Consumer group
    'auto.offset.reset': 'earliest'  # Offset setting for new consumers
}

consumer = GenericConsumer(topic="my_topic", consumer_config=conf)

# Consume messages from the topic
df = consumer.read_messages()

# Consume messages from a specific offset option (e.g. Earliest)
consumer.set_offset(partition=0, offset_option=Offset.EARLIEST)
df = consumer.read_messages()

# Consume messages from an explicit offset, passed via offset_value
consumer.set_offset(partition=0, offset_option=Offset.EXPLICIT, offset_value=20)
df = consumer.read_messages()

# Consume messages from a point in time, passed via timestamp_dt
datetime_timestamp = datetime(2023, 10, 1, 12, 0)  # example point in time
consumer.set_offset(partition=0, offset_option=Offset.TIMESTAMP, timestamp_dt=datetime_timestamp)
df = consumer.read_messages()

# Extend the DataFrame returned by read_messages() with the message headers
consumer.extend_df_with_header(df=df)

# Extract JSON values from a column
consumer.extract_json(df, 'column_name')
```
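The offset options differ in where reading starts within a partition. The sketch below illustrates one plausible mapping from option to start offset, using the partition's low and high watermarks (in confluent-kafka these come from `Consumer.get_watermark_offsets`). It is not the package's implementation: `OffsetOption` and `resolve_start_offset` are hypothetical, and both the meaning of LAST (re-read the most recent message) and the clamping of explicit offsets are assumptions. The Timestamp option is left out because its offset must be looked up on the broker (confluent-kafka exposes this as `Consumer.offsets_for_times`).

```python
from enum import Enum
from typing import Optional


class OffsetOption(Enum):
    """Hypothetical stand-in for the package's Offset enum."""
    EARLIEST = "earliest"
    LATEST = "latest"
    LAST = "last"
    EXPLICIT = "explicit"


def resolve_start_offset(option: OffsetOption, low: int, high: int,
                         offset_value: Optional[int] = None) -> int:
    """Return the offset a consumer would start reading at.

    low is the first offset still stored on the partition; high is the
    offset the next produced message will receive.
    """
    if option is OffsetOption.EARLIEST:
        return low
    if option is OffsetOption.LATEST:
        return high  # only messages produced from now on
    if option is OffsetOption.LAST:
        return max(low, high - 1)  # assumption: the most recent message
    if option is OffsetOption.EXPLICIT:
        if offset_value is None:
            raise ValueError("EXPLICIT requires offset_value")
        # Assumption: clamp to offsets that still exist on the partition
        return min(max(offset_value, low), high)
    raise ValueError(f"unsupported option: {option}")


print(resolve_start_offset(OffsetOption.EXPLICIT, low=5, high=42, offset_value=20))  # 20
```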
## Kafka Observer
Import the `KafkaObserver` class from `qurix.kafka.utils.kafka_observer`.
## Example: using the Kafka Observer
```python
from qurix.kafka.utils.kafka_observer import KafkaObserver

conf = {
    'bootstrap.servers': 'localhost:9092',  # Kafka bootstrap server
}

observer = KafkaObserver(conf)

# Get the consumer groups
observer.get_consumer_groups()
# Get the offsets of a consumer group
observer.get_consumer_group_offset(group_id="my_group_id")
# Get all consumer groups together with their offsets
observer.get_consumer_groups_with_offsets()
# Get the offset status (my_consumer is a placeholder for a consumer created earlier)
observer.get_offset_status(consumer=my_consumer, topic="my_topic", auto_offset="earliest")
# Plot the offset status
observer.plot_offset_status(consumer=my_consumer)
```
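A common way to read offset metrics is consumer lag: per partition, the high watermark minus the committed offset of the group. The sketch below computes it from plain integers standing in for what confluent-kafka reports via `Consumer.get_watermark_offsets` and `Consumer.committed`. Whether `get_offset_status` computes exactly this is an assumption, and `partition_lag` is a hypothetical helper; treating a missing commit as starting from the low watermark mirrors `auto.offset.reset = "earliest"`.

```python
from typing import Dict, Optional, Tuple


def partition_lag(watermarks: Dict[int, Tuple[int, int]],
                  committed: Dict[int, Optional[int]]) -> Dict[int, int]:
    """Compute per-partition lag.

    watermarks maps partition -> (low, high); committed maps partition ->
    committed offset, or None / a negative sentinel if nothing was committed.
    """
    lag = {}
    for partition, (low, high) in watermarks.items():
        offset = committed.get(partition)
        # No valid commit: assume the group would start at the low watermark
        start = low if offset is None or offset < 0 else offset
        lag[partition] = max(high - start, 0)
    return lag


print(partition_lag({0: (0, 120), 1: (10, 50)}, {0: 100, 1: None}))
# {0: 20, 1: 40}
```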
## Anonymization
Import the `Anonymizer` class from the package.
`Anonymizer` takes an optional value argument that specifies the content of object columns; it defaults to `None`. The argument is a dictionary that maps a column index to the kind of value that replaces that column's content. Valid kinds are `gender`, `name`, and `address`.
## Example: using the Anonymizer
```python
import pandas as pd
from qurix.kafka.utils.anonymizer import Anonymizer

df = pd.read_csv("my_csv_file.csv")

# Without the value argument
anonymizer = Anonymizer(df)
df_anonymized = anonymizer.anonymize_dataframe()
print(df_anonymized.head())

# With the value argument: column 3 is replaced with genders, column 5 with names
anonymizer = Anonymizer(df, {3: 'gender', 5: 'name'})
df_anonymized = anonymizer.anonymize_dataframe()
print(df_anonymized.head())
```
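The dictionary form of the value argument can be pictured as "for every row, overwrite column index i with a fake value of kind k". The sketch below shows that idea on plain lists instead of a DataFrame. It is not the package's implementation: `anonymize_rows` and the `REPLACEMENTS` pools are hypothetical, and the case `value=None` (which the package handles on its own) is simply passed through unchanged here.

```python
import random
from typing import Dict, List, Optional

# Hypothetical replacement pools; the package's actual fake values are not documented.
REPLACEMENTS = {
    "gender": ["female", "male", "diverse"],
    "name": ["Alex Doe", "Sam Roe", "Kim Poe"],
    "address": ["1 Main St", "2 High St", "3 Park Ave"],
}


def anonymize_rows(rows: List[list], value: Optional[Dict[int, str]] = None,
                   seed: int = 0) -> List[list]:
    """Return copies of rows in which each column index listed in `value`
    is replaced by a random entry of the requested kind."""
    value = value or {}  # sketch only: None leaves the rows unchanged
    for kind in value.values():
        if kind not in REPLACEMENTS:
            raise ValueError(f"invalid value {kind!r}; expected one of {sorted(REPLACEMENTS)}")
    rng = random.Random(seed)  # seeded so runs are reproducible
    result = []
    for row in rows:
        new_row = list(row)  # never modify the caller's data
        for index, kind in value.items():
            new_row[index] = rng.choice(REPLACEMENTS[kind])
        result.append(new_row)
    return result


rows = [["id1", "F", "Jane Smith"], ["id2", "M", "John Doe"]]
print(anonymize_rows(rows, {1: "gender", 2: "name"}))
```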
# Contact
For any inquiries or questions, feel free to reach out. The project homepage is https://github.com/qurixtechnology/qurix-kafka-utils.git.
Raw data
{
"_id": null,
"home_page": "https://github.com/qurixtechnology/qurix-kafka-utils.git",
"name": "qurix-kafka-utils",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.10, <4",
"maintainer_email": "",
"keywords": "python",
"author": "qurix Technology",
"author_email": "",
"download_url": "https://files.pythonhosted.org/packages/3f/bc/b3480f5a5d25eac4cb4b5bb207e5af94cc6c716b7a5ee907510522183c19/qurix-kafka-utils-1.6.0.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "",
"summary": "qurix generic utils for kafka",
"version": "1.6.0",
"project_urls": {
"Homepage": "https://github.com/qurixtechnology/qurix-kafka-utils.git"
},
"split_keywords": [
"python"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "f2d41eff832f2d47c834740871c0e72cc583b8082b7e0cd317827b291872af52",
"md5": "c0c6bca7449bf6834cca14c210534895",
"sha256": "f50ebf5175286efa0be4cd924095e81fba631acb32db05f1640d0e7844a93b66"
},
"downloads": -1,
"filename": "qurix_kafka_utils-1.6.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "c0c6bca7449bf6834cca14c210534895",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.10, <4",
"size": 12119,
"upload_time": "2023-10-25T09:13:24",
"upload_time_iso_8601": "2023-10-25T09:13:24.823643Z",
"url": "https://files.pythonhosted.org/packages/f2/d4/1eff832f2d47c834740871c0e72cc583b8082b7e0cd317827b291872af52/qurix_kafka_utils-1.6.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "3fbcb3480f5a5d25eac4cb4b5bb207e5af94cc6c716b7a5ee907510522183c19",
"md5": "c7b4ba8bd694219ddf9143aedfde4652",
"sha256": "b80ed4b538d77aa0b56a2d101f62be8c1127887a4b23bafe59f4c254c13bc57f"
},
"downloads": -1,
"filename": "qurix-kafka-utils-1.6.0.tar.gz",
"has_sig": false,
"md5_digest": "c7b4ba8bd694219ddf9143aedfde4652",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.10, <4",
"size": 11210,
"upload_time": "2023-10-25T09:13:26",
"upload_time_iso_8601": "2023-10-25T09:13:26.488068Z",
"url": "https://files.pythonhosted.org/packages/3f/bc/b3480f5a5d25eac4cb4b5bb207e5af94cc6c716b7a5ee907510522183c19/qurix-kafka-utils-1.6.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-10-25 09:13:26",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "qurixtechnology",
"github_project": "qurix-kafka-utils",
"github_not_found": true,
"lcname": "qurix-kafka-utils"
}