risingwave-pipeline-sdk


- **Name**: risingwave-pipeline-sdk
- **Version**: 0.1.1
- **Summary**: A Python SDK for building RisingWave data pipelines with PostgreSQL CDC and multiple sink destinations
- **Upload time**: 2025-08-27 03:34:45
- **Requires Python**: >=3.10
- **Keywords**: cdc, data-pipeline, iceberg, postgresql, risingwave, risingwave-pipeline-sdk, risingwave_pipeline_sdk, s3, streaming

# RisingWave Pipeline SDK

A Python SDK for building RisingWave data pipelines with PostgreSQL CDC, automatic table discovery, and multiple sink destinations.

## Features

- **PostgreSQL CDC Integration**: Complete Change Data Capture support with automatic schema discovery
- **Flexible Table Selection**: Pattern-based, interactive, or programmatic table selection
- **Multiple Sink Support**: Iceberg, S3, and PostgreSQL destinations
- **Advanced CDC Configuration**: SSL, backfilling, publication management, and more
- **SQL Generation**: Automatically generates optimized RisingWave SQL statements

## Installation

```bash
# Using uv (recommended)
uv add risingwave-pipeline-sdk

# Using pip
pip install risingwave-pipeline-sdk
```

## Quick Start

```python
from risingwave_pipeline_sdk import (
    RisingWaveClient,
    PipelineBuilder,
    PostgreSQLConfig,
    TableSelector
)

# Connect to RisingWave
client = RisingWaveClient("postgresql://root@localhost:4566/dev")

# Configure PostgreSQL CDC
config = PostgreSQLConfig(
    hostname="localhost",
    port=5432,
    username="postgres",
    password="secret",
    database="mydb",
    auto_schema_change=True
)

# Create pipeline with table selection
builder = PipelineBuilder(client)
result = builder.create_postgresql_pipeline(
    config=config,
    table_selector=TableSelector(include_patterns=["users", "orders"])
)

print(f"Created CDC source with {len(result['selected_tables'])} tables")
```

## Table Discovery and Selection

### Discover Available Tables

```python
# Discover all available tables
available_tables = builder.discover_postgresql_tables(config)

for table in available_tables:
    print(f"{table.qualified_name} - {table.row_count} rows")
```

### Flexible Table Selection

```python
# Select specific tables
TableSelector(specific_tables=["users", "orders", "products"])

# Pattern-based selection
TableSelector(
    include_patterns=["user_*", "order_*"],
    exclude_patterns=["*_temp", "*_backup"]
)

# Include all tables except specific ones
TableSelector(
    include_all=True,
    exclude_patterns=["temp_*", "backup_*"]
)
```
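The patterns above look like shell-style globs. The README does not say how the SDK evaluates them, so the following is only a minimal sketch of one plausible interpretation: a table is selected when it matches at least one include pattern and no exclude pattern. `select_tables` is a hypothetical helper built on the standard library's `fnmatch`, not part of the SDK.

```python
from fnmatch import fnmatchcase


def select_tables(tables, include_patterns=None, exclude_patterns=None):
    """Return tables matching any include pattern and no exclude pattern.

    Hypothetical helper for illustration only; not the SDK's implementation.
    """
    # Assumption: an empty include list means "include everything".
    include_patterns = include_patterns or ["*"]
    exclude_patterns = exclude_patterns or []
    selected = []
    for name in tables:
        included = any(fnmatchcase(name, p) for p in include_patterns)
        excluded = any(fnmatchcase(name, p) for p in exclude_patterns)
        if included and not excluded:
            selected.append(name)
    return selected


tables = ["user_profiles", "user_sessions_temp", "order_items", "products"]
print(select_tables(tables, ["user_*", "order_*"], ["*_temp", "*_backup"]))
# prints ['user_profiles', 'order_items']
```

In this sketch exclusion wins over inclusion, which is why `user_sessions_temp` is dropped even though it matches `user_*`.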

## PostgreSQL CDC Configuration

```python
config = PostgreSQLConfig(
    # Connection details
    hostname="localhost",
    port=5432,
    username="postgres",
    password="secret",
    database="mydb",
    schema_name="public",

    # CDC settings
    auto_schema_change=True,
    publication_name="rw_publication",
    slot_name="rw_slot",

    # SSL configuration
    ssl_mode="require",
    ssl_root_cert="/path/to/ca.pem",

    # Performance tuning
    backfill_parallelism="8",
    backfill_num_rows_per_split="100000",
    backfill_as_even_splits=True
)
```
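The README does not show the SQL that the SDK generates. As a rough illustration of the kind of statement involved, the sketch below renders a `CREATE SOURCE` for RisingWave's `postgres-cdc` connector and a `CREATE TABLE ... FROM` for one upstream table. The helper names (`sql_literal`, `render_cdc_source`, `render_cdc_table`) and the source name `pg_source` are hypothetical, and the SDK's actual output may differ.

```python
def sql_literal(value):
    """Render a Python value as a single-quoted SQL string literal."""
    if isinstance(value, bool):
        value = "true" if value else "false"
    # Double any embedded single quotes so the literal stays well-formed.
    return "'" + str(value).replace("'", "''") + "'"


def render_cdc_source(source_name, options):
    """Build a CREATE SOURCE statement for the postgres-cdc connector.

    Identifiers are interpolated as-is, so pass trusted names only.
    """
    pairs = {"connector": "postgres-cdc", **options}
    body = ",\n".join(f"    {key} = {sql_literal(val)}" for key, val in pairs.items())
    return f"CREATE SOURCE {source_name} WITH (\n{body}\n);"


def render_cdc_table(table_name, source_name, upstream_table):
    """Build a CREATE TABLE that takes its columns from the upstream table."""
    return (
        f"CREATE TABLE {table_name} (*) "
        f"FROM {source_name} TABLE {sql_literal(upstream_table)};"
    )


options = {
    "hostname": "localhost",
    "port": 5432,
    "username": "postgres",
    "password": "secret",
    "database.name": "mydb",
    "schema.name": "public",
    "slot.name": "rw_slot",
    "publication.name": "rw_publication",
}
print(render_cdc_source("pg_source", options))
print(render_cdc_table("users", "pg_source", "public.users"))
```

The option keys follow RisingWave's connector parameter names rather than the `PostgreSQLConfig` keyword names; how the SDK maps one to the other is not documented here.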

## Sink Destinations

### Iceberg Data Lake

```python
from risingwave_pipeline_sdk import IcebergConfig

iceberg_config = IcebergConfig(
    sink_name="analytics_lake",
    warehouse_path="s3://my-warehouse/",
    database_name="analytics",
    table_name="events",
    catalog_type="storage",

    # S3 configuration
    s3_region="us-east-1",
    s3_access_key="your-access-key",
    s3_secret_key="your-secret-key",

    # Data type
    data_type="append-only",
    force_append_only=True
)

# Create sink
builder.create_sink(iceberg_config, ["events", "users"])
```

### S3 Data Archive

```python
from risingwave_pipeline_sdk import S3Config

s3_config = S3Config(
    sink_name="data_archive",
    bucket_name="my-data-bucket",
    path="raw-data/",
    region_name="us-east-1",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",

    # Format configuration
    format_type="PLAIN",
    encode_type="PARQUET"
)

builder.create_s3_sink(s3_config, ["users", "orders"])
```

### PostgreSQL Analytics Database

```python
from risingwave_pipeline_sdk import PostgreSQLSinkConfig

analytics_config = PostgreSQLSinkConfig(
    sink_name="analytics_db",
    hostname="analytics.example.com",
    port=5432,
    username="analytics_user",
    password="password",
    database="analytics",
    postgres_schema="real_time"
)

# Create sink with custom transformations
custom_queries = {
    "users": "SELECT id, name, email, created_at FROM users WHERE active = true",
    "orders": "SELECT * FROM orders WHERE status != 'cancelled'"
}

builder.create_postgresql_sink(
    analytics_config,
    ["users", "orders"],
    select_queries=custom_queries
)
```

## Complete Pipeline Example

```python
# Reuses `builder`, `config`, `s3_config`, `analytics_config`, and
# `iceberg_config` as defined in the sections above.

# 1. Set up CDC source
cdc_result = builder.create_postgresql_pipeline(
    config=config,
    table_selector=TableSelector(include_patterns=["user_*", "order_*"])
)

selected_tables = [t.qualified_name for t in cdc_result['selected_tables']]

# 2. Create multiple sinks
builder.create_s3_sink(s3_config, selected_tables)  # S3 archive
builder.create_postgresql_sink(analytics_config, selected_tables)  # Analytics database
builder.create_sink(iceberg_config, selected_tables)  # Iceberg data lake
```

## Examples

The `examples/` directory contains complete working examples:

- **`postgres_cdc_iceberg_pipeline.py`** - End-to-end CDC to Iceberg pipeline
- **`interactive_discovery.py`** - Interactive table discovery and selection
- **`env_config_example.py`** - Environment-variable-based configuration

## Environment Configuration

Configure using environment variables for production deployments:

```bash
# RisingWave connection
export RW_HOST=localhost
export RW_PORT=4566
export RW_USER=root
export RW_DATABASE=dev

# PostgreSQL CDC source
export PG_HOST=localhost
export PG_PORT=5432
export PG_USER=postgres
export PG_PASSWORD=secret
export PG_DATABASE=mydb

# Table selection
export TABLE_PATTERNS="users,orders,products"
```
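The README does not state whether the SDK reads these variables itself or leaves that to the caller (the `env_config_example.py` script presumably shows the intended approach). As a minimal sketch of the second option, the hypothetical `load_settings` helper below collects the variables into a RisingWave connection URL, a dictionary of PostgreSQL settings, and a list of table patterns. The default values are assumptions taken from the examples in this README.

```python
import os
from urllib.parse import quote


def load_settings(env=os.environ):
    """Collect pipeline settings from environment variables (hypothetical helper)."""
    rw_url = "postgresql://{user}@{host}:{port}/{db}".format(
        user=quote(env.get("RW_USER", "root"), safe=""),
        host=env.get("RW_HOST", "localhost"),
        port=int(env.get("RW_PORT", "4566")),
        db=quote(env.get("RW_DATABASE", "dev"), safe=""),
    )
    # TABLE_PATTERNS is a comma-separated list; ignore empty entries.
    raw_patterns = env.get("TABLE_PATTERNS", "")
    patterns = [p.strip() for p in raw_patterns.split(",") if p.strip()]
    return {
        "risingwave_url": rw_url,
        "postgres": {
            "hostname": env.get("PG_HOST", "localhost"),
            "port": int(env.get("PG_PORT", "5432")),
            "username": env.get("PG_USER", "postgres"),
            # Required values: a missing variable raises KeyError.
            "password": env["PG_PASSWORD"],
            "database": env["PG_DATABASE"],
        },
        "table_patterns": patterns,
    }


example_env = {
    "PG_PASSWORD": "secret",
    "PG_DATABASE": "mydb",
    "TABLE_PATTERNS": "users,orders,products",
}
settings = load_settings(example_env)
print(settings["risingwave_url"])
print(settings["table_patterns"])
```

The keys of `settings["postgres"]` mirror the `PostgreSQLConfig` keyword names used earlier, so passing them as `PostgreSQLConfig(**settings["postgres"])` should work if those keywords are accurate.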

## Development

```bash
# Clone and set up development environment
git clone https://github.com/risingwavelabs/risingwave-pipeline-sdk.git
cd risingwave-pipeline-sdk

# Install with development dependencies
uv venv
source .venv/bin/activate
uv pip install -e ".[dev]"

# Run tests
pytest

# Format code
ruff format .
```

## Requirements

- Python ≥ 3.10
- RisingWave instance (local or cloud)
- PostgreSQL with CDC enabled
- Required Python packages: `psycopg[binary]`, `pydantic`

## License

Apache 2.0 License

            

Raw data

{
    "_id": null,
    "home_page": null,
    "name": "risingwave-pipeline-sdk",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": null,
    "keywords": "cdc, data-pipeline, iceberg, postgresql, risingwave, risingwave-pipeline-sdk, risingwave_pipeline_sdk, s3, streaming",
    "author": null,
    "author_email": "Yuxuan Liao <yuxuan@risingwave-labs.com>",
    "download_url": "https://files.pythonhosted.org/packages/f3/41/7e04b45bf5ab3be60501473f63f2d9c72cb3d2051ffccdf321fa627d07fe/risingwave_pipeline_sdk-0.1.1.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "A Python SDK for building RisingWave data pipelines with PostgreSQL CDC and multiple sink destinations",
    "version": "0.1.1",
    "project_urls": {
        "Homepage": "https://github.com/risingwavelabs/risingwave-pipeline-sdk",
        "Issues": "https://github.com/risingwavelabs/risingwave-pipeline-sdk/issues",
        "Repository": "https://github.com/risingwavelabs/risingwave-pipeline-sdk"
    },
    "split_keywords": [
        "cdc",
        " data-pipeline",
        " iceberg",
        " postgresql",
        " risingwave",
        " risingwave-pipeline-sdk",
        " risingwave_pipeline_sdk",
        " s3",
        " streaming"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "a67f47eb2de6b4411e9d7d005fe64b00168b77792278dbe0f04802acbed9190c",
                "md5": "ff0a8956b40ef245363b1292e83d661e",
                "sha256": "7826ff46f64cd33b2b4147fb115004c681e4e680ef8768cd8c3b5f1d41dc77aa"
            },
            "downloads": -1,
            "filename": "risingwave_pipeline_sdk-0.1.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "ff0a8956b40ef245363b1292e83d661e",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 29102,
            "upload_time": "2025-08-27T03:34:43",
            "upload_time_iso_8601": "2025-08-27T03:34:43.716716Z",
            "url": "https://files.pythonhosted.org/packages/a6/7f/47eb2de6b4411e9d7d005fe64b00168b77792278dbe0f04802acbed9190c/risingwave_pipeline_sdk-0.1.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "f3417e04b45bf5ab3be60501473f63f2d9c72cb3d2051ffccdf321fa627d07fe",
                "md5": "49512aeec03aee25cd459d33ce6072c1",
                "sha256": "3463b7a76f30c0f41c2dd9ed48ba4860b440be9941f6820dcc8fdfead29cf367"
            },
            "downloads": -1,
            "filename": "risingwave_pipeline_sdk-0.1.1.tar.gz",
            "has_sig": false,
            "md5_digest": "49512aeec03aee25cd459d33ce6072c1",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 75031,
            "upload_time": "2025-08-27T03:34:45",
            "upload_time_iso_8601": "2025-08-27T03:34:45.207320Z",
            "url": "https://files.pythonhosted.org/packages/f3/41/7e04b45bf5ab3be60501473f63f2d9c72cb3d2051ffccdf321fa627d07fe/risingwave_pipeline_sdk-0.1.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-08-27 03:34:45",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "risingwavelabs",
    "github_project": "risingwave-pipeline-sdk",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "risingwave-pipeline-sdk"
}
        