m-kafka-sdk-v2


Name: m-kafka-sdk-v2
Version: 1.0.6
Home page: https://github.com/mobiovn
Summary: Mobio Kafka SDK
Upload time: 2024-10-25 10:55:17
Author: MOBIO (contact@mobio.vn)
Requires Python: >=3
License: MIT
Keywords: mobio, kafka, m-kafka
Requirements: none recorded in the package metadata.
- **JB's Consumer library. Runs the consumer in a Process, suited to K8s environments.**

- **NOTE: the SDK relies on the host module's requirements for confluent-kafka; it does not declare confluent-kafka in its own install_requires.**
* This release requires confluent-kafka>=1.7.0 and requests>=2.25.1
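
Because the SDK leaves confluent-kafka out of its own install_requires, the host module must pin it. A minimal requirements.txt sketch for a module using this release (pins taken from the note above):

```
confluent-kafka>=1.7.0
requests>=2.25.1
m-kafka-sdk-v2==1.0.6
```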
# Create topic
Note:
- EnsureKafkaTopic.REPLICATION_ASSIGNMENT is the list of broker IDs isolated for each module. If DevOps has not initialized this value, it falls back to *DEFAULT_BROKER_ID_ASSIGN* (the SDK does this automatically; a sketch follows the example below).
```python
from mobio.libs.kafka_lib.helpers.ensure_kafka_topic import EnsureKafkaTopic
import os
from mobio.libs.kafka_lib import MobioEnvironment

EnsureKafkaTopic().create_kafka_topics(
        [
            # TEST WITH replica_assignment set
            {
                EnsureKafkaTopic.TOPIC_NAME: "giang-test1",
                EnsureKafkaTopic.NUM_PARTITIONS: 8,
                EnsureKafkaTopic.CONFIG: {"compression.type": "snappy"},
                EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                    MobioEnvironment.SALE_BROKER_ID_ASSIGN
                ),  # comma-separated list of broker_ids, e.g. "10,20,30"
            },
            # TEST WITH replica_assignment only (no partitions or config)
            {
                EnsureKafkaTopic.TOPIC_NAME: "giang-test2",
                EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                    MobioEnvironment.PROFILING_BROKER_ID_ASSIGN
                )
            },
            # TEST WITH topic config set
            {
                EnsureKafkaTopic.TOPIC_NAME: "giang-test3",
                EnsureKafkaTopic.NUM_PARTITIONS: 1,
                EnsureKafkaTopic.CONFIG: {"compression.type": "snappy"},
                EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                    MobioEnvironment.JB_BROKER_ID_ASSIGN
                )
            },
            # TEST WITHOUT manual config
            {
                EnsureKafkaTopic.TOPIC_NAME: "giang-test4",
                EnsureKafkaTopic.NUM_PARTITIONS: 1,
            },
        ]
    )
```
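
The fallback described in the note above happens inside the SDK; this sketch only illustrates the behavior, and assumes MobioEnvironment.DEFAULT_BROKER_ID_ASSIGN is exposed alongside the module-specific constants:

```python
import os

from mobio.libs.kafka_lib import MobioEnvironment

# Illustration only: resolve a module's isolated broker list, falling back to
# the default assignment when DevOps has not set the module-specific value.
# MobioEnvironment.DEFAULT_BROKER_ID_ASSIGN is an assumption based on the note above.
broker_ids = os.getenv(MobioEnvironment.SALE_BROKER_ID_ASSIGN) or os.getenv(
    MobioEnvironment.DEFAULT_BROKER_ID_ASSIGN
)
print(broker_ids)  # comma-separated broker IDs, e.g. "10,20,30"
```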


# Producer
```python
from mobio.libs.kafka_lib.helpers.kafka_producer_manager import KafkaProducerManager
KafkaProducerManager().flush_message(topic="test", key="uuid", value={"test": 1})
```
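
Per the 0.1.7 and 0.1.3 changelog entries below, the producer is a singleton with respect to its *args and **kwargs, so repeated KafkaProducerManager() calls reuse one underlying producer, and messages are compressed by default on the producer side.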

# Normal consumer
```python
import os
from pymongo import MongoClient
from mobio.libs.kafka_lib.helpers.kafka_consumer_manager import BaseKafkaConsumer


class TestConsumer(BaseKafkaConsumer):
    def message_handle(self, data):
        print("TestConsumer: data: {}".format(data))


if __name__ == "__main__":
    url_connection = os.getenv('TEST_MONGO_URI')
    client_mongo = MongoClient(url_connection, connect=False)

    TestConsumer(
        topic_name="test",
        group_id="test",
        client_mongo=client_mongo,
        retryable=True,
        lst_subscribe_topic=["test", "test1"],
        retry_topic="test1",
    )
```
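
Subscription rules (see the 0.1.8 changelog entry below): when lst_subscribe_topic is passed, topic_name is ignored; when retry_topic is omitted, the first topic in the subscribe list is used for retries.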

# Consumer with bloom filter
```python
import os
from pymongo import MongoClient
from mobio.libs.kafka_lib.helpers.kafka_consumer_manager import BaseKafkaConsumer


class TestConsumer(BaseKafkaConsumer):
    def check_msg_is_processed(self, payload: dict) -> bool:
        # Authoritative duplicate check: look the message ID up in MongoDB.
        msg_id = payload.get("msg_id")
        exists = self.client_mongo.get_database("test_db")["test_collection"].find_one({"id": msg_id})
        return exists is not None

    def message_handle(self, data):
        print("TestConsumer: data: {}".format(data))


if __name__ == "__main__":
    url_connection = os.getenv('TEST_MONGO_URI')
    client_mongo = MongoClient(url_connection, connect=False)

    TestConsumer(
        topic_name="test",
        group_id="test",
        client_mongo=client_mongo,
        retryable=True,
        lst_subscribe_topic=["test", "test1"],
        retry_topic="test1",
        enable_bloom=True,
        auto_commit=True,
    )
```
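
With enable_bloom=True the consumer deduplicates through a Redis-backed filter; per the changelog below this requires redis-bloom, and since 1.0.3 a cuckoo filter is used, with committed messages evicted automatically. Overriding check_msg_is_processed, as in the Mongo lookup above, gives the consumer an authoritative check of whether a message was already handled.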

# Change logs
* 1.0.6 (2024-10-25)
  * Fix a bug in skipping ping-pong messages

* 1.0.5 (2024-09-20)
  * Skip ping-pong messages

* 1.0.4 (2024-07-29)
  * Use redisbloom<=0.4.0, because redisbloom==0.4.1 forces redis==3.5.3

* 1.0.3 (2024-07-23)
  * Automatically add expire_time to every requeued message; these messages expire after 7 days (whether or not they are requeued again).
  * Fix an issue initializing the bloom filter
  * Switch to a cuckoo filter instead of a bloom filter
    * automatically remove committed messages from the cuckoo filter (to keep the Redis cache from growing)

* 1.0.2 (2024-05-20)
  * Rebuild 1.0.1-rc3 as 1.0.2, no logic changes

* 1.0.1-rc3 (2024-03-20)
  * Batch message consumption
    * Drop the "no offset to commit" log
    * Use redisbloom<=0.4.0, because redisbloom==0.4.1 forces redis==3.5.3

* 1.0.1-rc2 (2024-03-01)
  * Batch message consumption
    * Fix a bug where messages were not committed after processing

* 1.0.1-rc1 (2024-02-15)
  * Batch message consumption
    * In this version, the batch consumer does not yet support requeue.

* 1.0.1 (2024-01-05)
  * Accept a redis client among the consumer's constructor parameters; if none is passed, create one from the default URI.
* 1.0.0 (2023-12-13):
  * Allow configuring auto/manual commit
  * Use a bloom filter to detect duplicate messages while consuming
  * Requires redis-bloom on the Redis server when bloom is enabled
* 0.1.8 (2023-08-25):
  * Add the lst_subscribe_topic option
    * If lst_subscribe_topic is passed, topic_name is ignored
    * If lst_subscribe_topic is not passed, topic_name is used as the subscribed topic
  * Add the retry_topic option
    * If not specified, the first topic of the subscribe list is used
    * If no subscribe list is passed, topic_name is used as the retry topic
* 0.1.7 (2023-04-18):
  * Allow passing brokers when creating consumers and producers; if omitted, the default from the KAFKA_BROKER env var is used
  * Singleton producer with respect to *args and **kwargs
  * Drop the "m-singleton>=0.3" and "m-caching>=0.1.8" requirements
* 0.1.6 (2023-04-05):
  * K8s liveness v2
* 0.1.5 (2022-10-12):
  * Raise an exception when Kafka closes, ensuring K8s restarts the pod
* 0.1.4.2 (2022-09-19):
  * Fix a bug where newly created topics were randomly assigned to brokers isolated for other modules.
* 0.1.4.1 (2022-09-19):
  * Allow specifying the brokers this topic will be replicated to
* 0.1.4 (2022-08-23):
  * Add saving of the pod-name to client-id mapping to a file
* 0.1.3:
  * Compress messages by default on the producer side
  * The create-kafka-topic function supports passing the number of partitions and topic settings

* 0.1.1: fix a bug initializing Config
            
