Name | pydiverse-pipedag |
Version | 0.10.2 |
home_page | None |
Summary | A pipeline orchestration library executing tasks within one python session. It takes care of SQL table (de)materialization, caching and cache invalidation. Blob storage is supported as well for example for storing model files. |
upload_time | 2025-07-08 12:14:31 |
maintainer | None |
docs_url | None |
author | QuantCo, Inc. |
requires_python | <3.14,>=3.10.18 |
license | BSD 3-Clause License
Copyright (c) 2022, pydiverse
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
# pydiverse.pipedag
A pipeline orchestration library that executes tasks within one Python session. It takes care of SQL table
(de)materialization, caching, and cache invalidation. Blob storage is supported as well, for example
for storing model files.
This is an early-stage 0.x version; however, it is already used in real projects. We are happy to receive your
feedback as [issues](https://github.com/pydiverse/pydiverse.pipedag/issues) on the GitHub repo. Feel free to also
comment on existing issues to extend them to your needs or to add solution ideas.
## Usage
pydiverse.pipedag can be installed either from PyPI with `pip install pydiverse-pipedag duckdb duckdb-engine` or from
conda-forge with `conda install pydiverse-pipedag duckdb duckdb-engine -c conda-forge`. If you don't use duckdb for
testing, you can omit it here. However, it is needed to run the following example.
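Before running the example, it can help to confirm that the installed packages are importable. The following is a minimal sketch using only the standard library; the import names `duckdb` and `duckdb_engine` are assumptions derived from the package names above, and `missing_modules` is a hypothetical helper, not part of pipedag.

```python
from importlib.util import find_spec


def missing_modules(names):
    """Return the module names from `names` that cannot be located."""
    missing = []
    for name in names:
        try:
            found = find_spec(name) is not None
        except ModuleNotFoundError:
            # Raised when a parent package (e.g. `pydiverse`) is itself absent.
            found = False
        if not found:
            missing.append(name)
    return missing


if __name__ == "__main__":
    # Assumed import names for the packages installed above.
    print(missing_modules(["pydiverse.pipedag", "duckdb", "duckdb_engine"]))
```

An empty list means all three modules were found; any name that is printed still needs to be installed.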
## Example
A flow can look like this (e.g. put this in a file named `run_pipeline.py`):
```python
import tempfile

import pandas as pd
import sqlalchemy as sa

from pydiverse.pipedag import Flow, Stage, Table, materialize
from pydiverse.pipedag.context import StageLockContext
from pydiverse.pipedag.core.config import create_basic_pipedag_config
from pydiverse.common.util.structlog import setup_logging


@materialize(lazy=True)
def lazy_task_1():
    return sa.select(
        sa.literal(1).label("x"),
        sa.literal(2).label("y"),
    )


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_2(input1: sa.sql.expression.Alias, input2: sa.sql.expression.Alias):
    query = sa.select(
        (input1.c.x * 5).label("x5"),
        input2.c.a,
    ).select_from(input1.outerjoin(input2, input2.c.x == input1.c.x))

    return Table(query, name="task_2_out", primary_key=["a"])


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_3(input1: sa.sql.expression.Alias):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.original.name}")


@materialize(lazy=True, input_type=sa.Table)
def lazy_task_4(input1: sa.sql.expression.Alias):
    return sa.text(f"SELECT * FROM {input1.original.schema}.{input1.original.name}")


@materialize(nout=2, version="1.0.0")
def eager_inputs():
    dfA = pd.DataFrame(
        {
            "a": [0, 1, 2, 4],
            "b": [9, 8, 7, 6],
        }
    )
    dfB = pd.DataFrame(
        {
            "a": [2, 1, 0, 1],
            "x": [1, 1, 2, 2],
        }
    )
    return Table(dfA, "dfA"), Table(dfB, "dfB_%%")


@materialize(version="1.0.0", input_type=pd.DataFrame)
def eager_task(tbl1: pd.DataFrame, tbl2: pd.DataFrame):
    return tbl1.merge(tbl2, on="x")


def main():
    with tempfile.TemporaryDirectory() as temp_dir:
        cfg = create_basic_pipedag_config(
            f"duckdb:///{temp_dir}/db.duckdb",
            disable_stage_locking=True,  # This is special for duckdb
            # Attention: If uncommented, stage and task names might be sent to the following URL.
            #   You can self-host kroki if you like:
            #   https://docs.kroki.io/kroki/setup/install/
            # kroki_url="https://kroki.io",
        ).get("default")
        with cfg:
            with Flow() as f:
                with Stage("stage_1"):
                    lazy_1 = lazy_task_1()
                    a, b = eager_inputs()

                with Stage("stage_2"):
                    lazy_2 = lazy_task_2(lazy_1, b)
                    lazy_3 = lazy_task_3(lazy_2)
                    eager = eager_task(lazy_1, b)

                with Stage("stage_3"):
                    lazy_4 = lazy_task_4(lazy_2)
                _ = lazy_3, lazy_4, eager  # unused terminal output tables

            # Run flow
            result = f.run()
            assert result.successful

            # Run in a different way for testing
            with StageLockContext():
                result = f.run()
                assert result.successful
                assert result.get(lazy_1, as_type=pd.DataFrame)["x"][0] == 1


if __name__ == "__main__":
    setup_logging()  # you can set up the logging and/or structlog libraries as you wish
    main()
```
For SQLAlchemy >= 2.0, you can use `sa.Alias` instead of `sa.sql.expression.Alias`.
The `with tempfile.TemporaryDirectory()` block is only needed to have an OS-independent temporary directory available.
You can also get rid of it like this:
```python
def main():
    cfg = create_basic_pipedag_config(
        "duckdb:////tmp/pipedag/{instance_id}/db.duckdb",
        disable_stage_locking=True,  # This is special for duckdb
    ).get("default")
    ...
```
For a more sophisticated setup with a `pipedag.yaml` configuration file and a separate database
(e.g. containerized Postgres), see the [database testing documentation](https://pydiversepipedag.readthedocs.io/en/latest/database_testing.html).
## Troubleshooting
### Installing the MSSQL ODBC driver for Linux
Installing the driver by following the
instructions [here](https://docs.microsoft.com/en-us/sql/connect/odbc/linux-mac/installing-the-microsoft-odbc-driver-for-sql-server)
worked.
However, `odbcinst -j` revealed that the configuration was installed in `/etc/unixODBC/*`, whereas the conda-installed pyodbc brings
its own `odbcinst` executable, which expects the ODBC config files in `/etc/*`. Symlinks were enough to fix the
problem. Run `python -c 'import pyodbc;print(pyodbc.drivers())'` and check that the result is not an empty list.
Furthermore, make sure you use 127.0.0.1 instead of localhost; it seems that /etc/hosts is ignored.
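The symlink fix described above can be sketched as follows. This assumes the driver installer wrote the standard unixODBC files `odbcinst.ini` and `odbc.ini` to `/etc/unixODBC` while the conda `odbcinst` looks in `/etc`; `plan_symlinks` is a hypothetical helper that only reports which links are missing and creates nothing itself.

```python
from pathlib import Path


def plan_symlinks(source_dir, target_dir, names=("odbcinst.ini", "odbc.ini")):
    """List (link, target) pairs for config files that exist in source_dir
    but are absent from target_dir. Nothing is created on disk."""
    source_dir, target_dir = Path(source_dir), Path(target_dir)
    plan = []
    for name in names:
        src, dst = source_dir / name, target_dir / name
        if src.exists() and not dst.exists():
            plan.append((dst, src))
    return plan


if __name__ == "__main__":
    # Assumed locations, taken from the description above.
    for link, target in plan_symlinks("/etc/unixODBC", "/etc"):
        # Print the command instead of running it; writing to /etc needs root.
        print(f"sudo ln -s {target} {link}")
```

Printing the commands rather than executing them keeps the sketch safe to run; review the output before creating any links.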
### Incompatibility with specific `pydiverse.transform` versions
pydiverse.pipedag currently doesn't support `pydiverse.transform` (pdt) versions 0.2.0, 0.2.1, and 0.2.2, due to major
differences from pdt 0.2.3 and pdt <0.2.
However, it does still work with pdt <0.2.
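A guard for the affected releases could look like the sketch below. It assumes the distribution is published under the name `pydiverse-transform` and that only the three versions listed above are affected; `is_unsupported_pdt` is a hypothetical helper, not part of pipedag.

```python
from importlib.metadata import PackageNotFoundError, version

# Versions named in the section above.
UNSUPPORTED_PDT = {"0.2.0", "0.2.1", "0.2.2"}


def is_unsupported_pdt(installed):
    """Return True if the version string is one of the unsupported releases."""
    return installed in UNSUPPORTED_PDT


if __name__ == "__main__":
    try:
        installed = version("pydiverse-transform")  # assumed distribution name
    except PackageNotFoundError:
        print("pydiverse.transform is not installed")
    else:
        state = "unsupported" if is_unsupported_pdt(installed) else "not in the unsupported list"
        print(f"pydiverse.transform {installed}: {state}")
```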
Raw data
{
"_id": null,
"home_page": null,
"name": "pydiverse-pipedag",
"maintainer": null,
"docs_url": null,
"requires_python": "<3.14,>=3.10.18",
"maintainer_email": null,
"keywords": null,
"author": "QuantCo, Inc.",
"author_email": "Nicolas Camenisch <garnele007@gmail.com>, Finn Rudolph <finn.rudolph@t-online.de>, Martin Trautmann <windiana@users.sf.net>",
"download_url": "https://files.pythonhosted.org/packages/b6/83/c647fb2c1718e95e55791eb1bf76aafe36fcaacb50feef027a9246f8123a/pydiverse_pipedag-0.10.2.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "BSD 3-Clause License\n \n Copyright (c) 2022, pydiverse\n All rights reserved.\n \n Redistribution and use in source and binary forms, with or without\n modification, are permitted provided that the following conditions are met:\n \n 1. Redistributions of source code must retain the above copyright notice, this\n list of conditions and the following disclaimer.\n \n 2. Redistributions in binary form must reproduce the above copyright notice,\n this list of conditions and the following disclaimer in the documentation\n and/or other materials provided with the distribution.\n \n 3. Neither the name of the copyright holder nor the names of its\n contributors may be used to endorse or promote products derived from\n this software without specific prior written permission.\n \n THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS \"AS IS\"\n AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE\n IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE\n DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE\n FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL\n DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR\n SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER\n CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,\n OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.",
"summary": "A pipeline orchestration library executing tasks within one python session. It takes care of SQL table (de)materialization, caching and cache invalidation. Blob storage is supported as well for example for storing model files.",
"version": "0.10.2",
"project_urls": null,
"split_keywords": [],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "1ad0d28f72def92e62c553d2d45bfee5fc3c9dce1cb72cfaefed202ce7547c2b",
"md5": "e990d5054e2e9dbd9198b7a301064de3",
"sha256": "ba240d313f5dd105aafc514488d76e1b2845b4aabf9df7b3bc93c6c92c328e0e"
},
"downloads": -1,
"filename": "pydiverse_pipedag-0.10.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "e990d5054e2e9dbd9198b7a301064de3",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<3.14,>=3.10.18",
"size": 196725,
"upload_time": "2025-07-08T12:14:30",
"upload_time_iso_8601": "2025-07-08T12:14:30.277579Z",
"url": "https://files.pythonhosted.org/packages/1a/d0/d28f72def92e62c553d2d45bfee5fc3c9dce1cb72cfaefed202ce7547c2b/pydiverse_pipedag-0.10.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "b683c647fb2c1718e95e55791eb1bf76aafe36fcaacb50feef027a9246f8123a",
"md5": "41a7e787c0ff19881c5f8f7c6bf3f7e3",
"sha256": "f4817a3b01f6f28b80c388c2e8be3e9f61b48911b1ca49fb996d5211d76da5ff"
},
"downloads": -1,
"filename": "pydiverse_pipedag-0.10.2.tar.gz",
"has_sig": false,
"md5_digest": "41a7e787c0ff19881c5f8f7c6bf3f7e3",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<3.14,>=3.10.18",
"size": 966021,
"upload_time": "2025-07-08T12:14:31",
"upload_time_iso_8601": "2025-07-08T12:14:31.899702Z",
"url": "https://files.pythonhosted.org/packages/b6/83/c647fb2c1718e95e55791eb1bf76aafe36fcaacb50feef027a9246f8123a/pydiverse_pipedag-0.10.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-07-08 12:14:31",
"github": false,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"lcname": "pydiverse-pipedag"
}