# ClickHouse ETL Python SDK
<p align="left">
<a target="_blank" href="https://pypi.python.org/pypi/glassflow-clickhouse-etl">
<img src="https://img.shields.io/pypi/v/glassflow-clickhouse-etl.svg?labelColor=&color=e69e3a">
</a>
<a target="_blank" href="https://github.com/glassflow/clickhouse-etl-py-sdk/blob/main/LICENSE">
<img src="https://img.shields.io/pypi/l/glassflow-clickhouse-etl.svg?labelColor=&color=e69e3a">
</a>
<a target="_blank" href="https://pypi.python.org/pypi/glassflow-clickhouse-etl">
<img src="https://img.shields.io/pypi/pyversions/glassflow-clickhouse-etl.svg?labelColor=&color=e69e3a">
</a>
<br />
  <a target="_blank" href="https://github.com/glassflow/clickhouse-etl-py-sdk/actions">
<img src="https://github.com/glassflow/clickhouse-etl-py-sdk/workflows/Test/badge.svg?labelColor=&color=e69e3a">
</a>
<!-- Pytest Coverage Comment:Begin -->
  <img src="https://img.shields.io/badge/coverage-94%25-brightgreen">
<!-- Pytest Coverage Comment:End -->
</p>
A Python SDK for creating and managing data pipelines between Kafka and ClickHouse.
## Features
- Create and manage data pipelines between Kafka and ClickHouse
- Deduplication of events within a time window, keyed on an ID field
- Temporal joins between topics on a common key within a given time window
- Schema validation and configuration management
## Installation
```bash
pip install glassflow-clickhouse-etl
```
## Quick Start
### Initialize client
```python
from glassflow_clickhouse_etl import Client
# Initialize GlassFlow client
client = Client(host="your-glassflow-etl-url")
```
### Create a pipeline
```python
pipeline_config = {
"pipeline_id": "deduplication-demo-pipeline",
"source": {
"type": "kafka",
"provider": "confluent",
"connection_params": {
"brokers": [
"kafka:9093"
],
"protocol": "PLAINTEXT",
"skip_auth": True
},
"topics": [
{
"consumer_group_initial_offset": "latest",
"name": "users",
"schema": {
"type": "json",
"fields": [
{
"name": "event_id",
"type": "string"
},
{
"name": "user_id",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "email",
"type": "string"
},
{
"name": "created_at",
"type": "string"
}
]
},
"deduplication": {
"enabled": True,
"id_field": "event_id",
"id_field_type": "string",
"time_window": "1h"
}
}
]
},
"join": {
"enabled": False
},
"sink": {
"type": "clickhouse",
"provider": "localhost",
"host": "clickhouse",
"port": "9000",
"database": "default",
"username": "default",
"password": "c2VjcmV0",
"secure": False,
"max_batch_size": 1000,
"max_delay_time": "30s",
"table": "users_dedup",
"table_mapping": [
{
"source_id": "users",
"field_name": "event_id",
"column_name": "event_id",
"column_type": "UUID"
},
{
"source_id": "users",
"field_name": "user_id",
"column_name": "user_id",
"column_type": "UUID"
},
{
"source_id": "users",
"field_name": "created_at",
"column_name": "created_at",
"column_type": "DateTime"
},
{
"source_id": "users",
"field_name": "name",
"column_name": "name",
"column_type": "String"
},
{
"source_id": "users",
"field_name": "email",
"column_name": "email",
"column_type": "String"
}
]
}
}
# Create a pipeline
pipeline = client.create_pipeline(pipeline_config)
```
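Since the configuration is a plain dictionary, it can be sanity-checked before sending, for example verifying that every `table_mapping` entry refers to a field declared in the matching topic schema. A minimal sketch (the helper name is ours, not part of the SDK):

```python
def check_table_mapping(config: dict) -> list:
    """Return a list of problems found in the sink's table_mapping.

    Each mapping's field_name must exist in the schema of the topic
    whose name matches its source_id.
    """
    problems = []
    # Collect declared field names per topic, e.g. {"users": {"event_id", ...}}
    topic_fields = {
        topic["name"]: {f["name"] for f in topic["schema"]["fields"]}
        for topic in config["source"]["topics"]
    }
    for mapping in config["sink"]["table_mapping"]:
        fields = topic_fields.get(mapping["source_id"], set())
        if mapping["field_name"] not in fields:
            problems.append(
                f"{mapping['source_id']}.{mapping['field_name']} is not in the topic schema"
            )
    return problems
```

With the `pipeline_config` above, the check returns an empty list, since every mapped field is declared in the `users` schema.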
### Get pipeline
```python
# Get a pipeline by ID
pipeline = client.get_pipeline("my-pipeline-id")
```
### List pipelines
```python
pipelines = client.list_pipelines()
for pipeline in pipelines:
print(f"Pipeline ID: {pipeline['pipeline_id']}")
print(f"Name: {pipeline['name']}")
print(f"Transformation Type: {pipeline['transformation_type']}")
print(f"Created At: {pipeline['created_at']}")
print(f"State: {pipeline['state']}")
```
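Because `list_pipelines()` returns plain dictionaries, the result can be filtered with ordinary Python. For instance, keeping only pipelines in a given state (the sample data and `state` values here are illustrative, not taken from a real deployment):

```python
def pipelines_in_state(pipelines: list, state: str) -> list:
    """Filter a list of pipeline dicts by their 'state' field."""
    return [p for p in pipelines if p.get("state") == state]

# Illustrative data in the shape returned by list_pipelines():
sample = [
    {"pipeline_id": "a", "state": "Running"},
    {"pipeline_id": "b", "state": "Paused"},
]
running = pipelines_in_state(sample, "Running")
```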
### Delete pipeline
```python
# Delete a pipeline
client.delete_pipeline("my-pipeline-id")
# Or delete via pipeline instance
pipeline.delete()
```
### Update pipeline
```python
# Update the sink table name
config_patch = {
"sink": {
"table": "new_table_name"
}
}
pipeline = client.get_pipeline("my-pipeline-id")
pipeline.update(config_patch)
```
### Pause / Resume pipeline
```python
# Stops ingesting new messages and finishes processing messages already in the pipeline
pipeline.pause()
# Resumes ingestion
pipeline.resume()
```
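One way to combine these calls is to pause while applying a config patch and resume afterwards. Whether an update actually requires pausing depends on your deployment; the context-manager name below is ours, and a minimal stub stands in for a real `Pipeline` object so the sketch runs standalone:

```python
from contextlib import contextmanager


@contextmanager
def paused(pipeline):
    """Pause a pipeline for the duration of the block, then resume it."""
    pipeline.pause()
    try:
        yield pipeline
    finally:
        pipeline.resume()


# Minimal stub standing in for a real Pipeline object, for illustration only.
class _StubPipeline:
    def __init__(self):
        self.calls = []

    def pause(self):
        self.calls.append("pause")

    def resume(self):
        self.calls.append("resume")

    def update(self, patch):
        self.calls.append(("update", patch))


p = _StubPipeline()
with paused(p):
    p.update({"sink": {"table": "new_table_name"}})
```

The `finally` clause ensures the pipeline is resumed even if the update raises.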
## Pipeline Configuration
For detailed information about the pipeline configuration, see [GlassFlow docs](https://docs.glassflow.dev/pipeline/pipeline-configuration).
## Tracking
The SDK includes anonymous usage tracking to help improve the product. Tracking is enabled by default but can be disabled in two ways:
1. Using an environment variable:
```bash
export GF_TRACKING_ENABLED=false
```
2. Programmatically using the `disable_tracking` method:
```python
from glassflow_clickhouse_etl import Client
client = Client(host="my-glassflow-host")
client.disable_tracking()
```
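Both switches need to take effect before any requests are sent. If you prefer the environment-variable route from inside Python (for example in a test harness), setting it before constructing the client should behave the same way:

```python
import os

# Disable tracking for every Client created in this process by setting
# the variable before instantiation. The commented lines show the
# intended usage; they require a running GlassFlow host.
os.environ["GF_TRACKING_ENABLED"] = "false"

# from glassflow_clickhouse_etl import Client
# client = Client(host="my-glassflow-host")  # tracking disabled via env var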
The tracking collects anonymous information about:
- SDK version
- Platform (operating system)
- Python version
- Pipeline ID
- Whether joins or deduplication are enabled
- Kafka security protocol, auth mechanism used, and whether authentication is disabled
- Errors during pipeline creation and deletion
## Development
### Setup
1. Clone the repository
2. Create a virtual environment
3. Install dependencies:
```bash
uv venv
source .venv/bin/activate
uv pip install -e .[dev]
```
### Testing
```bash
pytest
```