- **JB's Consumer library. It runs each consumer in a Process, which suits K8s environments.**
- **NOTE: The SDK uses the confluent-kafka version declared in the requirements of the module that depends on it. The SDK does not list confluent-kafka in its own install_requires.**
* This version requires confluent-kafka>=1.7.0 and requests>=2.25.1.
# create topic
Note:
- EnsureKafkaTopic.REPLICATION_ASSIGNMENT holds the brokers isolated for each module. If DevOps has not set this value yet, the value of *DEFAULT_BROKER_ID_ASSIGN* is used instead.
```python
import os

from mobio.libs.kafka_lib import MobioEnvironment
from mobio.libs.kafka_lib.helpers.ensure_kafka_topic import EnsureKafkaTopic

EnsureKafkaTopic().create_kafka_topics(
    [
        # TEST WITH SET replica_assignment
        {
            EnsureKafkaTopic.TOPIC_NAME: "giang-test1",
            EnsureKafkaTopic.NUM_PARTITIONS: 8,
            EnsureKafkaTopic.CONFIG: {"compression.type": "snappy"},
            # Comma-separated list of broker ids, e.g. "10,20,30"
            EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                MobioEnvironment.SALE_BROKER_ID_ASSIGN  # SALE_BROKER_ID_ASSIGN
            ),
        },
        # TEST WITH SET replica_factor
        {
            EnsureKafkaTopic.TOPIC_NAME: "giang-test2",
            EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                MobioEnvironment.PROFILING_BROKER_ID_ASSIGN
            ),
        },
        # TEST WITH SET config
        {
            EnsureKafkaTopic.TOPIC_NAME: "giang-test3",
            EnsureKafkaTopic.NUM_PARTITIONS: 1,
            EnsureKafkaTopic.CONFIG: {"compression.type": "snappy"},
            EnsureKafkaTopic.REPLICATION_ASSIGNMENT: os.getenv(
                MobioEnvironment.JB_BROKER_ID_ASSIGN
            ),
        },
        # TEST WITHOUT manual config
        {
            EnsureKafkaTopic.TOPIC_NAME: "giang-test4",
            EnsureKafkaTopic.NUM_PARTITIONS: 1,
        },
    ]
)
```
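
The fallback described in the note above can be pictured with a small, SDK-independent sketch. The helper name `resolve_broker_ids`, the use of plain environment-variable names as keys, and the parsing into integers are assumptions made for illustration; how `EnsureKafkaTopic` actually resolves the value internally is not shown in this README.

```python
import os

# Name of the fallback variable mentioned in the note above.
DEFAULT_ENV_KEY = "DEFAULT_BROKER_ID_ASSIGN"


def resolve_broker_ids(module_env_key, env=None):
    """Hypothetical helper: return a module's broker ids, or the default ones."""
    env = os.environ if env is None else env
    # Use the module-specific value when it is set and non-empty,
    # otherwise fall back to the default assignment.
    raw = env.get(module_env_key) or env.get(DEFAULT_ENV_KEY) or ""
    # "10,20,30" -> [10, 20, 30]; blank entries are ignored.
    return [int(part) for part in raw.split(",") if part.strip()]


example_env = {
    "DEFAULT_BROKER_ID_ASSIGN": "1,2,3",
    "SALE_BROKER_ID_ASSIGN": "10,20,30",
}
sale_ids = resolve_broker_ids("SALE_BROKER_ID_ASSIGN", example_env)
profiling_ids = resolve_broker_ids("PROFILING_BROKER_ID_ASSIGN", example_env)
print(sale_ids)       # module-specific value
print(profiling_ids)  # not set, so the default list is used
```

Passing the environment as a parameter keeps the sketch testable without touching the real process environment.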
# Producer
```python
from mobio.libs.kafka_lib.helpers.kafka_producer_manager import KafkaProducerManager
KafkaProducerManager().flush_message(topic="test", key="uuid", value={"test":1})
```
# Consumer normal
```python
import os

from pymongo import MongoClient

from mobio.libs.kafka_lib.helpers.kafka_consumer_manager import BaseKafkaConsumer


class TestConsumer(BaseKafkaConsumer):
    def message_handle(self, data):
        print("TestConsumer: data: {}".format(data))


if __name__ == "__main__":
    url_connection = os.getenv("TEST_MONGO_URI")
    client_mongo = MongoClient(url_connection, connect=False)

    TestConsumer(
        topic_name="test",
        group_id="test",
        client_mongo=client_mongo,
        retryable=True,
        lst_subscribe_topic=["test", "test1"],
        retry_topic="test1",
    )
```
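
The way `topic_name`, `lst_subscribe_topic` and `retry_topic` interact is spelled out in the 0.1.8 entry of the change log. A minimal sketch of those rules follows; `resolve_topics` is a hypothetical helper written for this README, not a function exposed by the SDK.

```python
def resolve_topics(topic_name, lst_subscribe_topic=None, retry_topic=None):
    """Hypothetical helper: return (topics_to_subscribe, retry_topic)."""
    # If a subscribe list is passed, topic_name is ignored;
    # otherwise topic_name is the only subscribed topic.
    subscribe = list(lst_subscribe_topic) if lst_subscribe_topic else [topic_name]
    # If retry_topic is not specified, the first subscribed topic is used,
    # which is topic_name when no subscribe list was passed.
    retry = retry_topic if retry_topic else subscribe[0]
    return subscribe, retry


explicit = resolve_topics("test", ["test", "test1"], "test1")
only_name = resolve_topics("test")
list_without_retry = resolve_topics("ignored", ["a", "b"])
print(explicit, only_name, list_without_retry)
```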
# Consumer with bloom filter
```python
import os

from pymongo import MongoClient

from mobio.libs.kafka_lib.helpers.kafka_consumer_manager import BaseKafkaConsumer


class TestConsumer(BaseKafkaConsumer):
    def check_msg_is_processed(self, payload: dict) -> bool:
        # Look the message id up in MongoDB to decide whether it was already handled.
        msg_id = payload.get("msg_id")
        exists = self.client_mongo.get_database("test_db")["test_collection"].find_one(
            {"id": msg_id}
        )
        return exists is not None

    def message_handle(self, data):
        print("TestConsumer: data: {}".format(data))


if __name__ == "__main__":
    url_connection = os.getenv("TEST_MONGO_URI")
    client_mongo = MongoClient(url_connection, connect=False)

    TestConsumer(
        topic_name="test",
        group_id="test",
        client_mongo=client_mongo,
        retryable=True,
        lst_subscribe_topic=["test", "test1"],
        retry_topic="test1",
        enable_bloom=True,
        auto_commit=True,
    )
```
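
To illustrate why `check_msg_is_processed` is overridden when the filter is enabled, here is a rough, SDK-independent sketch of filter-based deduplication. It assumes the filter acts as a cheap "possibly seen" test and that the authoritative check runs only on a hit; this flow is an assumption, not the SDK's documented behavior. `SeenFilter` is a plain set standing in for the Redis-backed filter, and `consume` is a hypothetical function.

```python
class SeenFilter:
    """Stand-in for a probabilistic filter; a real one may report false positives."""

    def __init__(self):
        self._items = set()

    def might_contain(self, key):
        return key in self._items

    def add(self, key):
        self._items.add(key)


def consume(payload, seen, is_processed, handle):
    """Handle payload unless it is confirmed as a duplicate. Returns True if handled."""
    msg_id = payload.get("msg_id")
    # The filter is only a hint: a hit is confirmed with the authoritative check.
    if seen.might_contain(msg_id) and is_processed(payload):
        return False
    seen.add(msg_id)
    handle(payload)
    return True


processed_ids = set()
seen = SeenFilter()


def is_processed(payload):
    return payload["msg_id"] in processed_ids


def handle(payload):
    processed_ids.add(payload["msg_id"])


first = consume({"msg_id": "m1"}, seen, is_processed, handle)
second = consume({"msg_id": "m1"}, seen, is_processed, handle)
print(first, second)
```

The authoritative check matters because a probabilistic filter can answer "seen" for a message that was never processed; skipping on the filter alone would drop such messages.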
# change logs
* 1.0.6 (2024-10-25)
  * Fix a bug in the skipping of pingpong messages
* 1.0.5 (2024-09-20)
  * Skip pingpong messages
* 1.0.4 (2024-07-29)
  * Use redisbloom<=0.4.0 because redisbloom==0.4.1 requires redis==3.5.3
* 1.0.3 (2024-07-23)
  * Automatically add expire_time to every requeue message; these messages expire automatically after 7 days (regardless of whether the message has been requeued yet).
  * Fix the bloom init issue
  * Switch to cuckoo-filter instead of bloom-filter
  * Automatically delete committed messages from the cuckoo filter (to keep the redis-cache from bloating)
* 1.0.2 (2024-05-20)
  * Build 1.0.1-rc3 as 1.0.2, no logic changes
* 1.0.1-rc3 (2024-03-20)
  * Batch consume message
  * Remove the "no offset to commit" log
  * Use redisbloom<=0.4.0 because redisbloom==0.4.1 requires redis==3.5.3
* 1.0.1-rc2 (2024-03-01)
  * Batch consume message
  * Fix messages not being committed after processing finishes
* 1.0.1-rc1 (2024-02-15)
  * Batch consume message
  * In the current version, the batch consumer does not support requeue yet.
* 1.0.1 (2024-01-05)
  * Accept a redis-client in the consumer's init parameters; if none is passed, a redis-client is created from the default URI.
* 1.0.0 (2023-12-13):
  * Allow configuring auto/manual commit
  * Use bloom-filter to check for duplicate messages while consuming
  * Requires using the same redis-bloom if bloom is enabled
* 0.1.8 (2023-08-25):
  * Add option lst_subscribe_topic
    * If lst_subscribe_topic is passed, topic_name is ignored
    * If lst_subscribe_topic is not passed, topic_name is used as the subscribe topic
  * Add option retry_topic
    * If not specified, the first topic of the subscribe list is used
    * If no subscribe list is passed, topic_name is used as the retry topic
* 0.1.7 (2023-04-18):
  * Allow passing the broker when initializing the consumer and producer. If it is not passed, the default from ENV KAFKA_BROKER is used
  * Singleton producer with `*args` and `**kwargs`
  * Remove requirements "m-singleton>=0.3", "m-caching>=0.1.8"
* 0.1.6 (2023-04-05):
  * k8s liveness v2
* 0.1.5 (2022-10-12):
  * Raise an exception when closing kafka, so that k8s restarts the pod
* 0.1.4.2 (2022-09-19):
  * Fix bug where created topics were randomly placed on brokers isolated for other modules.
* 0.1.4.1 (2022-09-19):
  * Allow passing the brokers to which this topic will be replicated
* 0.1.4 (2022-08-23):
  * Add saving the pod-name to client-id mapping to a file
* 0.1.3:
  * Compress messages by default on the producer side
  * The create kafka topic function supports passing the number of partitions and the topic settings
* 0.1.1: fix bug init Config
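
The expiry rule from the 1.0.3 entry can be sketched as follows. This is an illustration only: the helper names are hypothetical, and storing `expire_time` as epoch seconds and keeping the first deadline when a message is requeued again are assumptions, since the README does not describe the field's format.

```python
import time

SEVEN_DAYS_SECONDS = 7 * 24 * 60 * 60


def stamp_expire_time(message, now=None):
    """Hypothetical helper: return a copy of message carrying an expire_time."""
    now = time.time() if now is None else now
    stamped = dict(message)
    # Assumption: an existing deadline is kept, so repeated requeues
    # do not extend the 7-day lifetime.
    stamped.setdefault("expire_time", now + SEVEN_DAYS_SECONDS)
    return stamped


def is_expired(message, now=None):
    """Hypothetical helper: True once the message's deadline has passed."""
    now = time.time() if now is None else now
    expire_time = message.get("expire_time")
    return expire_time is not None and now >= expire_time


msg = stamp_expire_time({"msg_id": "m1"}, now=0)
requeued_again = stamp_expire_time(msg, now=100)
print(msg["expire_time"], requeued_again["expire_time"])
```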
Raw data
{
"_id": null,
"home_page": "https://github.com/mobiovn",
"name": "m-kafka-sdk-v2",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3",
"maintainer_email": null,
"keywords": "mobio, kafka, m-kafka",
"author": "MOBIO",
"author_email": "contact@mobio.vn",
"download_url": "https://files.pythonhosted.org/packages/63/89/bc97ca9796deee6c07cf68e9d445fb6f373fefec324b767eb4213728472f/m_kafka_sdk_v2-1.0.6.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "Mobio Kafka SDK",
"version": "1.0.6",
"project_urls": {
"Homepage": "https://github.com/mobiovn",
"Source": "https://github.com/mobiovn"
},
"split_keywords": [
"mobio",
" kafka",
" m-kafka"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "6389bc97ca9796deee6c07cf68e9d445fb6f373fefec324b767eb4213728472f",
"md5": "0b6db31cda446d90db37be9e09ae6b24",
"sha256": "ab243f129b431d4a859700093fd956d1c7d6a567f5e2b5213d3cf724ae18945c"
},
"downloads": -1,
"filename": "m_kafka_sdk_v2-1.0.6.tar.gz",
"has_sig": false,
"md5_digest": "0b6db31cda446d90db37be9e09ae6b24",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3",
"size": 18062,
"upload_time": "2024-10-25T10:55:17",
"upload_time_iso_8601": "2024-10-25T10:55:17.024108Z",
"url": "https://files.pythonhosted.org/packages/63/89/bc97ca9796deee6c07cf68e9d445fb6f373fefec324b767eb4213728472f/m_kafka_sdk_v2-1.0.6.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-10-25 10:55:17",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "m-kafka-sdk-v2"
}