qurix-kafka-utils

Name: qurix-kafka-utils
Version: 1.6.0
Home page: https://github.com/qurixtechnology/qurix-kafka-utils.git
Summary: qurix generic utils for kafka
Upload time: 2023-10-25 09:13:26
Author: qurix Technology
Requires Python: >=3.10, <4
Keywords: python
# KafkaUtils

# What is it?

qurix-kafka-utils is a Python package that provides a generic consumer class for reading messages of unknown structure from a Kafka topic. It also provides functions for retrieving important topic metrics such as offsets. Finally, it provides a tool for anonymizing dataframes.

# Main Features

Key features of the package include:

- Consuming messages via the Confluent Kafka client
- Setting specific offset options such as Earliest, Latest, Last and Explicit before consuming messages
- Logging: the package sets up logging using the Python `logging` module to facilitate monitoring and error handling during execution.
- Observer: the Kafka observer retrieves important topic metrics such as offsets
- Anonymization: dataframes can be anonymized before or after reading
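
Since the package uses the standard `logging` module, a consuming application can tune its output with the usual stdlib configuration. The logger name below is an assumption for illustration; the package does not document which logger name it uses:

```python
import logging

# Configure root logging once in the application; the package's
# loggers propagate to these handlers by default.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Hypothetical logger name -- adjust to whatever the package actually uses.
logging.getLogger("qurix.kafka.utils").setLevel(logging.DEBUG)
```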

# Requirements

`confluent-kafka`

You can install this dependency manually or use the `requirements.txt` file provided in the repository.


# Installation

## Create a new virtual environment
`python -m virtualenv .venv --python="python3.11"`

## Activate
`source .venv/bin/activate`

## Install
To install the `qurix-kafka-utils` package, you can use `pip`:

`pip install qurix-kafka-utils`

# Usage

## Generic Consumer

Import the `GenericConsumer` class from the package as shown in the example below.

## Example to use Consumer

```python
from qurix.kafka.utils.generic_consumer import GenericConsumer
from qurix.kafka.utils.offset_enum import Offset

conf = {
    'bootstrap.servers': 'localhost:9092',  # Kafka bootstrap server
    'group.id': 'my_consumer_group',  # Consumer group
    'auto.offset.reset': 'earliest'  # Offset setting for new consumers
}

consumer = GenericConsumer(topic="my_topic", consumer_config=conf)

# Consume messages from the topic
df = consumer.read_messages()

# Consume messages from a specific offset (e.g. "Earliest")
consumer.set_offset(partition=0, offset_option=Offset.EARLIEST)

# Consume messages from an explicit offset given via offset_value
consumer.set_offset(partition=0, offset_option=Offset.EXPLICIT, offset_value=20)
consumer.read_messages()

# Consume messages from a timestamp given via timestamp_dt
consumer.set_offset(partition=0, offset_option=Offset.TIMESTAMP, timestamp_dt=datetime_timestamp)

# Extend the dataframe with message headers
consumer.extend_df_with_header(df=your_df_from_read_messages)

# Extract values from a JSON column
consumer.extract_json(df, 'column_name')
```
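
The `extract_json` step above presumably expands a column of JSON strings into separate columns. A minimal stand-alone sketch of that idea with pandas follows; the helper name and behaviour here are assumptions for illustration, not the package's actual implementation:

```python
import json

import pandas as pd


def extract_json(df: pd.DataFrame, column: str) -> pd.DataFrame:
    """Parse JSON strings in `column` and join the resulting fields
    as new columns (a sketch of what such a helper might do)."""
    parsed = df[column].apply(json.loads).apply(pd.Series)
    return df.drop(columns=[column]).join(parsed)


df = pd.DataFrame({"value": ['{"id": 1, "name": "a"}', '{"id": 2, "name": "b"}']})
flat = extract_json(df, "value")
# flat now has the columns "id" and "name" instead of "value"
```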

## Kafka Observer

Import the `KafkaObserver` class from the package.

## Example to use Kafka Observer

```python
from qurix.kafka.utils.kafka_observer import KafkaObserver

conf = {
    'bootstrap.servers': 'localhost:9092',  # Kafka bootstrap server
}

observer = KafkaObserver(conf)

# List consumer groups
observer.get_consumer_groups()

# Get the offset of a consumer group
observer.get_consumer_group_offset(group_id="my_group_id")

# List consumer groups with their offsets
observer.get_consumer_groups_with_offsets()

# Get the offset status
observer.get_offset_status(consumer=my_consumer, topic="my_topic", auto_offset="earliest")

# Plot the offset status
observer.plot_offset_status(consumer=my_consumer)
```

## Anonymization

Import the `Anonymizer` class from the package.

`Anonymizer` takes an optional value argument that specifies the replacement content for object columns. It is `None` by default. The argument is a dictionary mapping a column index to the value used to replace that column's content. Valid replacement values are `gender`, `name` and `address`.

## Example to use Anonymizer

```python
import pandas as pd
from qurix.kafka.utils.anonymizer import Anonymizer

df = pd.read_csv("my_csv_file.csv")

# Without the value argument
anonymizer = Anonymizer(df)
df_anonymized = anonymizer.anonymize_dataframe()
df_anonymized.head()

# With the value argument
anonymizer = Anonymizer(df, {3: 'gender', 5: 'name'})
df_anonymized = anonymizer.anonymize_dataframe()
df_anonymized.head()
```
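
Independent of the package, the column-index-to-value mapping described above can be sketched as a plain pandas operation. The function and the placeholder strings below are illustrative assumptions, not the package's built-in replacement logic:

```python
import pandas as pd


def anonymize_by_index(df: pd.DataFrame, mapping: dict[int, str]) -> pd.DataFrame:
    """Replace every value in the columns given by positional index
    with a fixed placeholder string (a sketch, not the package API)."""
    out = df.copy()
    for col_idx, placeholder in mapping.items():
        out.iloc[:, col_idx] = placeholder
    return out


df = pd.DataFrame({"id": [1, 2], "name": ["Alice", "Bob"], "city": ["Berlin", "Kiel"]})
masked = anonymize_by_index(df, {1: "<name>", 2: "<address>"})
# The "name" and "city" columns are now uniform placeholders;
# "id" is left untouched.
```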

# Contact
For any inquiries or questions, feel free to reach out.

            
