risingwave-connect-py

- **Name**: risingwave-connect-py
- **Version**: 0.2.7
- **Summary**: Python SDK for integrating RisingWave with various CDC sources (like PostgreSQL) and supporting multiple sink destinations such as Iceberg and S3.
- **Upload time**: 2025-08-29 07:24:29
- **Requires Python**: >=3.10
- **Keywords**: cdc, data-connector, iceberg, postgresql, risingwave, risingwave-connect, risingwave-connect-py, risingwave_connect, risingwave_connect_py, s3, streaming

# RisingWave Connect

A Python SDK for connecting to RisingWave with PostgreSQL CDC, automatic table discovery, and multiple sink destinations.

## Features

- **PostgreSQL CDC Integration**: Complete Change Data Capture support with automatic schema discovery
- **Table-Level Filtering**: Optimized table selection with pattern matching and validation
- **Column-Level Filtering**: Selective column replication with type control and primary key validation
- **Multiple Sink Support**: Iceberg, S3, and PostgreSQL destinations
- **Advanced CDC Configuration**: SSL, backfilling, publication management, and more

## Installation

```bash
# Using uv (recommended)
uv add risingwave-connect-py

# Using pip
pip install risingwave-connect-py
```

## Quick Start

```python
from risingwave_connect import (
    RisingWaveClient,
    ConnectBuilder,
    PostgreSQLConfig,
    TableSelector
)

# Connect to RisingWave
client = RisingWaveClient("postgresql://root@localhost:4566/dev")

# Configure PostgreSQL CDC
config = PostgreSQLConfig(
    hostname="localhost",
    port=5432,
    username="postgres",
    password="secret",
    database="mydb",
    auto_schema_change=True
)

# Create connector with table selection
builder = ConnectBuilder(client)
result = builder.create_postgresql_connection(
    config=config,
    table_selector=TableSelector(include_patterns=["users", "orders"])
)

print(f"Created CDC source with {len(result['selected_tables'])} tables")
```
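RisingWave speaks the PostgreSQL wire protocol, so `RisingWaveClient` takes an ordinary PostgreSQL connection URI. The standard-library sketch below breaks down the Quick Start URI: `root` is RisingWave's default user, `4566` its default frontend port, and `dev` its default database. It only illustrates the URI format; the SDK's own parsing may differ.

```python
from urllib.parse import urlsplit

# The same URI passed to RisingWaveClient in the Quick Start.
dsn = "postgresql://root@localhost:4566/dev"

parts = urlsplit(dsn)
user = parts.username              # "root": RisingWave's default user
host = parts.hostname              # "localhost"
port = parts.port                  # 4566: RisingWave's default frontend port
database = parts.path.lstrip("/")  # "dev": RisingWave's default database

print(user, host, port, database)  # root localhost 4566 dev
```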

## Table Discovery and Selection

### Discover Available Tables

```python
# Discover all available tables
available_tables = builder.discover_postgresql_tables(config)

for table in available_tables:
    print(f"{table.qualified_name} - {table.row_count} rows")
```

### Table-Level Filtering

```python
# Select specific tables
table_selector = ["users", "orders", "products"]

# Using TableSelector for specific tables
from risingwave_connect.discovery.base import TableSelector
table_selector = TableSelector(specific_tables=["users", "orders"])

# Pattern-based selection (discovers all tables, then filters by pattern)
table_selector = TableSelector(
    include_patterns=["user_*", "order_*"],
    exclude_patterns=["*_temp", "*_backup"]
)

# Include all tables except those matching the exclude patterns
table_selector = TableSelector(
    include_all=True,
    exclude_patterns=["temp_*", "backup_*"]
)
```

### Column-Level Filtering

Select specific columns, control types, and ensure primary key consistency.

```python
from risingwave_connect.discovery.base import (
    TableColumnConfig, ColumnSelection, TableInfo
)

# Define table info
users_table = TableInfo(
    schema_name="public",
    table_name="users",
    table_type="BASE TABLE"
)

# Select specific columns with type control
users_columns = [
    ColumnSelection(
        column_name="id",
        is_primary_key=True,
        risingwave_type="INT"  # Override type if needed
    ),
    ColumnSelection(
        column_name="name",
        risingwave_type="VARCHAR",
        is_nullable=False
    ),
    ColumnSelection(
        column_name="email",
        risingwave_type="VARCHAR"
    ),
    ColumnSelection(
        column_name="created_at",
        risingwave_type="TIMESTAMP"
    )
    # 'password_hash' is deliberately not listed, so it is not replicated
]

# Create table configuration
users_config = TableColumnConfig(
    table_info=users_table,
    selected_columns=users_columns,
    custom_table_name="clean_users"  # Optional: custom name in RisingWave
)

# Apply column filtering
column_configs = {"users": users_config}

result = builder.create_postgresql_connection(
    config=postgres_config,  # a PostgreSQLConfig, such as the one from the Quick Start
    table_selector=["users", "orders"],
    column_configs=column_configs  # per-table column selections, keyed by table name
)
```

**Generated SQL with Column Filtering**:

```sql
CREATE TABLE clean_users (
    id INT PRIMARY KEY,
    name VARCHAR NOT NULL,
    email VARCHAR,
    created_at TIMESTAMP
)
FROM postgres_source TABLE 'public.users';
```
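To make the mapping from column selections to SQL concrete, here is a standalone sketch that renders the same statement from plain dataclasses. `Column` and `render_create_table` are hypothetical stand-ins for the SDK's `ColumnSelection` and its internal SQL generation; the source name `postgres_source` is taken from the output above.

```python
from dataclasses import dataclass


@dataclass
class Column:
    """Hypothetical stand-in for ColumnSelection."""
    name: str
    rw_type: str
    is_primary_key: bool = False
    is_nullable: bool = True


def render_create_table(table: str, source: str, upstream: str, columns: list[Column]) -> str:
    """Render a RisingWave CDC table definition containing only the selected columns."""
    lines = []
    for col in columns:
        line = f"    {col.name} {col.rw_type}"
        if col.is_primary_key:
            line += " PRIMARY KEY"
        elif not col.is_nullable:
            line += " NOT NULL"
        lines.append(line)
    body = ",\n".join(lines)
    return f"CREATE TABLE {table} (\n{body}\n)\nFROM {source} TABLE '{upstream}';"


sql = render_create_table(
    "clean_users",
    "postgres_source",
    "public.users",
    [
        Column("id", "INT", is_primary_key=True),
        Column("name", "VARCHAR", is_nullable=False),
        Column("email", "VARCHAR"),
        Column("created_at", "TIMESTAMP"),  # password_hash is simply not listed
    ],
)
print(sql)
```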

## PostgreSQL CDC Configuration

```python
config = PostgreSQLConfig(
    # Connection details
    hostname="localhost",
    port=5432,
    username="postgres",
    password="secret",
    database="mydb",
    schema_name="public",

    # CDC settings
    auto_schema_change=True,
    publication_name="rw_publication",
    slot_name="rw_slot",

    # SSL configuration
    ssl_mode="require",
    ssl_root_cert="/path/to/ca.pem",

    # Performance tuning
    backfill_parallelism="8",
    backfill_num_rows_per_split="100000",
    backfill_as_even_splits=True
)
```
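These settings presumably end up in a RisingWave `CREATE SOURCE ... WITH (...)` statement. The sketch below shows that translation for the connection, CDC, and SSL-mode fields. The field-to-option mapping (for example `database` to `database.name`) is an assumption based on the RisingWave PostgreSQL CDC documentation, not the SDK's code; the backfill and certificate fields are left out because this README does not say where they are applied.

```python
# Hypothetical mapping from PostgreSQLConfig field names to RisingWave
# postgres-cdc WITH options (assumed from the RisingWave CDC docs).
OPTION_NAMES = {
    "hostname": "hostname",
    "port": "port",
    "username": "username",
    "password": "password",
    "database": "database.name",
    "schema_name": "schema.name",
    "publication_name": "publication.name",
    "slot_name": "slot.name",
    "ssl_mode": "ssl.mode",
    "auto_schema_change": "auto.schema.change",
}


def sql_literal(value: object) -> str:
    """Render a value as a quoted SQL string literal; booleans become 'true'/'false'."""
    if isinstance(value, bool):
        value = "true" if value else "false"
    text = str(value).replace("'", "''")  # escape embedded single quotes
    return f"'{text}'"


def render_create_source(name: str, settings: dict[str, object]) -> str:
    """Render CREATE SOURCE ... WITH (...) for fields that have a known option name."""
    options = ["    connector = 'postgres-cdc'"]
    for field_name, value in settings.items():
        if field_name in OPTION_NAMES and value is not None:
            options.append(f"    {OPTION_NAMES[field_name]} = {sql_literal(value)}")
    return f"CREATE SOURCE {name} WITH (\n" + ",\n".join(options) + "\n);"


sql = render_create_source("postgres_source", {
    "hostname": "localhost",
    "port": 5432,
    "username": "postgres",
    "password": "secret",
    "database": "mydb",
    "schema_name": "public",
    "auto_schema_change": True,
    "publication_name": "rw_publication",
    "slot_name": "rw_slot",
    "ssl_mode": "require",
})
print(sql)
```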

### Schema Evolution

RisingWave supports automatic schema changes for PostgreSQL CDC sources when `auto_schema_change=True` is set.

For detailed information about schema evolution capabilities and limitations, see the [RisingWave Schema Evolution Documentation](https://docs.risingwave.com/ingestion/sources/postgresql/pg-cdc#automatic-schema-changes).

### Supported Data Types

RisingWave supports a comprehensive set of PostgreSQL data types for CDC replication. The SDK automatically maps PostgreSQL types to compatible RisingWave types.

For the complete list of supported PostgreSQL data types and their RisingWave equivalents, see the [RisingWave Supported Data Types Documentation](https://docs.risingwave.com/ingestion/sources/postgresql/pg-cdc#supported-data-types).

## Sink Destinations

### Iceberg Data Lake

```python
from risingwave_connect import IcebergConfig

iceberg_config = IcebergConfig(
    sink_name="analytics_lake",
    warehouse_path="s3://my-warehouse/",
    database_name="analytics",
    table_name="events",
    catalog_type="storage",

    # S3 configuration
    s3_region="us-east-1",
    s3_access_key="your-access-key",
    s3_secret_key="your-secret-key",

    # Sink write mode
    data_type="append-only",
    force_append_only=True
)

# Create sink
builder.create_sink(iceberg_config, ["events", "users"])
```
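In RisingWave, an Iceberg sink is a `CREATE SINK ... FROM ... WITH (connector = 'iceberg', ...)` statement. The sketch below renders one from the same values. The mapping of `IcebergConfig` fields to option names (for example `data_type` to `type`) is an assumption, not taken from the SDK; check option names against the RisingWave Iceberg sink documentation. How the SDK names sinks when given several tables is not documented here, so the sketch renders a single sink from `events`.

```python
# Hypothetical mapping from IcebergConfig fields to RisingWave Iceberg sink
# options; option names are assumptions based on the RisingWave docs.
ICEBERG_OPTIONS = {
    "data_type": "type",
    "force_append_only": "force_append_only",
    "warehouse_path": "warehouse.path",
    "database_name": "database.name",
    "table_name": "table.name",
    "catalog_type": "catalog.type",
    "s3_region": "s3.region",
    "s3_access_key": "s3.access.key",
    "s3_secret_key": "s3.secret.key",
}


def render_iceberg_sink(sink_name: str, from_relation: str, settings: dict[str, object]) -> str:
    """Render a CREATE SINK statement that writes one RisingWave relation to Iceberg."""
    options = ["    connector = 'iceberg'"]
    for field_name, value in settings.items():
        if field_name not in ICEBERG_OPTIONS:
            continue  # fields without a known option name are skipped
        if isinstance(value, bool):
            value = "true" if value else "false"
        escaped = str(value).replace("'", "''")  # escape embedded single quotes
        options.append(f"    {ICEBERG_OPTIONS[field_name]} = '{escaped}'")
    return (
        f"CREATE SINK {sink_name} FROM {from_relation} WITH (\n"
        + ",\n".join(options)
        + "\n);"
    )


sql = render_iceberg_sink("analytics_lake", "events", {
    "warehouse_path": "s3://my-warehouse/",
    "database_name": "analytics",
    "table_name": "events",
    "catalog_type": "storage",
    "s3_region": "us-east-1",
    "s3_access_key": "your-access-key",
    "s3_secret_key": "your-secret-key",
    "data_type": "append-only",
    "force_append_only": True,
})
print(sql)
```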

## Complete Connection Examples

### Basic CDC with All Columns

```python
# postgres_config: a PostgreSQLConfig like the ones shown above
# Simple table selection with all columns
result = builder.create_postgresql_connection(
    config=postgres_config,
    table_selector=["users", "orders", "products"]  # Fast: only checks these tables
)

selected_tables = [t.qualified_name for t in result['selected_tables']]
```

### Advanced CDC with Column Filtering

```python
from risingwave_connect.discovery.base import (
    TableColumnConfig, ColumnSelection, TableInfo
)

# Configure selective columns for multiple tables
users_config = TableColumnConfig(
    table_info=TableInfo(schema_name="public", table_name="users"),
    selected_columns=[
        ColumnSelection(column_name="id", is_primary_key=True, risingwave_type="INT"),
        ColumnSelection(column_name="name", risingwave_type="VARCHAR", is_nullable=False),
        ColumnSelection(column_name="email", risingwave_type="VARCHAR"),
        ColumnSelection(column_name="created_at", risingwave_type="TIMESTAMP")
    ],
    custom_table_name="clean_users"
)

orders_config = TableColumnConfig(
    table_info=TableInfo(schema_name="public", table_name="orders"),
    selected_columns=[
        ColumnSelection(column_name="order_id", is_primary_key=True, risingwave_type="BIGINT"),
        ColumnSelection(column_name="user_id", risingwave_type="INT", is_nullable=False),
        ColumnSelection(column_name="total_amount", risingwave_type="DECIMAL(10,2)"),
        ColumnSelection(column_name="status", risingwave_type="VARCHAR"),
        ColumnSelection(column_name="created_at", risingwave_type="TIMESTAMP")
    ]
)

# Apply column configurations
column_configs = {
    "users": users_config,
    "orders": orders_config
    # No config for 'products' - will include all columns
}

# Create CDC with column filtering
cdc_result = builder.create_postgresql_connection(
    config=postgres_config,
    table_selector=["users", "orders", "products"],
    column_configs=column_configs
)

# Create sinks for the filtered tables (s3_config and iceberg_config are sink configs defined separately)
selected_tables = [t.qualified_name for t in cdc_result['selected_tables']]
builder.create_s3_sink(s3_config, selected_tables)
builder.create_sink(iceberg_config, selected_tables)
```

### Multi-Destination Data Pipeline

```python
from risingwave_connect import TableSelector

# 1. Set up CDC source with filtering
cdc_result = builder.create_postgresql_connection(
    config=postgres_config,
    table_selector=TableSelector(include_patterns=["user_*", "order_*"])
)

selected_tables = [t.qualified_name for t in cdc_result['selected_tables']]

# 2. Create multiple data destinations (s3_config, analytics_config, and iceberg_config are sink configs defined separately)
builder.create_s3_sink(s3_config, selected_tables)  # Data lake
builder.create_postgresql_sink(analytics_config, selected_tables)  # Analytics
builder.create_sink(iceberg_config, selected_tables)  # Iceberg warehouse
```

## Examples

The `examples/` directory contains complete working examples:

- **`postgres_cdc_iceberg_pipeline.py`** - End-to-end CDC to Iceberg pipeline
- **`table_column_filtering_example.py`** - Comprehensive table and column filtering examples

## Development

```bash
# Clone and set up development environment
git clone https://github.com/risingwavelabs/risingwave-connect-py.git
cd risingwave-connect-py

# Install with development dependencies
uv venv
source .venv/bin/activate
uv pip install -e ".[dev]"

# Run tests
pytest

# Format code
ruff format .
```

## Requirements

- Python ≥ 3.10
- RisingWave instance (local or cloud)
- PostgreSQL with logical replication enabled (`wal_level = logical`), as required for CDC
- Required Python packages: `psycopg[binary]`, `pydantic`

## License

Apache 2.0 License

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "risingwave-connect-py",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.10",
    "maintainer_email": null,
    "keywords": "cdc, data-connector, iceberg, postgresql, risingwave, risingwave-connect, risingwave-connect-py, risingwave_connect, risingwave_connect_py, s3, streaming",
    "author": null,
    "author_email": "Yuxuan Liao <yuxuan@risingwave-labs.com>",
    "download_url": "https://files.pythonhosted.org/packages/0d/fd/023f3502cbf0ee7fbab1e33a2a01a595be3f30a75964e589d88795b3c726/risingwave_connect_py-0.2.7.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": null,
    "summary": "Python SDK for integrating RisingWave with various CDC sources (like PostgreSQL) and supporting multiple sink destinations such as Iceberg and S3.",
    "version": "0.2.7",
    "project_urls": {
        "Homepage": "https://github.com/risingwavelabs/risingwave-connect-py",
        "Issues": "https://github.com/risingwavelabs/risingwave-connect-py/issues",
        "Repository": "https://github.com/risingwavelabs/risingwave-connect-py"
    },
    "split_keywords": [
        "cdc",
        " data-connector",
        " iceberg",
        " postgresql",
        " risingwave",
        " risingwave-connect",
        " risingwave-connect-py",
        " risingwave_connect",
        " risingwave_connect_py",
        " s3",
        " streaming"
    ],
    "urls": [
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "c19b0250aab93c49a1ed95675c7dedeae2477a6088361b039d186724ec502205",
                "md5": "21a6ba843a21f4ac3dabff99258a48d8",
                "sha256": "14b6c5be6114649f23be4d69f74bfdc4873d011a641c56ea5fb139886f6833a2"
            },
            "downloads": -1,
            "filename": "risingwave_connect_py-0.2.7-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "21a6ba843a21f4ac3dabff99258a48d8",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.10",
            "size": 34453,
            "upload_time": "2025-08-29T07:24:27",
            "upload_time_iso_8601": "2025-08-29T07:24:27.540412Z",
            "url": "https://files.pythonhosted.org/packages/c1/9b/0250aab93c49a1ed95675c7dedeae2477a6088361b039d186724ec502205/risingwave_connect_py-0.2.7-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": null,
            "digests": {
                "blake2b_256": "0dfd023f3502cbf0ee7fbab1e33a2a01a595be3f30a75964e589d88795b3c726",
                "md5": "ad8eca3b425dbab43e5056f7cbe3c458",
                "sha256": "d9401cf41af938ecc616a11c252e443ac2f8531928a30c69b789285c2289cc2f"
            },
            "downloads": -1,
            "filename": "risingwave_connect_py-0.2.7.tar.gz",
            "has_sig": false,
            "md5_digest": "ad8eca3b425dbab43e5056f7cbe3c458",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.10",
            "size": 80548,
            "upload_time": "2025-08-29T07:24:29",
            "upload_time_iso_8601": "2025-08-29T07:24:29.202038Z",
            "url": "https://files.pythonhosted.org/packages/0d/fd/023f3502cbf0ee7fbab1e33a2a01a595be3f30a75964e589d88795b3c726/risingwave_connect_py-0.2.7.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-08-29 07:24:29",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "risingwavelabs",
    "github_project": "risingwave-connect-py",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "risingwave-connect-py"
}
        