pydiverse-pipedag


Name: pydiverse-pipedag
Version: 0.10.2
Summary: A pipeline orchestration library executing tasks within one python session. It takes care of SQL table (de)materialization, caching and cache invalidation. Blob storage is supported as well for example for storing model files.
Home page: None
Upload time: 2025-07-08 12:14:31
Maintainer: None
Docs URL: None
Author: QuantCo, Inc.
Requires Python: >=3.10.18, <3.14
License: BSD 3-Clause License. Copyright (c) 2022, pydiverse. All rights reserved. Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met: 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer. 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution. 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# pydiverse.pipedag

[![CI](https://github.com/pydiverse/pydiverse.pipedag/actions/workflows/tests.yml/badge.svg)](https://github.com/pydiverse/pydiverse.pipedag/actions/workflows/tests.yml)

A pipeline orchestration library that executes tasks within one Python session. It takes care of SQL table
(de)materialization, caching, and cache invalidation. Blob storage is also supported, for example
for storing model files.

This is an early-stage 0.x version; however, it is already used in real projects. We are happy to receive your
feedback as [issues](https://github.com/pydiverse/pydiverse.pipedag/issues) on the GitHub repo. Feel free to
comment on existing issues to extend them to your needs or to add solution ideas.

## Usage

pydiverse.pipedag can be installed either from PyPI with `pip install pydiverse-pipedag duckdb duckdb-engine` or from
conda-forge with `conda install pydiverse-pipedag duckdb duckdb-engine -c conda-forge`. If you don't use DuckDB for
testing, you can omit it here. However, it is needed to run the following example.

## Example

A flow can look like this (e.g. put the following in a file named `run_pipeline.py`):

```python
import tempfile

import pandas as pd
import sqlalchemy as sa

from pydiverse.pipedag import Flow, Stage, Table, materialize
from pydiverse.pipedag.context import StageLockContext
from pydiverse.pipedag.core.config import create_basic_pipedag_config
from pydiverse.common.util.structlog import setup_logging


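# Lazy task: it returns a SQL query that pipedag materializes as a table;
# the rendered query itself serves as the input for cache invalidation.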
@materialize(lazy=True)
def lazy_task_1():
    return sa.select(
        sa.literal(1).label("x"),
        sa.literal(2).label("y"),
    )


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_2(input1: sa.sql.expression.Alias, input2: sa.sql.expression.Alias):
    query = sa.select(
        (input1.c.x * 5).label("x5"),
        input2.c.a,
    ).select_from(input1.outerjoin(input2, input2.c.x == input1.c.x))

    return Table(query, name="task_2_out", primary_key=["a"])


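# Tasks may also return raw SQL text; the inputs are passed as aliases whose
# .original attribute exposes the underlying table's schema and name.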
@materialize(lazy=True, input_type=sa.Table)
def lazy_task_3(input1: sa.sql.expression.Alias):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.original.name}")


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_4(input1: sa.sql.expression.Alias):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.original.name}")


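# Eager task: runs in Python (pandas) and is cached based on the manually
# bumped version string; nout=2 splits the returned tuple into two table outputs.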
@materialize(nout=2, version="1.0.0")
def eager_inputs():
    dfA = pd.DataFrame(
        {
            "a": [0, 1, 2, 4],
            "b": [9, 8, 7, 6],
        }
    )
    dfB = pd.DataFrame(
        {
            "a": [2, 1, 0, 1],
            "x": [1, 1, 2, 2],
        }
    )
    return Table(dfA, "dfA"), Table(dfB, "dfB_%%")


@materialize(version="1.0.0", input_type=pd.DataFrame)
def eager_task(tbl1: pd.DataFrame, tbl2: pd.DataFrame):
    return tbl1.merge(tbl2, on="x")


def main():
    with tempfile.TemporaryDirectory() as temp_dir:
        cfg = create_basic_pipedag_config(
            f"duckdb:///{temp_dir}/db.duckdb",
            disable_stage_locking=True,  # This is special for duckdb
            # Attention: If uncommented, stage and task names might be sent to the following URL.
            #   You can self-host kroki if you like:
            #   https://docs.kroki.io/kroki/setup/install/
            # kroki_url="https://kroki.io",
        ).get("default")
        with cfg:
            with Flow() as f:
                with Stage("stage_1"):
                    lazy_1 = lazy_task_1()
                    a, b = eager_inputs()

                with Stage("stage_2"):
                    lazy_2 = lazy_task_2(lazy_1, b)
                    lazy_3 = lazy_task_3(lazy_2)
                    eager = eager_task(lazy_1, b)

                with Stage("stage_3"):
                    lazy_4 = lazy_task_4(lazy_2)
                _ = lazy_3, lazy_4, eager  # unused terminal output tables

            # Run flow
            result = f.run()
            assert result.successful

            # Run again within StageLockContext, which keeps the stage locks
            # until result.get() below has read the output (useful for testing)
            with StageLockContext():
                result = f.run()
                assert result.successful
                assert result.get(lazy_1, as_type=pd.DataFrame)["x"][0] == 1


if __name__ == "__main__":
    setup_logging()  # you can set up the logging and/or structlog libraries as you wish
    main()
```
With SQLAlchemy >= 2.0, you can use `sa.Alias` instead of `sa.sql.expression.Alias`.

The `with tempfile.TemporaryDirectory()` block is only needed to have an OS-independent temporary directory available.
You can also get rid of it like this:

```python
def main():
    cfg = create_basic_pipedag_config(
        "duckdb:////tmp/pipedag/{instance_id}/db.duckdb",
        disable_stage_locking=True,  # This is special for duckdb
    ).get("default")
    ...
```

For a more sophisticated setup with a `pipedag.yaml` configuration file and a separate database
(e.g. a containerized Postgres), please look [here](https://pydiversepipedag.readthedocs.io/en/latest/database_testing.html).
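If you only want to point the basic config at such a Postgres database instead of DuckDB, a minimal sketch (host, port, credentials, and database name pattern below are placeholders for your own setup) could look like this:

```python
from pydiverse.pipedag.core.config import create_basic_pipedag_config

# Sketch: same helper as in the example above, but with a Postgres URL.
# User, password, host, port, and the database name pattern are placeholders.
cfg = create_basic_pipedag_config(
    "postgresql://pipedag:password@127.0.0.1:5432/{instance_id}",
).get("default")
```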

## Troubleshooting

### Installing the MSSQL ODBC driver on Linux

Installing the driver with the instructions
[here](https://docs.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server)
worked. However, `odbcinst -j` revealed that the configuration was installed in `/etc/unixODBC/*`, while the
conda-installed pyodbc brings its own `odbcinst` executable, which expects the ODBC config files in `/etc/*`.
Symlinks were enough to fix the problem. Run `python -c 'import pyodbc; print(pyodbc.drivers())'` and check whether
you get more than an empty list. Furthermore, make sure you use `127.0.0.1` instead of `localhost`; it seems that
`/etc/hosts` is ignored.
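The same check, together with a connection string that uses `127.0.0.1`, as a small sketch (driver name, port, and credentials are placeholders):

```python
# Sketch: verify the ODBC setup described above.
import pyodbc

# Should list the installed driver, e.g. "ODBC Driver 18 for SQL Server".
print(pyodbc.drivers())

# Prefer 127.0.0.1 over localhost; server, port, and credentials are placeholders.
conn_str = (
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=127.0.0.1,1433;UID=sa;PWD=<your-password>;TrustServerCertificate=yes"
)
# conn = pyodbc.connect(conn_str)  # uncomment once the driver shows up above
```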

### Incompatibility with specific `pydiverse.transform` versions

pydiverse.pipedag currently does not support `pydiverse.transform` versions 0.2.0, 0.2.1, and 0.2.2, due to major
differences relative to pdt 0.2.3 and pdt < 0.2. However, it still works with pdt < 0.2.

            
