# A-Pipe
**A-Pipe** allows you to create data pipelines with lazy computation and caching.
**Features:**
- Lazy computation and cache loading
- Pickle and parquet serialization
- Support for hashing of `numpy` arrays and `pandas` DataFrames
- Support for `dask.Delayed` objects
## Installation
```shell
pip install apipe
```
## Examples
### Simple function caching
```python
import time
import apipe
import numpy as np
from loguru import logger
@apipe.eager_cached()
def load_data(table: str):
time.sleep(1)
arr = np.ones(5)
logger.debug(f"transferred array data from table={table}")
return arr
logger.info("start loading data")
# --- First pass: transfer data and save on disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {data}")
# --- Second pass: load data from disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {data}")
```
### Data pipeline with lazy execution and caching
```python
import apipe
import pandas as pd
import numpy as np
from loguru import logger
# --- Define data transformations via step functions (similar to dask.delayed)
@apipe.delayed_cached() # lazy computation + caching on disk
def load_1():
df = pd.DataFrame({"a": [1., 2.], "b": [0.1, np.nan]})
logger.debug("Loaded {} records".format(len(df)))
return df
@apipe.delayed_cached() # lazy computation + caching on disk
def load_2(timestamp):
df = pd.DataFrame({"a": [0.9, 3.], "b": [0.001, 1.]})
logger.debug("Loaded {} records".format(len(df)))
return df
@apipe.delayed_cached() # lazy computation + caching on disk
def compute(x, y, eps):
assert x.shape == y.shape
diff = ((x - y).abs() / (y.abs()+eps)).mean().mean()
logger.debug("Difference is computed")
return diff
# --- Define pipeline dependencies
ts = pd.Timestamp(2019, 1, 1)
eps = 0.01
s1 = load_1()
s2 = load_2(ts)
diff = compute(s1, s2, eps)
# --- Trigger pipeline execution (first pass: compute everything and save on disk)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))
# --- Trigger pipeline execution (second pass: load from disk the end result only)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))
```
See more examples in a [notebook](https://github.com/mysterious-ben/ds-examples/blob/master/dataflows/dask_delayed_with_caching.ipynb).
Raw data
{
"_id": null,
"home_page": "https://github.com/mysterious-ben/apipe",
"name": "apipe",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.9",
"maintainer_email": null,
"keywords": "python, pipeline, dask, pandas",
"author": "Mysterious Ben",
"author_email": "datascience@tuta.io",
"download_url": "https://files.pythonhosted.org/packages/5c/82/eda34c6d753ff569726cbbdb6dc932e1381d5f47ec5d14a6974c8792a4b6/apipe-0.1.8.tar.gz",
"platform": null,
    "description": "# A-Pipe\n\n**A-Pipe** allows to create data pipelines with lazy computation and caching.\n\n**Features:**\n- Lazy computation and cache loading\n- Pickle and parquet serialization\n- Support for hashing of `numpy` arrays and `pandas` DataFrames\n- Support for `dask.Delayed` objects\n\n## Installation\n\n```shell\npip install apipe\n```\n\n## Examples\n\n### Simple function caching\n\n```python\nimport time\nimport apipe\nimport numpy as np\nfrom loguru import logger\n\n@apipe.eager_cached()\ndef load_data(table: str):\n    time.sleep(1)\n    arr = np.ones(5)\n    logger.debug(f\"transferred array data from table={table}\")\n    return arr\n\nlogger.info(\"start loading data\")\n\n# --- First pass: transfer data and save on disk\ndata = load_data(\"weather-ldn\")\nlogger.info(f\"finished loading data: {load_data()}\")\n\n# --- Second pass: load data from disk\ndata = load_data(\"weather-ldn\")\nlogger.info(f\"finished loading data: {load_data()}\")\n```\n\n\n### Data pipeline with lazy execution and caching\n\n```python\nimport apipe\nimport pandas as pd\nimport numpy as np\nfrom loguru import logger\n\n# --- Define data transformations via step functions (similar to dask.delayed)\n\n@apipe.delayed_cached()  # lazy computation + caching on disk\ndef load_1():\n    df = pd.DataFrame({\"a\": [1., 2.], \"b\": [0.1, np.nan]})\n    logger.debug(\"Loaded {} records\".format(len(df)))\n    return df\n\n@apipe.delayed_cached()  # lazy computation + caching on disk\ndef load_2(timestamp):\n    df = pd.DataFrame({\"a\": [0.9, 3.], \"b\": [0.001, 1.]})\n    logger.debug(\"Loaded {} records\".format(len(df)))\n    return df\n\n@apipe.delayed_cached()  # lazy computation + caching on disk\ndef compute(x, y, eps):\n    assert x.shape == y.shape\n    diff = ((x - y).abs() / (y.abs()+eps)).mean().mean()\n    logger.debug(\"Difference is computed\")\n    return diff\n\n# --- Define pipeline dependencies\nts = pd.Timestamp(2019, 1, 1)\neps = 0.01\ns1 = load_1()\ns2 = load_2(ts)\ndiff = compute(s1, s2, eps)\n\n# --- Trigger pipeline execution (first pass: compute everything and save on disk)\nlogger.info(\"diff: {:.3f}\".format(apipe.delayed_compute((diff, ))[0]))\n\n# --- Trigger pipeline execution (second pass: load from disk the end result only)\nlogger.info(\"diff: {:.3f}\".format(apipe.delayed_compute((diff, ))[0]))\n```\n\nSee more examples in a [notebook](https://github.com/mysterious-ben/ds-examples/blob/master/dataflows/dask_delayed_with_caching.ipynb).",
"bugtrack_url": null,
"license": "Apache License, Version 2.0",
"summary": "Data pipelines with lazy computation and caching",
"version": "0.1.8",
"project_urls": {
"Homepage": "https://github.com/mysterious-ben/apipe",
"Repository": "https://github.com/mysterious-ben/apipe"
},
"split_keywords": [
"python",
" pipeline",
" dask",
" pandas"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "5fd27821a21a7c3d4c2358c038ba983881411d126e389141b84f5572e97079b7",
"md5": "b01925c95c885c045f000bbacf18ff67",
"sha256": "7e51b81a4f9b5f02938692013e2031c9667c204f400cb542ffbf0c564e2d2f79"
},
"downloads": -1,
"filename": "apipe-0.1.8-py3-none-any.whl",
"has_sig": false,
"md5_digest": "b01925c95c885c045f000bbacf18ff67",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.9",
"size": 16202,
"upload_time": "2024-04-24T10:28:39",
"upload_time_iso_8601": "2024-04-24T10:28:39.080602Z",
"url": "https://files.pythonhosted.org/packages/5f/d2/7821a21a7c3d4c2358c038ba983881411d126e389141b84f5572e97079b7/apipe-0.1.8-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "5c82eda34c6d753ff569726cbbdb6dc932e1381d5f47ec5d14a6974c8792a4b6",
"md5": "1e95d70344c8011923a64b55db6871e2",
"sha256": "0aa7ed71a10679a07aec2c51bebca0558d413f92f0fb3d09436a9aab1930d7d3"
},
"downloads": -1,
"filename": "apipe-0.1.8.tar.gz",
"has_sig": false,
"md5_digest": "1e95d70344c8011923a64b55db6871e2",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.9",
"size": 12254,
"upload_time": "2024-04-24T10:28:40",
"upload_time_iso_8601": "2024-04-24T10:28:40.282690Z",
"url": "https://files.pythonhosted.org/packages/5c/82/eda34c6d753ff569726cbbdb6dc932e1381d5f47ec5d14a6974c8792a4b6/apipe-0.1.8.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-04-24 10:28:40",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "mysterious-ben",
"github_project": "apipe",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"requirements": [
{
"name": "click",
"specs": [
[
"==",
"8.1.7"
]
]
},
{
"name": "cloudpickle",
"specs": [
[
"==",
"2.2.1"
]
]
},
{
"name": "colorama",
"specs": [
[
"==",
"0.4.6"
]
]
},
{
"name": "dask",
"specs": [
[
"==",
"2022.12.1"
]
]
},
{
"name": "fsspec",
"specs": [
[
"==",
"2024.3.1"
]
]
},
{
"name": "locket",
"specs": [
[
"==",
"1.0.0"
]
]
},
{
"name": "loguru",
"specs": [
[
"==",
"0.7.2"
]
]
},
{
"name": "numpy",
"specs": [
[
"==",
"1.26.4"
]
]
},
{
"name": "packaging",
"specs": [
[
"==",
"24.0"
]
]
},
{
"name": "pandas",
"specs": [
[
"==",
"2.2.2"
]
]
},
{
"name": "partd",
"specs": [
[
"==",
"1.4.1"
]
]
},
{
"name": "pyarrow",
"specs": [
[
"==",
"16.0.0"
]
]
},
{
"name": "python-dateutil",
"specs": [
[
"==",
"2.9.0.post0"
]
]
},
{
"name": "pytz",
"specs": [
[
"==",
"2024.1"
]
]
},
{
"name": "pyyaml",
"specs": [
[
"==",
"6.0.1"
]
]
},
{
"name": "six",
"specs": [
[
"==",
"1.16.0"
]
]
},
{
"name": "toolz",
"specs": [
[
"==",
"0.12.1"
]
]
},
{
"name": "tzdata",
"specs": [
[
"==",
"2024.1"
]
]
},
{
"name": "win32-setctime",
"specs": [
[
"==",
"1.1.0"
]
]
},
{
"name": "xxhash",
"specs": [
[
"==",
"3.4.1"
]
]
}
],
"lcname": "apipe"
}