- **JB's Consumer library. Consumers run as separate processes, which suits a K8s environment.**
- **NOTE: the SDK uses whatever confluent-kafka version the consuming module pins in its requirements; the SDK itself does not declare confluent-kafka in `install_requires`.**
* This release requires confluent-kafka >= 1.7.0 and requests >= 2.25.1
# Create topic
Note:
- EnsureKafkaTopic.REPLICATION_ASSIGNMENT holds the isolated broker IDs assigned to each module. If DevOps has not initialized this value, it falls back to *DEFAULT_BROKER_ID_ASSIGN*.
```python
from mobio.libs.kafka_lib.helpers.ensure_kafka_topic import EnsureKafkaTopic
import os
from mobio.libs.kafka_lib import MobioEnvironment
EnsureKafkaTopic().create_kafka_topics(
    [
        # with an explicit replica assignment
        {
            EnsureKafkaTopic.TOPIC_NAME: "giang-test1",
            EnsureKafkaTopic.NUM_PARTITIONS: 8,
            EnsureKafkaTopic.CONFIG: {"compression.type": "snappy"},
            EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                MobioEnvironment.SALE_BROKER_ID_ASSIGN
            ),  # comma-separated broker IDs, e.g. "10,20,30"
        },
        # replica assignment only (partition count uses the default)
        {
            EnsureKafkaTopic.TOPIC_NAME: "giang-test2",
            EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                MobioEnvironment.PROFILING_BROKER_ID_ASSIGN
            ),
        },
        # with topic-level config
        {
            EnsureKafkaTopic.TOPIC_NAME: "giang-test3",
            EnsureKafkaTopic.NUM_PARTITIONS: 1,
            EnsureKafkaTopic.CONFIG: {"compression.type": "snappy"},
            EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                MobioEnvironment.JB_BROKER_ID_ASSIGN
            ),
        },
        # without manual config
        {
            EnsureKafkaTopic.TOPIC_NAME: "giang-test4",
            EnsureKafkaTopic.NUM_PARTITIONS: 1,
        },
    ]
)
```
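The REPLICATION_ASSIGNMENT value is a comma-separated broker ID string such as `"10,20,30"`. As a point of reference, confluent-kafka's `NewTopic` accepts a `replica_assignment` given as one list of broker IDs per partition; a minimal sketch of turning the env-var string into that shape (the helper name and exact mapping are assumptions, not the SDK's actual internals):

```python
def parse_replica_assignment(broker_ids: str, num_partitions: int) -> list:
    """Turn a string like "10,20,30" into one replica list per partition."""
    ids = [int(b) for b in broker_ids.split(",") if b.strip()]
    # every partition is replicated across the same isolated brokers
    return [ids for _ in range(num_partitions)]


assignment = parse_replica_assignment("10,20,30", 2)
```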
# Producer
```python
from mobio.libs.kafka_lib.helpers.kafka_producer_manager import KafkaProducerManager

KafkaProducerManager().flush_message(topic="test", key="uuid", value={"test": 1})
```
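Per the 0.1.7 changelog, the producer is a singleton keyed on its `*args` and `**kwargs`, so repeated `KafkaProducerManager()` calls with the same arguments reuse one underlying producer. A minimal sketch of that pattern (illustrative only; class and parameter names here are assumptions, not the SDK's implementation):

```python
class SingletonByArgs(type):
    """Metaclass caching one instance per (class, args, kwargs) combination."""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        key = (cls, args, tuple(sorted(kwargs.items())))
        if key not in cls._instances:
            cls._instances[key] = super().__call__(*args, **kwargs)
        return cls._instances[key]


class ProducerManager(metaclass=SingletonByArgs):
    def __init__(self, brokers=None):
        self.brokers = brokers


a = ProducerManager(brokers="kafka:9092")
b = ProducerManager(brokers="kafka:9092")  # same kwargs -> same instance
c = ProducerManager(brokers="other:9092")  # different kwargs -> new instance
```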
# Consumer normal
```python
import os

from pymongo import MongoClient

from mobio.libs.kafka_lib.helpers.kafka_consumer_manager import BaseKafkaConsumer


class TestConsumer(BaseKafkaConsumer):
    def message_handle(self, data):
        print("TestConsumer: data: {}".format(data))


if __name__ == "__main__":
    url_connection = os.getenv('TEST_MONGO_URI')
    client_mongo = MongoClient(url_connection, connect=False)

    TestConsumer(topic_name="test", group_id="test", client_mongo=client_mongo, retryable=True, lst_subscribe_topic=['test', 'test1'], retry_topic='test1')
```
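The 0.1.8 changelog spells out how `topic_name`, `lst_subscribe_topic`, and `retry_topic` interact: a passed `lst_subscribe_topic` overrides `topic_name`, and `retry_topic` defaults to the first subscribed topic. A small sketch of that resolution logic (function name is illustrative, not the SDK's internals):

```python
def resolve_topics(topic_name=None, lst_subscribe_topic=None, retry_topic=None):
    """Resolve subscribed topics and the retry topic per the 0.1.8 rules."""
    # if lst_subscribe_topic is passed, topic_name is ignored
    topics = lst_subscribe_topic if lst_subscribe_topic else [topic_name]
    # retry_topic defaults to the first topic in the subscribe list
    retry = retry_topic if retry_topic else topics[0]
    return topics, retry
```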
# Consumer with bloom filter
```python
import os

from pymongo import MongoClient

from mobio.libs.kafka_lib.helpers.kafka_consumer_manager import BaseKafkaConsumer


class TestConsumer(BaseKafkaConsumer):
    def check_msg_is_processed(self, payload: dict) -> bool:
        msg_id = payload.get("msg_id")
        exists = self.client_mongo.get_database("test_db")["test_collection"].find_one({"id": msg_id})
        return bool(exists)

    def message_handle(self, data):
        print("TestConsumer: data: {}".format(data))


if __name__ == "__main__":
    url_connection = os.getenv('TEST_MONGO_URI')
    client_mongo = MongoClient(url_connection, connect=False)

    TestConsumer(topic_name="test", group_id="test", client_mongo=client_mongo, retryable=True, lst_subscribe_topic=['test', 'test1'], retry_topic='test1', enable_bloom=True, auto_commit=True)
```
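With `enable_bloom=True`, the consumer uses a Bloom filter (backed by redis-bloom, per the 1.0.0 changelog) as a fast first-pass duplicate check. Because Bloom filters can return false positives but never false negatives, a "maybe seen" hit still needs confirmation, which is the role of `check_msg_is_processed`. A minimal in-memory sketch of that two-step check (the toy filter here is illustrative; the SDK uses Redis, not this class):

```python
import hashlib


class TinyBloom:
    """Toy Bloom filter: k hash positions over a fixed-size bit array."""

    def __init__(self, size=1024, hashes=3):
        self.size, self.hashes, self.bits = size, hashes, 0

    def _positions(self, item: str):
        for i in range(self.hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:4], "big") % self.size

    def add(self, item: str):
        for p in self._positions(item):
            self.bits |= 1 << p

    def maybe_contains(self, item: str) -> bool:
        # no false negatives; rare false positives
        return all(self.bits >> p & 1 for p in self._positions(item))


def is_duplicate(bloom, processed_ids, msg_id):
    # fast path: "definitely new" -> skip the DB lookup entirely
    if not bloom.maybe_contains(msg_id):
        return False
    # "maybe seen" -> confirm against the store (check_msg_is_processed's job)
    return msg_id in processed_ids


bloom, seen = TinyBloom(), set()
bloom.add("msg-1")
seen.add("msg-1")
```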
# change logs
* 1.0.1 (2024-01-05)
    * Accept a redis-client among the consumer's init parameters; if none is passed, a redis-client is created from the default URI.
* 1.0.0 (2023-12-13):
    * Allow configuring auto/manual commit
    * Use a bloom-filter to detect duplicate messages while consuming
    * Requires redis-bloom when bloom is enabled
* 0.1.8 (2023-08-25):
    * Added the lst_subscribe_topic option
        * If lst_subscribe_topic is passed, topic_name is ignored
        * If lst_subscribe_topic is not passed, topic_name is used as the subscribed topic
    * Added the retry_topic option
        * If not specified, the first topic in the subscribe list is used
        * If no subscribe list is passed, topic_name is used as the retry topic
* 0.1.7 (2023-04-18):
    * Allow passing brokers when initializing consumers and producers; if none are passed, the default from the KAFKA_BROKER env var is used
    * Singleton producer keyed on *args and **kwargs
    * Dropped the "m-singleton>=0.3" and "m-caching>=0.1.8" requirements
* 0.1.6 (2023-04-05):
    * k8s liveness v2
* 0.1.5 (2022-10-12):
    * Raise an exception when Kafka closes, ensuring k8s restarts the pod
* 0.1.4.2 (2022-09-19):
    * Fixed a bug where newly created topics were randomly placed on brokers isolated for other modules
* 0.1.4.1 (2022-09-19):
    * Allow passing the brokers this topic will be replicated to
* 0.1.4 (2022-08-23):
    * Save the pod-name to client-id mapping to a file
* 0.1.3:
    * Compress messages on the producer side by default
    * The create-kafka-topic function supports passing the partition count and topic settings
* 0.1.1: fix bug init Config
# Raw data
{
"_id": null,
"home_page": "https://github.com/mobiovn",
"name": "m-kafka-sdk-v2",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3",
"maintainer_email": "",
"keywords": "mobio,kafka,m-kafka",
"author": "MOBIO",
"author_email": "contact@mobio.vn",
"download_url": "https://files.pythonhosted.org/packages/69/04/06ff7d4740b080cb2875e9b9e8a2aa058620478de1f648f2bc5365926b1a/m-kafka-sdk-v2-1.0.1.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "Mobio Kafka SDK",
"version": "1.0.1",
"project_urls": {
"Homepage": "https://github.com/mobiovn",
"Source": "https://github.com/mobiovn"
},
"split_keywords": [
"mobio",
"kafka",
"m-kafka"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "690406ff7d4740b080cb2875e9b9e8a2aa058620478de1f648f2bc5365926b1a",
"md5": "206dcd9d985d607407ea2062216f4ed0",
"sha256": "4acfe988c9f4671a236d63a9616e00c0a7d93a5fc5a7ecfb8917105ba17d41bd"
},
"downloads": -1,
"filename": "m-kafka-sdk-v2-1.0.1.tar.gz",
"has_sig": false,
"md5_digest": "206dcd9d985d607407ea2062216f4ed0",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3",
"size": 15497,
"upload_time": "2024-01-11T10:08:09",
"upload_time_iso_8601": "2024-01-11T10:08:09.334802Z",
"url": "https://files.pythonhosted.org/packages/69/04/06ff7d4740b080cb2875e9b9e8a2aa058620478de1f648f2bc5365926b1a/m-kafka-sdk-v2-1.0.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-01-11 10:08:09",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "m-kafka-sdk-v2"
}