glassflow-clickhouse-etl


- Name: glassflow-clickhouse-etl
- Version: 1.0.0
- Summary: GlassFlow Clickhouse ETL Python SDK: Create GlassFlow pipelines between Kafka and ClickHouse
- Upload time: 2025-09-01 16:33:52
- Requires Python: >=3.9
- License: MIT
- Keywords: clickhouse, data-engineering, data-pipeline, etl, glassflow, kafka, streaming

# ClickHouse ETL Python SDK

<p align="left">
  <a target="_blank" href="https://pypi.python.org/pypi/glassflow-clickhouse-etl">
    <img src="https://img.shields.io/pypi/v/glassflow-clickhouse-etl.svg?labelColor=&color=e69e3a">
  </a>
  <a target="_blank" href="https://github.com/glassflow/clickhouse-etl-py-sdk/blob/main/LICENSE">
    <img src="https://img.shields.io/pypi/l/glassflow-clickhouse-etl.svg?labelColor=&color=e69e3a">
  </a>
  <a target="_blank" href="https://pypi.python.org/pypi/glassflow-clickhouse-etl">
    <img src="https://img.shields.io/pypi/pyversions/glassflow-clickhouse-etl.svg?labelColor=&color=e69e3a">
  </a>
  <br />
  <a target="_blank" href="https://github.com/glassflow/clickhouse-etl-py-sdk/actions">
    <img src="https://github.com/glassflow/clickhouse-etl-py-sdk/workflows/Test/badge.svg?labelColor=&color=e69e3a">
  </a>
<!-- Pytest Coverage Comment:Begin -->
  <img src="https://img.shields.io/badge/coverage-94%25-brightgreen">
<!-- Pytest Coverage Comment:End -->
</p>

A Python SDK for creating and managing data pipelines between Kafka and ClickHouse.

## Features

- Create and manage data pipelines between Kafka and ClickHouse
- Deduplicate events by key within a configurable time window
- Join topics on a common key within a given time window (temporal joins)
- Schema validation and configuration management
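
To make the deduplication feature concrete, here is a minimal in-memory sketch of key-based deduplication over a time window. It illustrates the idea only and is not GlassFlow's implementation; the `WindowDeduplicator` class and its `accept` method are hypothetical names.

```python
from datetime import datetime, timedelta


class WindowDeduplicator:
    """Drop events whose key was already accepted within `window` (illustrative only)."""

    def __init__(self, window: timedelta):
        self.window = window
        # Maps each key to the timestamp of the last event that was kept.
        self._last_kept = {}

    def accept(self, key: str, seen_at: datetime) -> bool:
        """Return True if the event should be kept, False if it is a duplicate."""
        previous = self._last_kept.get(key)
        if previous is not None and seen_at - previous < self.window:
            return False
        self._last_kept[key] = seen_at
        return True


dedup = WindowDeduplicator(window=timedelta(hours=1))
start = datetime(2025, 1, 1, 12, 0, 0)
results = [
    dedup.accept("evt-1", start),                          # first occurrence
    dedup.accept("evt-1", start + timedelta(minutes=30)),  # repeat inside the window
    dedup.accept("evt-2", start + timedelta(minutes=30)),  # different key
    dedup.accept("evt-1", start + timedelta(hours=2)),     # repeat after the window
]
print(results)  # [True, False, True, True]
```

In the pipeline configuration, the same idea is expressed declaratively through the `deduplication` block (`id_field` and `time_window`).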

## Installation

```bash
pip install glassflow-clickhouse-etl
```

## Quick Start

### Initialize client

```python
from glassflow_clickhouse_etl import Client

# Initialize GlassFlow client
client = Client(host="your-glassflow-etl-url")
```

### Create a pipeline

```python
pipeline_config = {
    "pipeline_id": "deduplication-demo-pipeline",
    "source": {
      "type": "kafka",
      "provider": "confluent",
      "connection_params": {
        "brokers": [
          "kafka:9093"
        ],
        "protocol": "PLAINTEXT",
        "skip_auth": True
      },
      "topics": [
        {
          "consumer_group_initial_offset": "latest",
          "name": "users",
          "schema": {
            "type": "json",
            "fields": [
              {
                "name": "event_id",
                "type": "string"
              },
              {
                "name": "user_id",
                "type": "string"
              },
              {
                "name": "name",
                "type": "string"
              },
              {
                "name": "email",
                "type": "string"
              },
              {
                "name": "created_at",
                "type": "string"
              }
            ]
          },
          "deduplication": {
            "enabled": True,
            "id_field": "event_id",
            "id_field_type": "string",
            "time_window": "1h"
          }
        }
      ]
    },
    "join": {
      "enabled": False
    },
    "sink": {
      "type": "clickhouse",
      "provider": "localhost",
      "host": "clickhouse",
      "port": "9000",
      "database": "default",
      "username": "default",
      "password": "c2VjcmV0",
      "secure": False,
      "max_batch_size": 1000,
      "max_delay_time": "30s",
      "table": "users_dedup",
      "table_mapping": [
        {
          "source_id": "users",
          "field_name": "event_id",
          "column_name": "event_id",
          "column_type": "UUID"
        },
        {
          "source_id": "users",
          "field_name": "user_id",
          "column_name": "user_id",
          "column_type": "UUID"
        },
        {
          "source_id": "users",
          "field_name": "created_at",
          "column_name": "created_at",
          "column_type": "DateTime"
        },
        {
          "source_id": "users",
          "field_name": "name",
          "column_name": "name",
          "column_type": "String"
        },
        {
          "source_id": "users",
          "field_name": "email",
          "column_name": "email",
          "column_type": "String"
        }
      ]
    }
}

# Create a pipeline
pipeline = client.create_pipeline(pipeline_config)
```
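
The `password` value in the configuration above, `c2VjcmV0`, is the Base64 encoding of the string `secret`. Assuming the sink expects the password in Base64 form (check the pipeline configuration docs to confirm), such a value can be produced with the standard library. The `encode_password` helper is a hypothetical name and not part of the SDK.

```python
import base64


def encode_password(plain: str) -> str:
    """Return the Base64 encoding of a UTF-8 password string."""
    return base64.b64encode(plain.encode("utf-8")).decode("ascii")


encoded = encode_password("secret")
print(encoded)  # c2VjcmV0
```

Base64 is an encoding, not encryption, so the configuration should still be treated as containing a secret.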


### Get pipeline

```python
# Get a pipeline by ID
pipeline = client.get_pipeline("my-pipeline-id")
```

### List pipelines

```python
pipelines = client.list_pipelines()
for pipeline in pipelines:
    print(f"Pipeline ID: {pipeline['pipeline_id']}")
    print(f"Name: {pipeline['name']}")
    print(f"Transformation Type: {pipeline['transformation_type']}")
    print(f"Created At: {pipeline['created_at']}")
    print(f"State: {pipeline['state']}")
```
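
Because the loop above reads each entry by dictionary key, the result can be filtered with ordinary Python. The sketch below uses hand-written sample data in place of a live `client.list_pipelines()` call. The `pipelines_in_state` helper and the state values `"Running"` and `"Paused"` are assumptions for illustration, not documented SDK values.

```python
def pipelines_in_state(pipelines, state):
    """Return the IDs of pipelines whose 'state' matches, ignoring case."""
    return [
        p["pipeline_id"]
        for p in pipelines
        if str(p.get("state", "")).lower() == state.lower()
    ]


# Sample data shaped like the entries printed above (values are made up).
sample = [
    {"pipeline_id": "orders-dedup", "state": "Running"},
    {"pipeline_id": "users-dedup", "state": "Paused"},
    {"pipeline_id": "clicks-join", "state": "Running"},
]

running = pipelines_in_state(sample, "running")
print(running)  # ['orders-dedup', 'clicks-join']
```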

### Delete pipeline

```python
# Delete a pipeline
client.delete_pipeline("my-pipeline-id")

# Or delete via pipeline instance
pipeline.delete()
```

### Update pipeline

```python
# Update the sink table name
config_patch = {
  "sink": {
    "table": "new_table_name"
  }
}

pipeline = client.get_pipeline("my-pipeline-id")
pipeline.update(config_patch)
```

### Pause / Resume pipeline

```python
# Stop ingesting new messages; messages already inside the pipeline are still processed
pipeline.pause()

# Resume ingestion
pipeline.resume()
```
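
When a pause should always be followed by a resume, for example around a maintenance task, the two calls can be wrapped in a context manager. This is a minimal sketch that assumes only the `pause()` and `resume()` methods shown above. The `paused` helper and the `FakePipeline` stand-in are hypothetical and exist only so the example runs without a GlassFlow deployment.

```python
from contextlib import contextmanager


@contextmanager
def paused(pipeline):
    """Pause the pipeline for the duration of the block, then resume it."""
    pipeline.pause()
    try:
        yield pipeline
    finally:
        # Resume even if the block raises.
        pipeline.resume()


class FakePipeline:
    """Stand-in that records calls; replace with a real pipeline object."""

    def __init__(self):
        self.calls = []

    def pause(self):
        self.calls.append("pause")

    def resume(self):
        self.calls.append("resume")


fake = FakePipeline()
with paused(fake):
    fake.calls.append("maintenance")

print(fake.calls)  # ['pause', 'maintenance', 'resume']
```

Whether `pause()` returns before in-flight messages have drained is not stated here, so it may be worth checking the pipeline state before relying on it.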

## Pipeline Configuration

For detailed information about the pipeline configuration, see [GlassFlow docs](https://docs.glassflow.dev/pipeline/pipeline-configuration).
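
Before sending a configuration, a quick local check can catch mismatches between the topic schema and the sink mapping. The following sketch assumes the dictionary layout used in the Quick Start example (`source.topics[].schema.fields` and `sink.table_mapping`) and that `source_id` refers to a topic `name`. `unmapped_fields` is a hypothetical helper, not the SDK's own schema validation.

```python
def unmapped_fields(config):
    """Return (source_id, field_name) pairs in table_mapping that no topic schema declares."""
    declared = {
        (topic["name"], field["name"])
        for topic in config["source"]["topics"]
        for field in topic["schema"]["fields"]
    }
    return [
        (m["source_id"], m["field_name"])
        for m in config["sink"]["table_mapping"]
        if (m["source_id"], m["field_name"]) not in declared
    ]


# Trimmed-down configuration with one deliberate typo ("e_mail").
config = {
    "source": {"topics": [{"name": "users", "schema": {"type": "json", "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "email", "type": "string"},
    ]}}]},
    "sink": {"table_mapping": [
        {"source_id": "users", "field_name": "event_id",
         "column_name": "event_id", "column_type": "UUID"},
        {"source_id": "users", "field_name": "e_mail",
         "column_name": "email", "column_type": "String"},
    ]},
}

missing = unmapped_fields(config)
print(missing)  # [('users', 'e_mail')]
```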

## Tracking

The SDK includes anonymous usage tracking to help improve the product. Tracking is enabled by default but can be disabled in two ways:

1. Using an environment variable:
```bash
export GF_TRACKING_ENABLED=false
```

2. Programmatically using the `disable_tracking` method:
```python
from glassflow_clickhouse_etl import Client

client = Client(host="my-glassflow-host")
client.disable_tracking()
```

Tracking collects the following anonymous information:
- SDK version
- Platform (operating system)
- Python version
- Pipeline ID
- Whether joins or deduplication are enabled
- Kafka security protocol, authentication mechanism, and whether authentication is disabled
- Errors during pipeline creation and deletion

## Development

### Setup

1. Clone the repository
2. Create a virtual environment
3. Install dependencies:

```bash
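git clone https://github.com/glassflow/clickhouse-etl-py-sdk.git
cd clickhouse-etl-py-sdk
uv venv
source .venv/bin/activate
# Quote the extras so shells such as zsh do not expand the brackets
uv pip install -e ".[dev]"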
```

### Testing

```bash
pytest
```

            

Raw data

{
    "_id": null,
    "home_page": null,
    "name": "glassflow-clickhouse-etl",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.9",
    "maintainer_email": null,
    "keywords": "clickhouse, data-engineering, data-pipeline, etl, glassflow, kafka, streaming",
    "author": null,
    "author_email": "GlassFlow <hello@glassflow.dev>",
    "download_url": "https://files.pythonhosted.org/packages/da/e3/0b323f0bfad46d9ca00ac7c428f5ac42665d268ac6ba1d30ff5265d78a1f/glassflow_clickhouse_etl-1.0.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "GlassFlow Clickhouse ETL Python SDK: Create GlassFlow pipelines between Kafka and ClickHouse",
    "version": "1.0.0",
    "project_urls": {
        "Documentation": "https://glassflow.github.io/clickhouse-etl-py-sdk",
        "Homepage": "https://github.com/glassflow/clickhouse-etl-py-sdk",
        "Issues": "https://github.com/glassflow/clickhouse-etl-py-sdk/issues",
        "Repository": "https://github.com/glassflow/clickhouse-etl-py-sdk.git"
    },
    "split_keywords": [
        "clickhouse",
        " data-engineering",
        " data-pipeline",
        " etl",
        " glassflow",
        " kafka",
        " streaming"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "eefbf972ad8f2160ff99219c1364a8596087fd7f6412783c3a682570ce016a6a",
                "md5": "11cf0468f9a806143ab662c19f91a0ad",
                "sha256": "9b20e359b923fb10aa9bc09099d882014054d987e58d829962ee38cdca7929ad"
            },
            "downloads": -1,
            "filename": "glassflow_clickhouse_etl-1.0.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "11cf0468f9a806143ab662c19f91a0ad",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9",
            "size": 19918,
            "upload_time": "2025-09-01T16:33:51",
            "upload_time_iso_8601": "2025-09-01T16:33:51.785924Z",
            "url": "https://files.pythonhosted.org/packages/ee/fb/f972ad8f2160ff99219c1364a8596087fd7f6412783c3a682570ce016a6a/glassflow_clickhouse_etl-1.0.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "dae30b323f0bfad46d9ca00ac7c428f5ac42665d268ac6ba1d30ff5265d78a1f",
                "md5": "93b47a3b1285ebd24e506ad7d49ee05e",
                "sha256": "713a7ddb713d86e5eb33a71fcfe6ceaea9c0ead73f26616b88bbe3e97ea17c6e"
            },
            "downloads": -1,
            "filename": "glassflow_clickhouse_etl-1.0.0.tar.gz",
            "has_sig": false,
            "md5_digest": "93b47a3b1285ebd24e506ad7d49ee05e",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9",
            "size": 78476,
            "upload_time": "2025-09-01T16:33:52",
            "upload_time_iso_8601": "2025-09-01T16:33:52.817377Z",
            "url": "https://files.pythonhosted.org/packages/da/e3/0b323f0bfad46d9ca00ac7c428f5ac42665d268ac6ba1d30ff5265d78a1f/glassflow_clickhouse_etl-1.0.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-09-01 16:33:52",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "glassflow",
    "github_project": "clickhouse-etl-py-sdk",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "glassflow-clickhouse-etl"
}
        