# A-Pipe
**A-Pipe** lets you create data pipelines with lazy computation and caching.
**Features:**
- Lazy computation and cache loading
- Pickle and parquet serialization
- Support for hashing of `numpy` arrays and `pandas` DataFrames
- Support for `dask.Delayed` objects
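For example, a cached step returning a DataFrame can be written as below; per the feature list, DataFrames can be serialized to parquet while other return values fall back to pickle (the automatic format choice is an assumption here, and `load_prices` is a hypothetical function):

```python
import apipe
import pandas as pd

# A minimal sketch: the decorated step is computed lazily and its result
# is cached on disk (presumably as parquet for a DataFrame, per the
# feature list above; other objects would be pickled).
@apipe.delayed_cached()
def load_prices():
    return pd.DataFrame({"ticker": ["A", "B"], "price": [10.0, 20.0]})

prices = apipe.delayed_compute((load_prices(),))[0]
```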
## Installation
```shell
pip install apipe
```
## Examples
### Simple function caching
```python
import time
import apipe
import numpy as np
from loguru import logger
@apipe.eager_cached()
def load_data(table: str):
    time.sleep(1)  # simulate a slow data transfer
    arr = np.ones(5)
    logger.debug(f"transferred array data from table={table}")
    return arr
logger.info("start loading data")
# --- First pass: transfer data and save on disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {load_data()}")
# --- Second pass: load data from disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {load_data()}")
```
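Cache entries are keyed by a hash of the arguments, which covers `numpy` arrays and `pandas` DataFrames per the feature list above. A minimal sketch of that behavior (`scale` is a hypothetical function, not part of the library):

```python
import apipe
import numpy as np

# Sketch of argument hashing: changing an array argument should
# invalidate the cache entry (an assumption based on the feature list).
@apipe.eager_cached()
def scale(arr: np.ndarray, factor: float):
    return arr * factor

a = np.arange(3.0)
scale(a, 2.0)      # first call: computed and cached
scale(a, 2.0)      # same arguments: loaded from cache
scale(a + 1, 2.0)  # array hash changed: recomputed
```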
### Data pipeline with lazy execution and caching
```python
import apipe
import pandas as pd
import numpy as np
from loguru import logger
# --- Define data transformations via step functions (similar to dask.delayed)
@apipe.delayed_cached() # lazy computation + caching on disk
def load_1():
    df = pd.DataFrame({"a": [1., 2.], "b": [0.1, np.nan]})
    logger.debug("Loaded {} records".format(len(df)))
    return df
@apipe.delayed_cached() # lazy computation + caching on disk
def load_2(timestamp):
    df = pd.DataFrame({"a": [0.9, 3.], "b": [0.001, 1.]})
    logger.debug("Loaded {} records".format(len(df)))
    return df
@apipe.delayed_cached() # lazy computation + caching on disk
def compute(x, y, eps):
    assert x.shape == y.shape
    diff = ((x - y).abs() / (y.abs() + eps)).mean().mean()
    logger.debug("Difference is computed")
    return diff
# --- Define pipeline dependencies
ts = pd.Timestamp(2019, 1, 1)
eps = 0.01
s1 = load_1()
s2 = load_2(ts)
diff = compute(s1, s2, eps)
# --- Trigger pipeline execution (first pass: compute everything and save on disk)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))
# --- Trigger pipeline execution (second pass: load from disk the end result only)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))
```
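Since `apipe.delayed_compute` takes a tuple of delayed nodes (as the `(diff,)` call above suggests), several targets can presumably be materialized in one pass, with shared upstream steps computed or loaded only once. A minimal sketch continuing the example above, assuming results are returned in input order:

```python
# Materialize several pipeline nodes at once; shared upstream steps
# are computed (or loaded from cache) a single time.
df1, df2, d = apipe.delayed_compute((s1, s2, diff))
logger.info("loaded {} and {} rows, diff: {:.3f}".format(len(df1), len(df2), d))
```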
See more examples in this [notebook](https://github.com/mysterious-ben/ds-examples/blob/master/dataflows/dask_delayed_with_caching.ipynb).