m-kafka-sdk-v2


Name: m-kafka-sdk-v2
Version: 1.0.1
Home page: https://github.com/mobiovn
Summary: Mobio Kafka SDK
Upload time: 2024-01-11 10:08:09
Author: MOBIO
Requires Python: >=3
License: MIT
Keywords: mobio, kafka, m-kafka
Requirements: No requirements were recorded.

- **JB's Consumer library. It runs the consumer in a Process, which suits K8s environments.**

- **NOTE: the SDK uses confluent-kafka but relies on the host module's own requirements to provide it. The SDK does not declare confluent-kafka in install_requires.**
* This release requires confluent-kafka >= 1.7.0 and requests >= 2.25.1.
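
Because confluent-kafka is left out of install_requires, a service may want to confirm at start-up that suitable versions are installed. The following is a minimal sketch and not part of the SDK: `parse_version` and `missing_requirements` are hypothetical helper names, and only the two minimum versions come from the note above.

```python
from importlib.metadata import PackageNotFoundError, version

# Minimum versions taken from the note above.
REQUIRED = {"confluent-kafka": (1, 7, 0), "requests": (2, 25, 1)}


def parse_version(text):
    """Turn '2.25.1' into (2, 25, 1), ignoring suffixes such as 'rc1'."""
    parts = []
    for piece in text.split("."):
        digits = ""
        for ch in piece:
            if not ch.isdigit():
                break
            digits += ch
        if not digits:
            break
        parts.append(int(digits))
    while len(parts) < 3:  # pad '1.7' to (1, 7, 0) so comparisons line up
        parts.append(0)
    return tuple(parts)


def missing_requirements(required=REQUIRED):
    """Return human-readable problems; an empty list means nothing is missing."""
    problems = []
    for name, minimum in required.items():
        try:
            installed = version(name)
        except PackageNotFoundError:
            problems.append(f"{name} is not installed")
            continue
        if parse_version(installed) < minimum:
            wanted = ".".join(map(str, minimum))
            problems.append(f"{name} {installed} is older than {wanted}")
    return problems
```

Calling `missing_requirements()` before creating a producer or consumer gives a readable error list instead of an ImportError deep inside the SDK.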
# Create topic
Note:
- EnsureKafkaTopic.REPLICATION_ASSIGNMENT holds the broker IDs isolated for each module. If DevOps has not set this value, it falls back to the value of *DEFAULT_BROKER_ID_ASSIGN*.
```python
from mobio.libs.kafka_lib.helpers.ensure_kafka_topic import EnsureKafkaTopic
import os
from mobio.libs.kafka_lib import MobioEnvironment

EnsureKafkaTopic().create_kafka_topics(
        [
            # TEST WITH SET replica_assignment
            {
                EnsureKafkaTopic.TOPIC_NAME: "giang-test1",
                EnsureKafkaTopic.NUM_PARTITIONS: 8,
                EnsureKafkaTopic.CONFIG: {"compression.type": "snappy"},
                EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                    MobioEnvironment.SALE_BROKER_ID_ASSIGN  # SALE_BROKER_ID_ASSIGN
                ),  # comma-separated list of broker ids, e.g. "10,20,30"
            },
            # TEST WITH SET replica_factor
            {
                EnsureKafkaTopic.TOPIC_NAME: "giang-test2",
                EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                    MobioEnvironment.PROFILING_BROKER_ID_ASSIGN
                )
            },
            # TEST WITH SET config
            {
                EnsureKafkaTopic.TOPIC_NAME: "giang-test3",
                EnsureKafkaTopic.NUM_PARTITIONS: 1,
                EnsureKafkaTopic.CONFIG: {"compression.type": "snappy"},
                EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                    MobioEnvironment.JB_BROKER_ID_ASSIGN
                )
            },
            # TEST WITHOUT manual config
            {
                EnsureKafkaTopic.TOPIC_NAME: "giang-test4",
                EnsureKafkaTopic.NUM_PARTITIONS: 1,
            },
        ]
    )
```
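
The fallback described in the note can be sketched as follows. This is an illustration only: the README does not show how the SDK resolves the value internally. The helper name `resolve_broker_ids` is hypothetical, and the sketch assumes each environment variable holds a comma-separated string of broker ids such as "10,20,30".

```python
import os

# Name of the fallback variable mentioned in the note above.
DEFAULT_KEY = "DEFAULT_BROKER_ID_ASSIGN"


def resolve_broker_ids(module_key, env=None):
    """Return the broker ids for module_key, falling back to DEFAULT_KEY.

    `env` defaults to os.environ; passing a plain dict makes testing easier.
    """
    env = os.environ if env is None else env
    raw = env.get(module_key) or env.get(DEFAULT_KEY) or ""
    # Skip empty fragments so "10,,20" or a trailing comma does not crash.
    return [int(part) for part in raw.split(",") if part.strip()]
```

For example, `resolve_broker_ids("SALE_BROKER_ID_ASSIGN", {"SALE_BROKER_ID_ASSIGN": "10,20,30"})` yields the three ids as integers, while an environment that only defines the default variable yields the default ids.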


# Producer
```python
from mobio.libs.kafka_lib.helpers.kafka_producer_manager import KafkaProducerManager
KafkaProducerManager().flush_message(topic="test", key="uuid", value={"test":1})
```

# Normal consumer
```python
import os
from pymongo import MongoClient
from mobio.libs.kafka_lib.helpers.kafka_consumer_manager import BaseKafkaConsumer


class TestConsumer(BaseKafkaConsumer):
    def message_handle(self, data):
        print("TestConsumer: data: {}".format(data))


if __name__ == "__main__":
    url_connection = os.getenv('TEST_MONGO_URI')
    client_mongo = MongoClient(url_connection, connect=False)

    TestConsumer(topic_name="test", group_id="test", client_mongo=client_mongo, retryable=True, lst_subscribe_topic=['test', 'test1'], retry_topic='test1')
```

# Consumer with bloom filter
```python
import os
from pymongo import MongoClient
from mobio.libs.kafka_lib.helpers.kafka_consumer_manager import BaseKafkaConsumer


class TestConsumer(BaseKafkaConsumer):
    def check_msg_is_processed(self, payload: dict) -> bool:
        # Confirm against MongoDB whether this message id was already handled.
        msg_id = payload.get("msg_id")
        exists = self.client_mongo.get_database("test_db")["test_collection"].find_one({"id": msg_id})
        return exists is not None

    def message_handle(self, data):
        print("TestConsumer: data: {}".format(data))


if __name__ == "__main__":
    url_connection = os.getenv('TEST_MONGO_URI')
    client_mongo = MongoClient(url_connection, connect=False)

    TestConsumer(topic_name="test", group_id="test", client_mongo=client_mongo, retryable=True, lst_subscribe_topic=['test', 'test1'], retry_topic='test1', enable_bloom=True, auto_commit=True)
```
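
A Bloom filter never misses an item it has seen, but it can report an unseen item as present. That is presumably why the consumer exposes `check_msg_is_processed` as a second, authoritative check. The sketch below illustrates the idea in memory. It is not the SDK's implementation, which per the change log relies on redis-bloom. `TinyBloom` and `consume` are hypothetical names, a Python set stands in for the MongoDB collection, and the order of the two checks is an assumption.

```python
import hashlib


class TinyBloom:
    """In-memory Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits)  # one byte per bit keeps the sketch simple

    def _positions(self, item):
        # Derive num_hashes positions by salting SHA-256 with a seed.
        for seed in range(self.num_hashes):
            digest = hashlib.sha256(f"{seed}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, item):
        for pos in self._positions(item):
            self.bits[pos] = 1

    def might_contain(self, item):
        return all(self.bits[pos] for pos in self._positions(item))


def consume(messages, bloom, processed_ids, handle):
    """Skip a message only when the filter and the authoritative store agree."""
    for payload in messages:
        msg_id = payload["msg_id"]
        if bloom.might_contain(msg_id) and msg_id in processed_ids:
            continue  # confirmed duplicate
        handle(payload)
        processed_ids.add(msg_id)
        bloom.add(msg_id)
```

The cheap filter lookup runs for every message, and the expensive store lookup runs only when the filter answers "maybe", so a false positive costs one extra query rather than a dropped message.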

# Change log
* 1.0.1 (2024-01-05):
  * Accept a redis-client in the consumer's constructor parameters; if none is passed, a redis-client is created from the default URI.
* 1.0.0 (2023-12-13):
  * Allow configuring auto/manual commit
  * Use a bloom filter to detect duplicate messages while consuming
  * Require the same redis-bloom to be used when bloom is enabled
* 0.1.8 (2023-08-25):
  * add option lst_subscribe_topic
    * If lst_subscribe_topic is passed, topic_name is ignored
    * If lst_subscribe_topic is not passed, topic_name is used as the subscribed topic
  * add option retry_topic
    * If not specified, the first topic of the subscribe list is used
    * If no subscribe list is passed, topic_name is used as the retry topic
* 0.1.7 (2023-04-18):
  * allow passing the broker when initializing a consumer or producer. If it is not passed, the default from ENV KAFKA_BROKER is used
  * singleton producer with `*args` and `**kwargs`
  * remove requirements "m-singleton>=0.3", "m-caching>=0.1.8"
* 0.1.6 (2023-04-05):
  * k8s liveness v2
* 0.1.5 (2022-10-12):
  * raise an exception when Kafka is closed, so that k8s restarts the pod
* 0.1.4.2 (2022-09-19):
  * fix a bug where newly created topics were randomly placed on brokers isolated for other modules.
* 0.1.4.1 (2022-09-19):
  * allow passing the brokers on which the topic will be replicated
* 0.1.4 (2022-08-23):
  * Save the pod-name to client-id mapping to a file
* 0.1.3:
  * Compress messages by default on the producer side
  * The create-kafka-topic function supports passing the number of partitions and the topic settings

* 0.1.1: fix bug init Config
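
The topic rules from the 0.1.8 entry can be written down as a small function. This is a sketch of the documented behaviour, not the SDK's source; `resolve_topics` is a hypothetical name, while the parameter names match the consumer arguments used in the examples.

```python
def resolve_topics(topic_name, lst_subscribe_topic=None, retry_topic=None):
    """Return (topics_to_subscribe, retry_topic) following the 0.1.8 notes."""
    # A subscribe list, when given, takes precedence over topic_name.
    topics = list(lst_subscribe_topic) if lst_subscribe_topic else [topic_name]
    # Without an explicit retry topic, use the first subscribed topic,
    # which is topic_name itself when no subscribe list was passed.
    return topics, retry_topic or topics[0]
```

With the arguments from the consumer examples, `resolve_topics("test", ["test", "test1"], "test1")` subscribes to both topics and retries on "test1".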

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/mobiovn",
    "name": "m-kafka-sdk-v2",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3",
    "maintainer_email": "",
    "keywords": "mobio,kafka,m-kafka",
    "author": "MOBIO",
    "author_email": "contact@mobio.vn",
    "download_url": "https://files.pythonhosted.org/packages/69/04/06ff7d4740b080cb2875e9b9e8a2aa058620478de1f648f2bc5365926b1a/m-kafka-sdk-v2-1.0.1.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "Mobio Kafka SDK",
    "version": "1.0.1",
    "project_urls": {
        "Homepage": "https://github.com/mobiovn",
        "Source": "https://github.com/mobiovn"
    },
    "split_keywords": [
        "mobio",
        "kafka",
        "m-kafka"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "690406ff7d4740b080cb2875e9b9e8a2aa058620478de1f648f2bc5365926b1a",
                "md5": "206dcd9d985d607407ea2062216f4ed0",
                "sha256": "4acfe988c9f4671a236d63a9616e00c0a7d93a5fc5a7ecfb8917105ba17d41bd"
            },
            "downloads": -1,
            "filename": "m-kafka-sdk-v2-1.0.1.tar.gz",
            "has_sig": false,
            "md5_digest": "206dcd9d985d607407ea2062216f4ed0",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3",
            "size": 15497,
            "upload_time": "2024-01-11T10:08:09",
            "upload_time_iso_8601": "2024-01-11T10:08:09.334802Z",
            "url": "https://files.pythonhosted.org/packages/69/04/06ff7d4740b080cb2875e9b9e8a2aa058620478de1f648f2bc5365926b1a/m-kafka-sdk-v2-1.0.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-01-11 10:08:09",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "m-kafka-sdk-v2"
}
        