# RisingWave Pipeline SDK
A Python SDK for building RisingWave data pipelines with PostgreSQL CDC, automatic table discovery, and multiple sink destinations.
## Features
- **PostgreSQL CDC Integration**: Complete Change Data Capture support with automatic schema discovery
- **Flexible Table Selection**: Pattern-based, interactive, or programmatic table selection
- **Multiple Sink Support**: Iceberg, S3, and PostgreSQL destinations
- **Advanced CDC Configuration**: SSL, backfilling, publication management, and more
- **SQL Generation**: Automatically generates optimized RisingWave SQL statements
## Installation
```bash
# Using uv (recommended)
uv add risingwave-pipeline-sdk
# Using pip
pip install risingwave-pipeline-sdk
```
## Quick Start
```python
from risingwave_pipeline_sdk import (
    RisingWaveClient,
    PipelineBuilder,
    PostgreSQLConfig,
    TableSelector,
)

# Connect to RisingWave
client = RisingWaveClient("postgresql://root@localhost:4566/dev")

# Configure PostgreSQL CDC
config = PostgreSQLConfig(
    hostname="localhost",
    port=5432,
    username="postgres",
    password="secret",
    database="mydb",
    auto_schema_change=True,
)

# Create a pipeline with table selection
builder = PipelineBuilder(client)
result = builder.create_postgresql_pipeline(
    config=config,
    table_selector=TableSelector(include_patterns=["users", "orders"]),
)

print(f"Created CDC source with {len(result['selected_tables'])} tables")
```
## Table Discovery and Selection
### Discover Available Tables
```python
# Discover all available tables (reuses `builder` and `config` from Quick Start)
available_tables = builder.discover_postgresql_tables(config)

for table in available_tables:
    print(f"{table.qualified_name} - {table.row_count} rows")
```
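For intuition, PostgreSQL can list candidate tables straight from its system catalogs. The sketch below is an assumption, not the SDK's actual query: `discover_tables`, `DiscoveredTable`, and `DISCOVERY_SQL` are hypothetical names, and the row count is the planner estimate `pg_class.reltuples`, so it can differ from an exact `COUNT(*)`.

```python
from dataclasses import dataclass
from typing import Any, Protocol, Sequence


class Cursor(Protocol):
    """Minimal DB-API cursor interface (psycopg cursors fit this shape)."""

    def execute(self, query: str, params: Sequence[Any]) -> Any: ...
    def fetchall(self) -> list[tuple[Any, ...]]: ...


@dataclass(frozen=True)
class DiscoveredTable:
    """Hypothetical result type; mirrors the attributes used in the loop above."""

    schema_name: str
    table_name: str
    row_count: int  # planner estimate from pg_class.reltuples, not an exact count

    @property
    def qualified_name(self) -> str:
        return f"{self.schema_name}.{self.table_name}"


# Ordinary ('r') and partitioned ('p') tables in a single schema.
DISCOVERY_SQL = """
SELECT n.nspname, c.relname, c.reltuples::bigint
FROM pg_catalog.pg_class AS c
JOIN pg_catalog.pg_namespace AS n ON n.oid = c.relnamespace
WHERE c.relkind IN ('r', 'p') AND n.nspname = %s
ORDER BY c.relname
"""


def discover_tables(cursor: Cursor, schema_name: str = "public") -> list[DiscoveredTable]:
    cursor.execute(DISCOVERY_SQL, (schema_name,))
    # reltuples is -1 for tables that were never vacuumed or analyzed
    # (PostgreSQL 14+), so clamp the estimate to zero.
    return [
        DiscoveredTable(schema, name, max(int(rows), 0))
        for schema, name, rows in cursor.fetchall()
    ]
```

With psycopg 3, pass a cursor obtained from `psycopg.connect(...)`; the `%s` placeholder matches psycopg's parameter style.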
### Flexible Table Selection
```python
from risingwave_pipeline_sdk import TableSelector

# Select specific tables by name
by_name = TableSelector(specific_tables=["users", "orders", "products"])

# Pattern-based selection
by_pattern = TableSelector(
    include_patterns=["user_*", "order_*"],
    exclude_patterns=["*_temp", "*_backup"],
)

# Include all tables except those matching the exclude patterns
all_but_scratch = TableSelector(
    include_all=True,
    exclude_patterns=["temp_*", "backup_*"],
)
```
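The three selection modes can be modeled with shell-style glob matching. This is a sketch under stated assumptions: it uses Python's `fnmatch.fnmatchcase`, lets exclude patterns override every inclusion rule, and matches patterns against both the bare and the schema-qualified name. `SelectorSketch` is a hypothetical stand-in, and the SDK's real `TableSelector` semantics may differ.

```python
from dataclasses import dataclass, field
from fnmatch import fnmatchcase


@dataclass
class SelectorSketch:
    """Hypothetical stand-in for TableSelector; glob semantics are assumed."""

    specific_tables: list[str] = field(default_factory=list)
    include_patterns: list[str] = field(default_factory=list)
    exclude_patterns: list[str] = field(default_factory=list)
    include_all: bool = False

    @staticmethod
    def _matches(name: str, patterns: list[str]) -> bool:
        # Accept a match on either the bare name ("users") or "schema.table".
        bare = name.rsplit(".", 1)[-1]
        return any(fnmatchcase(bare, p) or fnmatchcase(name, p) for p in patterns)

    def select(self, qualified_names: list[str]) -> list[str]:
        selected = []
        for name in qualified_names:
            bare = name.rsplit(".", 1)[-1]
            if self.specific_tables:
                # Exact names only; no wildcard expansion.
                included = bare in self.specific_tables or name in self.specific_tables
            else:
                included = self.include_all or self._matches(name, self.include_patterns)
            # Exclusions are checked last, so they win over any inclusion rule.
            if included and not self._matches(name, self.exclude_patterns):
                selected.append(name)
        return selected
```

Note that `user_*` does not match `users`: the underscore is literal, so pick patterns with the exact naming convention of your schema in mind.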
## PostgreSQL CDC Configuration
```python
config = PostgreSQLConfig(
    # Connection details
    hostname="localhost",
    port=5432,
    username="postgres",
    password="secret",
    database="mydb",
    schema_name="public",

    # CDC settings
    auto_schema_change=True,
    publication_name="rw_publication",
    slot_name="rw_slot",

    # SSL configuration
    ssl_mode="require",
    ssl_root_cert="/path/to/ca.pem",

    # Performance tuning
    backfill_parallelism="8",
    backfill_num_rows_per_split="100000",
    backfill_as_even_splits=True,
)
```
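Under the hood, settings like these end up as properties in a RisingWave `CREATE SOURCE ... WITH (connector = 'postgres-cdc', ...)` statement. The sketch below shows one way to render that clause; the field-to-property mapping in `PROPERTY_KEYS` is an assumption based on RisingWave's documented `postgres-cdc` options (verify against your RisingWave version), the backfill settings are omitted because where the SDK places them is not stated here, and `render_cdc_source` is a hypothetical helper, not the SDK's generator.

```python
# Hypothetical mapping from PostgreSQLConfig-style field names to
# postgres-cdc connector property keys (assumed; check the RisingWave docs).
PROPERTY_KEYS = {
    "hostname": "hostname",
    "port": "port",
    "username": "username",
    "password": "password",
    "database": "database.name",
    "schema_name": "schema.name",
    "slot_name": "slot.name",
    "publication_name": "publication.name",
    "auto_schema_change": "auto.schema.change",
    "ssl_mode": "ssl.mode",
    "ssl_root_cert": "ssl.root.cert",
}


def sql_literal(value: object) -> str:
    """Render a value as a single-quoted SQL string literal."""
    if isinstance(value, bool):
        value = "true" if value else "false"
    # Double embedded single quotes so passwords like "it's" stay valid SQL.
    return "'" + str(value).replace("'", "''") + "'"


def render_cdc_source(source_name: str, fields: dict[str, object]) -> str:
    """Build a CREATE SOURCE statement; source_name must be a plain identifier."""
    props = ["connector = 'postgres-cdc'"]
    for field_name, value in fields.items():
        if value is None:
            continue  # unset optional fields are simply omitted
        props.append(f"{PROPERTY_KEYS[field_name]} = {sql_literal(value)}")
    body = ",\n    ".join(props)
    return f"CREATE SOURCE IF NOT EXISTS {source_name} WITH (\n    {body}\n);"
```

Every value is emitted as a string literal (for example `port = '5432'`), which matches how RisingWave connector properties are written in SQL.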
## Sink Destinations
### Iceberg Data Lake
```python
from risingwave_pipeline_sdk import IcebergConfig

iceberg_config = IcebergConfig(
    sink_name="analytics_lake",
    warehouse_path="s3://my-warehouse/",
    database_name="analytics",
    table_name="events",
    catalog_type="storage",

    # S3 configuration
    s3_region="us-east-1",
    s3_access_key="your-access-key",
    s3_secret_key="your-secret-key",

    # Sink type
    data_type="append-only",
    force_append_only=True,
)

# Create the sink
builder.create_sink(iceberg_config, ["events", "users"])
```
### S3 Data Archive
```python
from risingwave_pipeline_sdk import S3Config

s3_config = S3Config(
    sink_name="data_archive",
    bucket_name="my-data-bucket",
    path="raw-data/",
    region_name="us-east-1",
    access_key_id="your-access-key",
    secret_access_key="your-secret-key",

    # Format configuration
    format_type="PLAIN",
    encode_type="PARQUET",
)

builder.create_s3_sink(s3_config, ["users", "orders"])
```
### PostgreSQL Analytics Database
```python
from risingwave_pipeline_sdk import PostgreSQLSinkConfig

analytics_config = PostgreSQLSinkConfig(
    sink_name="analytics_db",
    hostname="analytics.example.com",
    port=5432,
    username="analytics_user",
    password="password",
    database="analytics",
    postgres_schema="real_time",
)

# Create the sink with custom transformations
custom_queries = {
    "users": "SELECT id, name, email, created_at FROM users WHERE active = true",
    "orders": "SELECT * FROM orders WHERE status != 'cancelled'",
}

builder.create_postgresql_sink(
    analytics_config,
    ["users", "orders"],
    select_queries=custom_queries,
)
```
## Complete Pipeline Example
```python
# Assumes `builder`, `config`, `s3_config`, `analytics_config`, and
# `iceberg_config` are defined as in the sections above.

# 1. Set up the CDC source
cdc_result = builder.create_postgresql_pipeline(
    config=config,
    table_selector=TableSelector(include_patterns=["user_*", "order_*"]),
)
selected_tables = [t.qualified_name for t in cdc_result["selected_tables"]]

# 2. Fan the same tables out to multiple sinks
builder.create_s3_sink(s3_config, selected_tables)  # S3 archive
builder.create_postgresql_sink(analytics_config, selected_tables)  # Analytics database
builder.create_sink(iceberg_config, selected_tables)  # Iceberg data lake
```
## Examples
The `examples/` directory contains complete working examples:
- **`postgres_cdc_iceberg_pipeline.py`** - End-to-end CDC to Iceberg pipeline
- **`interactive_discovery.py`** - Interactive table discovery and selection
- **`env_config_example.py`** - Environment-variable-based configuration
## Environment Configuration
Configure using environment variables for production deployments:
```bash
# RisingWave connection
export RW_HOST=localhost
export RW_PORT=4566
export RW_USER=root
export RW_DATABASE=dev
# PostgreSQL CDC source
export PG_HOST=localhost
export PG_PORT=5432
export PG_USER=postgres
export PG_PASSWORD=secret
export PG_DATABASE=mydb
# Table selection
export TABLE_PATTERNS="users,orders,products"
```
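A loader for these variables only needs the standard library. This is a sketch, not the SDK's or the example script's code: `EnvSettings`, `load_settings`, and the fallback defaults (copied from the example values above) are assumptions. It builds a RisingWave DSN in the same form the Quick Start passes to `RisingWaveClient`, splits `TABLE_PATTERNS` on commas, and requires the PostgreSQL password and database to be set explicitly.

```python
import os
from collections.abc import Mapping
from dataclasses import dataclass
from urllib.parse import quote


@dataclass(frozen=True)
class EnvSettings:
    """Hypothetical settings container for the variables listed above."""

    risingwave_dsn: str
    pg_host: str
    pg_port: int
    pg_user: str
    pg_password: str
    pg_database: str
    table_patterns: list[str]


def load_settings(env: Mapping[str, str] = os.environ) -> EnvSettings:
    """Read settings from `env`; defaults mirror the example values (assumed)."""
    # Percent-encode the user name so special characters survive in the URL.
    rw_user = quote(env.get("RW_USER", "root"), safe="")
    rw_dsn = (
        f"postgresql://{rw_user}@{env.get('RW_HOST', 'localhost')}:"
        f"{int(env.get('RW_PORT', '4566'))}/{env.get('RW_DATABASE', 'dev')}"
    )
    # "users, orders,,products" -> ["users", "orders", "products"]
    patterns = [p.strip() for p in env.get("TABLE_PATTERNS", "").split(",") if p.strip()]
    return EnvSettings(
        risingwave_dsn=rw_dsn,
        pg_host=env.get("PG_HOST", "localhost"),
        pg_port=int(env.get("PG_PORT", "5432")),
        pg_user=env.get("PG_USER", "postgres"),
        pg_password=env["PG_PASSWORD"],  # no default: a missing secret raises KeyError
        pg_database=env["PG_DATABASE"],
        table_patterns=patterns,
    )
```

The resulting `risingwave_dsn` can be passed to `RisingWaveClient(...)` and `table_patterns` to `TableSelector(include_patterns=...)`.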
## Development
```bash
# Clone and set up development environment
git clone https://github.com/risingwavelabs/risingwave-pipeline-sdk.git
cd risingwave-pipeline-sdk
# Install with development dependencies
uv venv
source .venv/bin/activate
uv pip install -e ".[dev]"
# Run tests
pytest
# Format code
ruff format .
```
## Requirements
- Python ≥ 3.10
- RisingWave instance (local or cloud)
- PostgreSQL with logical replication enabled (`wal_level = logical`), which RisingWave's PostgreSQL CDC connector requires
- Required Python packages: `psycopg[binary]`, `pydantic`
## License
Apache 2.0 License
## Package Metadata
- **Version**: 0.1.1 (released 2025-08-27)
- **Author**: Yuxuan Liao <yuxuan@risingwave-labs.com>
- **Repository**: https://github.com/risingwavelabs/risingwave-pipeline-sdk
- **Issues**: https://github.com/risingwavelabs/risingwave-pipeline-sdk/issues