dagster-postgres-pandas

-   Version: 0.2.3
-   Summary: PostgreSQL I/O manager for Dagster with Pandas DataFrame support
-   Upload time: 2025-07-19 07:25:07
-   Requires Python: >=3.9
-   Keywords: dagster, data-engineering, io-manager, pandas, postgresql

# Dagster Postgres Pandas I/O Manager

[![PyPI version](https://badge.fury.io/py/dagster-postgres-pandas.svg)](https://badge.fury.io/py/dagster-postgres-pandas)
[![Python 3.10+](https://img.shields.io/badge/python-3.10+-blue.svg)](https://www.python.org/downloads/)
[![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)

A robust PostgreSQL I/O manager for [Dagster](https://dagster.io/) with Pandas DataFrame support. This package provides seamless integration between Dagster assets and PostgreSQL databases, featuring dynamic schema selection, automatic schema creation, and comprehensive error handling.

## Features

-   🚀 **Easy Integration**: Drop-in replacement for Dagster's built-in I/O managers
-   🎯 **Dynamic Schema Selection**: Flexible schema assignment per asset using metadata
-   📊 **Pandas Native**: Optimized for Pandas DataFrames with chunked operations
-   🔧 **Auto Schema Creation**: Automatically creates schemas when they don't exist
-   🛡️ **Robust Error Handling**: Comprehensive error messages and connection management
-   ⚡ **Performance Optimized**: Connection pooling and efficient bulk operations
-   🔒 **Production Ready**: Timeout handling, connection retries, and detailed logging

## Installation

```bash
uv add dagster-postgres-pandas
```

### Requirements

-   Python 3.10+
-   Dagster 1.8.0+
-   Pandas 2.1.0+
-   PostgreSQL database

## Quick Start

### 1. Set up your environment

```bash
export POSTGRES_CONNECTION_STRING="postgresql://user:password@localhost:5432/database"
```

### 2. Basic usage

```python
import dagster as dg
import pandas as pd
from dagster_postgres_pandas import PostgresPandasIOManager

# Define your assets
@dg.asset
def raw_data() -> pd.DataFrame:
    """Load raw data."""
    return pd.DataFrame({
        'id': [1, 2, 3],
        'name': ['Alice', 'Bob', 'Charlie'],
        'value': [100, 200, 300]
    })

@dg.asset
def processed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    """Process the raw data."""
    return raw_data.assign(value_doubled=raw_data['value'] * 2)

# Configure Dagster
defs = dg.Definitions(
    assets=[raw_data, processed_data],
    resources={
        "io_manager": PostgresPandasIOManager(
            connection_string=dg.EnvVar("POSTGRES_CONNECTION_STRING")
        )
    }
)
```

### 3. Run your pipeline

```bash
dg dev
```

or

```bash
dagster dev
```

Your DataFrames will be automatically stored in PostgreSQL tables and loaded when needed by downstream assets.

## Configuration

### Basic Configuration

```python
from dagster_postgres_pandas import PostgresPandasIOManager

io_manager = PostgresPandasIOManager(
    connection_string="postgresql://user:password@localhost:5432/database",
    default_schema="analytics",
    if_exists="replace",
    index=False,
    timeout=30
)
```

### Configuration Options

| Parameter           | Type   | Default     | Description                                              |
| ------------------- | ------ | ----------- | -------------------------------------------------------- |
| `connection_string` | `str`  | Required    | PostgreSQL connection string                             |
| `default_schema`    | `str`  | `"public"`  | Default schema for assets                                |
| `if_exists`         | `str`  | `"replace"` | Behavior when table exists (`fail`, `replace`, `append`) |
| `index`             | `bool` | `False`     | Whether to store DataFrame index                         |
| `chunk_size`        | `int`  | `None`      | Number of rows to insert at once (None for all at once)  |
| `timeout`           | `int`  | `30`        | Connection timeout in seconds                            |

### Using Environment Variables (Recommended)

```python
from dagster_postgres_pandas import PostgresPandasIOManager
import dagster as dg

# Recommended approach for production
io_manager = PostgresPandasIOManager(
    connection_string=dg.EnvVar("POSTGRES_CONNECTION_STRING"),
    default_schema="analytics"
)
```
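
Because `dg.EnvVar` defers reading the variable, a missing value may only surface once a run starts. The following is a minimal sketch of a fail-fast check you could run at startup; `require_env` is a hypothetical helper, not part of this package or of Dagster.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or raise a clear error.

    Hypothetical helper for illustration; not part of dagster-postgres-pandas.
    """
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(
            f"Environment variable {name!r} is not set; "
            "export it before starting Dagster."
        )
    return value
```

Calling `require_env("POSTGRES_CONNECTION_STRING")` before building `Definitions` would turn a late runtime failure into an immediate, readable one.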

## Advanced Usage

### Schema Management

#### Per-Asset Schema Configuration

```python
import dagster as dg
import pandas as pd

@dg.asset(
    metadata={"schema": "analytics"}
)
def sales_data() -> pd.DataFrame:
    """This asset will be stored in the 'analytics' schema."""
    return pd.DataFrame({"sales": [100, 200, 300]})

@dg.asset(
    metadata={"schema": "raw"}
)
def raw_sales_data() -> pd.DataFrame:
    """This asset will be stored in the 'raw' schema."""
    return pd.DataFrame({"raw_sales": [95, 205, 295]})
```

#### Schema Priority

The I/O manager determines the schema in this order:

1. **Asset metadata**: `metadata={"schema": "schema_name"}`
2. **Resource configuration**: `schema` parameter in resource config
3. **Default schema**: `default_schema` parameter
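
The priority order above can be sketched as a small pure-Python function. This is an illustration only: `resolve_schema` and its parameter names are hypothetical and do not reflect the package's internal code.

```python
from typing import Mapping, Optional


def resolve_schema(
    asset_metadata: Mapping[str, object],
    resource_schema: Optional[str],
    default_schema: str = "public",
) -> str:
    """Pick a schema using the documented priority order (illustrative only)."""
    # 1. Asset metadata wins when it names a schema.
    metadata_schema = asset_metadata.get("schema")
    if metadata_schema:
        return str(metadata_schema)
    # 2. Otherwise fall back to a schema set on the resource, if any.
    if resource_schema:
        return resource_schema
    # 3. Finally use the default schema.
    return default_schema
```

For example, an asset with `metadata={"schema": "raw"}` would resolve to `"raw"` regardless of the other two values, while an asset without schema metadata falls through to the resource or default setting.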

### Large DataFrames

For large DataFrames, use chunked operations:

```python
import dagster as dg
from dagster_postgres_pandas import PostgresPandasIOManager

io_manager = PostgresPandasIOManager(
    connection_string=dg.EnvVar("POSTGRES_CONNECTION_STRING"),
    chunk_size=10000,  # Insert 10k rows at a time
    timeout=120  # Longer timeout for large operations
)
```
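
To make the effect of `chunk_size` concrete, here is a rough sketch of how a row count splits into insert batches. `chunk_bounds` is a hypothetical helper written for this explanation; it assumes batches are consecutive row ranges and is not the package's implementation.

```python
from typing import Iterator, Optional, Tuple


def chunk_bounds(
    n_rows: int, chunk_size: Optional[int]
) -> Iterator[Tuple[int, int]]:
    """Yield (start, stop) row ranges for each insert batch (illustrative)."""
    if n_rows <= 0:
        return
    if chunk_size is None:
        # No chunking: everything goes in a single batch.
        yield (0, n_rows)
        return
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer or None")
    for start in range(0, n_rows, chunk_size):
        yield (start, min(start + chunk_size, n_rows))
```

With 25,000 rows and `chunk_size=10000`, this produces three batches, the last one holding the remaining 5,000 rows. Smaller chunks lower peak memory per insert at the cost of more round trips.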

## Connection String Format

PostgreSQL connection strings can be formatted in several ways:

```python
# Basic format
"postgresql://username:password@host:port/database"

# With SSL
"postgresql://username:password@host:port/database?sslmode=require"

# With additional parameters
"postgresql://username:password@host:port/database?sslmode=require&connect_timeout=30"

# Environment variable (recommended for production)
connection_string=dg.EnvVar("POSTGRES_CONNECTION_STRING")
```
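
Passwords containing characters such as `@`, `:`, or `/` must be percent-encoded before they are placed in a URL-style connection string. The sketch below shows one way to assemble the string with the standard library; `build_connection_string` is a hypothetical helper, not something this package provides.

```python
from urllib.parse import quote, urlencode


def build_connection_string(
    user: str,
    password: str,
    host: str,
    port: int,
    database: str,
    **params: str,
) -> str:
    """Assemble a postgresql:// URL, percent-encoding user-supplied parts."""
    # safe="" makes quote() encode every reserved character, including "/".
    auth = f"{quote(user, safe='')}:{quote(password, safe='')}"
    url = f"postgresql://{auth}@{host}:{port}/{quote(database, safe='')}"
    if params:
        # Extra options such as sslmode=require go in the query string.
        url = f"{url}?{urlencode(params)}"
    return url
```

The result can be exported as `POSTGRES_CONNECTION_STRING` so that credentials stay out of the code.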

## Error Handling

The package provides specific exceptions for different error conditions:

```python
from dagster_postgres_pandas import (
    PostgresIOManagerError,
    SchemaNotFoundError,
    ConnectionError,
    InvalidConfigurationError
)

try:
    # Your Dagster code
    pass
except SchemaNotFoundError:
    # Handle missing table/schema
    print("Required table doesn't exist. Make sure upstream assets are materialized.")
except ConnectionError:
    # Handle database connection issues
    print("Could not connect to PostgreSQL database.")
except PostgresIOManagerError:
    # Handle other I/O manager errors
    print("General I/O manager error occurred.")
```
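
The ordering of the `except` clauses above matters if the specific exceptions derive from `PostgresIOManagerError`. The sketch below uses local stand-in classes to show why; the class hierarchy is an assumption made for illustration, and `DbConnectionError` stands in for the package's `ConnectionError`.

```python
# Stand-in classes: the inheritance shown here is an ASSUMPTION,
# not confirmed from the package source.
class PostgresIOManagerError(Exception):
    """Assumed base class for all I/O manager errors."""


class SchemaNotFoundError(PostgresIOManagerError):
    """Stand-in for a missing schema or table."""


class DbConnectionError(PostgresIOManagerError):
    """Stand-in for the package's ConnectionError."""


def describe(exc: Exception) -> str:
    """Classify an exception, checking specific types before the base type."""
    try:
        raise exc
    except SchemaNotFoundError:
        return "missing schema or table"
    except DbConnectionError:
        return "connection problem"
    except PostgresIOManagerError:
        # Must come last; placed first it would swallow the subclasses.
        return "other I/O manager error"
```

Note also that importing `ConnectionError` unqualified shadows Python's built-in exception of the same name in that module; an alias such as `from dagster_postgres_pandas import ConnectionError as PostgresConnectionError` avoids the clash.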

## Examples

### Multi-Schema Pipeline

```python
import dagster as dg
import pandas as pd
from dagster_postgres_pandas import PostgresPandasIOManager

@dg.asset(metadata={"schema": "raw"})
def raw_users() -> pd.DataFrame:
    """Load raw user data."""
    return pd.DataFrame({
        'user_id': [1, 2, 3],
        'name': ['Alice', 'Bob', 'Charlie'],
        'email': ['alice@example.com', 'bob@example.com', 'charlie@example.com']
    })

@dg.asset(metadata={"schema": "staging"})
def staged_users(raw_users: pd.DataFrame) -> pd.DataFrame:
    """Clean and validate user data."""
    return raw_users.dropna().copy()

@dg.asset(metadata={"schema": "analytics"})
def user_analytics(staged_users: pd.DataFrame) -> pd.DataFrame:
    """Generate user analytics."""
    return staged_users.assign(
        name_length=staged_users['name'].str.len(),
        email_domain=staged_users['email'].str.split('@').str[1]
    )

defs = dg.Definitions(
    assets=[raw_users, staged_users, user_analytics],
    resources={
        "io_manager": PostgresPandasIOManager(
            connection_string=dg.EnvVar("POSTGRES_CONNECTION_STRING"),
            default_schema="public"
        )
    }
)
```

### Time Series Data with Append Mode

```python
from datetime import datetime, timedelta
import pandas as pd
import dagster as dg
from dagster_postgres_pandas import PostgresPandasIOManager

@dg.asset(metadata={"schema": "timeseries"})
def daily_metrics() -> pd.DataFrame:
    """Generate daily metrics that should be appended, not replaced."""
    today = datetime.now().date()
    return pd.DataFrame({
        'date': [today - timedelta(days=i) for i in range(3)],
        'metric_value': [100, 110, 95],
        'metric_name': ['sales', 'sales', 'sales']
    })

# Configure I/O manager for append mode
defs = dg.Definitions(
    assets=[daily_metrics],
    resources={
        "io_manager": PostgresPandasIOManager(
            connection_string=dg.EnvVar("POSTGRES_CONNECTION_STRING"),
            if_exists="append"
        )
    }
)
```
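
One consequence of append mode is that re-materializing the asset adds the same rows again. The sketch below simulates the three `if_exists` modes on in-memory lists, assuming they behave like the same-named modes of pandas `DataFrame.to_sql`; `write_rows` is a hypothetical stand-in, not the package's code.

```python
from typing import Dict, List, Optional

Row = Dict[str, object]


def write_rows(
    existing: Optional[List[Row]], new_rows: List[Row], if_exists: str
) -> List[Row]:
    """Return the table contents after a write (None means no table yet)."""
    if existing is None:
        # No table yet: every mode simply creates it.
        return list(new_rows)
    if if_exists == "fail":
        raise ValueError("table already exists")
    if if_exists == "replace":
        return list(new_rows)
    if if_exists == "append":
        return existing + list(new_rows)
    raise ValueError(f"unknown if_exists mode: {if_exists!r}")
```

Running the same write twice in `append` mode doubles the rows, so assets like `daily_metrics` usually need deduplication downstream or should emit only rows that are new since the last run.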

### Different I/O Managers per Asset Group

```python
import dagster as dg
import pandas as pd
from dagster_postgres_pandas import PostgresPandasIOManager

# Different configurations for different asset groups
raw_io_manager = PostgresPandasIOManager(
    connection_string=dg.EnvVar("POSTGRES_CONNECTION_STRING"),
    default_schema="raw",
    if_exists="replace"
)

analytics_io_manager = PostgresPandasIOManager(
    connection_string=dg.EnvVar("POSTGRES_CONNECTION_STRING"),
    default_schema="analytics",
    if_exists="replace",
    index=True
)

@dg.asset(io_manager_key="raw_io_manager")
def raw_data() -> pd.DataFrame:
    return pd.DataFrame({"value": [1, 2, 3]})

@dg.asset(io_manager_key="analytics_io_manager")
def processed_data(raw_data: pd.DataFrame) -> pd.DataFrame:
    return raw_data * 2

defs = dg.Definitions(
    assets=[raw_data, processed_data],
    resources={
        "raw_io_manager": raw_io_manager,
        "analytics_io_manager": analytics_io_manager
    }
)
```

## Development

### Setting up Development Environment

```bash
# Clone the repository
git clone https://github.com/klemensgraf/dagster-postgres-pandas.git
cd dagster-postgres-pandas

# Create virtual environment & install dev and test dependencies
uv sync --extra dev --extra test

# Run linting
ruff check .
ruff format .
```

### Running Tests

This project uses pytest for testing. To run the tests:

```bash
# Install test dependencies
uv sync --extra test
# or
uv sync --extra dev

# Run all tests
pytest

# Run with coverage report
pytest --cov=dagster_postgres_pandas

# Run only unit tests
pytest tests/unit/
```

### Code Quality

This project uses several tools to ensure code quality:

-   **Ruff**: Linting and formatting (replaces Black, isort, and flake8)
-   **Pytest**: Unit tests

```bash
# Run all quality checks
ruff check .
ruff format --check .

# Fix linting issues automatically
ruff check --fix .

# Run all tests
pytest
```

## Troubleshooting

### Common Issues

**Connection refused errors:**

-   Ensure PostgreSQL is running
-   Check connection string format
-   Verify network connectivity and firewall settings
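
The checks above can be partly automated with the standard library. This is a minimal sketch under the assumption that the connection string is a URL of the form shown earlier; `host_and_port` and `can_connect` are hypothetical helpers, and a successful TCP connection only shows that something is listening, not that credentials are valid.

```python
import socket
from typing import Tuple
from urllib.parse import urlsplit


def host_and_port(connection_string: str) -> Tuple[str, int]:
    """Extract host and port, defaulting to PostgreSQL's standard port 5432."""
    parts = urlsplit(connection_string)
    if not parts.hostname:
        raise ValueError("connection string has no host")
    return parts.hostname, parts.port or 5432


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

If `can_connect(*host_and_port(url))` returns `False`, the problem is likely the server, the network, or a firewall rather than the I/O manager configuration.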

**Schema not found errors:**

-   The I/O manager automatically creates schemas, but ensure you have CREATE privileges
-   Check that the upstream asset has been materialized

**Large DataFrame performance:**

-   Use `chunk_size` parameter for large DataFrames
-   Increase `timeout` for long-running operations
-   Consider using connection pooling parameters

**Import errors:**

-   Ensure all dependencies are installed: `pip install -e ".[dev]"`
-   Check Python version compatibility (3.10+)

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

### Development Process

1. Create an issue describing the bug or new feature, and mention whether you would like to be assigned to it
1. Wait to be assigned to the issue
1. Fork the repository
1. Create your feature branch (`git checkout -b feature/amazing-feature`)
1. Make your changes
1. Add tests for your changes
1. Run the test suite (`pytest`)
1. Run code quality checks
1. Commit your changes (`git commit -m 'Add amazing feature'`)
1. Push to the branch (`git push origin feature/amazing-feature`)
1. Open a Pull Request

### Reporting Issues

When reporting issues, please include:

-   Python version
-   Dagster version
-   Database version
-   Complete error traceback
-   Minimal example to reproduce the issue

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Support

If you encounter any issues or have questions:

-   **GitHub Issues**: [Open an issue](https://github.com/klemensgraf/dagster-postgres-pandas/issues)
-   **Dagster Community**: Join the [Dagster Slack](https://dagster.io/slack)
-   **Documentation**: Check the [Dagster documentation](https://docs.dagster.io/)

## Acknowledgments

-   Built on top of the excellent [Dagster](https://dagster.io/) framework
-   Powered by [Pandas](https://pandas.pydata.org/) and [SQLAlchemy](https://www.sqlalchemy.org/)
-   Inspired by the Dagster community's need for robust database I/O solutions

---

Made by [Klemens Graf](https://github.com/klemensgraf)

            
