# pydbzengine
A Python module for running the [Debezium Engine](https://debezium.io/) from Python and consuming database change data capture (CDC) events in Python code.

Java integration is provided by [Pyjnius](https://pyjnius.readthedocs.io/en/latest/), a Python library for accessing Java classes.
## Installation
Install the released package from PyPI, or the latest code directly from GitHub:
```shell
pip install pydbzengine
# install from github:
pip install https://github.com/memiiso/pydbzengine/archive/master.zip --upgrade --user
```
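Since Debezium runs on the JVM and is reached through Pyjnius, it can help to confirm that both the package and a Java runtime are visible before starting the engine. The following is a minimal sketch, not part of pydbzengine: the helper names `installed_version` and `find_java` are hypothetical, and checking `JAVA_HOME` followed by `PATH` is an assumption about a typical setup rather than a description of how Pyjnius itself locates the JVM.

```python
import importlib.metadata
import os
import shutil
from typing import Optional


def installed_version(package: str) -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return importlib.metadata.version(package)
    except importlib.metadata.PackageNotFoundError:
        return None


def find_java() -> Optional[str]:
    """Return a path to a `java` executable, trying JAVA_HOME/bin first, then PATH."""
    java_home = os.environ.get("JAVA_HOME")
    if java_home:
        candidate = shutil.which("java", path=os.path.join(java_home, "bin"))
        if candidate:
            return candidate
    return shutil.which("java")


if __name__ == "__main__":
    print("pydbzengine:", installed_version("pydbzengine") or "not installed")
    print("java:", find_java() or "not found")
```

If either line reports a missing component, fix that before moving on to the usage example.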
## How to Use
First, install the package with its `dev` extras: `pip install pydbzengine[dev]`. Then define a change handler and run the engine:
```python
from typing import List
from pydbzengine import ChangeEvent, BasePythonChangeHandler
from pydbzengine import Properties, DebeziumJsonEngine


class PrintChangeHandler(BasePythonChangeHandler):
    """
    A custom change event handler class.

    This class processes batches of Debezium change events received from the engine.
    The `handleJsonBatch` method is where you implement your logic for consuming
    and processing these events. Currently, it prints basic information about
    each event to the console.
    """

    def handleJsonBatch(self, records: List[ChangeEvent]):
        """
        Handles a batch of Debezium change events.

        This method is called by the Debezium engine with a list of ChangeEvent objects.
        Change this method to implement your desired processing logic. For example,
        you might parse the event data, transform it, and load it into a database or
        other destination.

        Args:
            records: A list of ChangeEvent objects representing the changes captured by Debezium.
        """
        print(f"Received {len(records)} records")
        for record in records:
            print(f"destination: {record.destination()}")
            print(f"key: {record.key()}")
            print(f"value: {record.value()}")
            print("--------------------------------------")


if __name__ == '__main__':
    props = Properties()
    props.setProperty("name", "engine")
    props.setProperty("snapshot.mode", "initial_only")
    # Add further Debezium connector configuration properties here. For example:
    # props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector")
    # props.setProperty("database.hostname", "your_database_host")
    # props.setProperty("database.port", "3306")

    # Create a DebeziumJsonEngine instance, passing the configuration properties and the custom change event handler.
    engine = DebeziumJsonEngine(properties=props, handler=PrintChangeHandler())

    # Start the Debezium engine to begin consuming and processing change events.
    engine.run()
```
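The handler above only prints each record. As a rough sketch of the "parse the event data" step mentioned in its docstring, the snippet below decodes one event value and pulls out the operation and the affected row. It assumes that `record.value()` yields a JSON string in Debezium's usual change event envelope (`before`, `after`, `source`, `op`), optionally wrapped in `schema`/`payload`, and that a delete may be followed by a tombstone with no value. The function name `summarize_event` and the sample event are hypothetical and not part of pydbzengine.

```python
import json
from typing import Any, Dict, Optional

# Debezium's one-letter operation codes.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def summarize_event(value: Optional[str]) -> Optional[Dict[str, Any]]:
    """Reduce one JSON-encoded change event to its operation, table, and row."""
    if not value:
        # Assumption: an empty or missing value is a tombstone; nothing to summarize.
        return None
    doc = json.loads(value)
    # When schemas are enabled, the envelope sits under "payload".
    payload = doc["payload"] if "schema" in doc and "payload" in doc else doc
    op = payload.get("op")
    source = payload.get("source") or {}
    after = payload.get("after")
    return {
        "op": OPERATIONS.get(op, op),
        "table": source.get("table"),
        # Deletes carry the row in "before"; every other operation in "after".
        "row": after if after is not None else payload.get("before"),
    }


if __name__ == "__main__":
    # Hypothetical sample event, shaped like a Debezium insert.
    sample = json.dumps({
        "before": None,
        "after": {"id": 1, "name": "Ada"},
        "source": {"db": "inventory", "table": "customers"},
        "op": "c",
        "ts_ms": 1700000000000,
    })
    print(summarize_event(sample))
```

Inside `handleJsonBatch`, a call such as `summarize_event(record.value())` could replace the `print` statements, provided the assumption about the value format holds for your configuration.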
### How to Consume Events with dlt
For the full code, see [dlt_consuming.py](pydbzengine/examples/dlt_consuming.py):
https://github.com/memiiso/pydbzengine/blob/c4a88228aa66a2dc41b3dcc192615b1357326b66/pydbzengine/examples/dlt_consuming.py#L92-L153
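The linked example is not reproduced here. As an independent sketch of one step such a handler might need, the snippet below groups a batch by `record.destination()` so that each group can be loaded as the rows of one table. `FakeEvent` and `rows_by_destination` are hypothetical names that belong to neither pydbzengine nor dlt, and the sketch assumes `value()` returns a JSON string.

```python
import json
from collections import defaultdict
from typing import Any, Dict, Iterable, List


class FakeEvent:
    """Hypothetical stand-in exposing `destination()` and `value()` like a change event."""

    def __init__(self, destination: str, value: str):
        self._destination = destination
        self._value = value

    def destination(self) -> str:
        return self._destination

    def value(self) -> str:
        return self._value


def rows_by_destination(records: Iterable[Any]) -> Dict[str, List[dict]]:
    """Decode each event value and collect the results per destination."""
    grouped: Dict[str, List[dict]] = defaultdict(list)
    for record in records:
        value = record.value()
        if not value:
            # Assumption: skip events without a value (for example tombstones).
            continue
        grouped[record.destination()].append(json.loads(value))
    return dict(grouped)


if __name__ == "__main__":
    batch = [
        FakeEvent("inventory.customers", json.dumps({"op": "c", "after": {"id": 1}})),
        FakeEvent("inventory.orders", json.dumps({"op": "c", "after": {"id": 10}})),
        FakeEvent("inventory.customers", json.dumps({"op": "u", "after": {"id": 1}})),
    ]
    for destination, rows in rows_by_destination(batch).items():
        print(destination, len(rows))
```

Each list could then be handed to a loader, for example as the data argument of a dlt `pipeline.run(...)` call with a matching `table_name`. How the linked example actually does this may differ.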
### Contributors
<a href="https://github.com/memiiso/pydbzengine/graphs/contributors">
<img src="https://contributors-img.web.app/image?repo=memiiso/pydbzengine" />
</a>
Raw data
{
"_id": null,
"home_page": null,
"name": "pydbzengine",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": "Debezium, Replication, CDC",
"author": "Memiiso Organization",
"author_email": null,
"download_url": "https://files.pythonhosted.org/packages/39/52/c9f471e79fb577728ea21577fbb01eac2f68ee90b16743fa61328546bf0a/pydbzengine-3.0.7.2.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "Apache License 2.0",
"summary": "Python Debezium Embedded Engine",
"version": "3.0.7.2",
"project_urls": {
"Documentation": "https://github.com/memiiso/pydbzengine",
"Homepage": "https://github.com/memiiso/pydbzengine",
"Repository": "https://github.com/memiiso/pydbzengine"
},
"split_keywords": [
"debezium",
" replication",
" cdc"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "43a88a16c2b9418e0e10ab27753dbf6bbc88b8c8dbc9d2368d3a6e1e68feb414",
"md5": "bcd411e6feb44587961f82daae6850a7",
"sha256": "2034ef9add3388a7495d80af4087f9af2cb4ef446b1394e8508faea63ff8c035"
},
"downloads": -1,
"filename": "pydbzengine-3.0.7.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "bcd411e6feb44587961f82daae6850a7",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 43577434,
"upload_time": "2025-02-01T11:31:37",
"upload_time_iso_8601": "2025-02-01T11:31:37.936346Z",
"url": "https://files.pythonhosted.org/packages/43/a8/8a16c2b9418e0e10ab27753dbf6bbc88b8c8dbc9d2368d3a6e1e68feb414/pydbzengine-3.0.7.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "3952c9f471e79fb577728ea21577fbb01eac2f68ee90b16743fa61328546bf0a",
"md5": "a52390d3866d4b18d394aadd8e5d0e7b",
"sha256": "0655e83b6b82e863d482bd004ac28b4c0fc21428b626eebc5bc2bfbacfb06968"
},
"downloads": -1,
"filename": "pydbzengine-3.0.7.2.tar.gz",
"has_sig": false,
"md5_digest": "a52390d3866d4b18d394aadd8e5d0e7b",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 43559312,
"upload_time": "2025-02-01T11:31:45",
"upload_time_iso_8601": "2025-02-01T11:31:45.430473Z",
"url": "https://files.pythonhosted.org/packages/39/52/c9f471e79fb577728ea21577fbb01eac2f68ee90b16743fa61328546bf0a/pydbzengine-3.0.7.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-02-01 11:31:45",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "memiiso",
"github_project": "pydbzengine",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "pydbzengine"
}