apipe


Nameapipe JSON
Version 0.1.9 PyPI version JSON
download
home_pagehttps://github.com/mysterious-ben/apipe
SummaryData pipelines with lazy computation and caching
upload_time2024-05-07 07:39:37
maintainerNone
docs_urlNone
authorMysterious Ben
requires_python<4.0,>=3.9
licenseApache License, Version 2.0
keywords python pipeline dask pandas
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # A-Pipe

**A-Pipe** allows to create data pipelines with lazy computation and caching.

**Features:**
- Lazy computation and cache loading
- Pickle and parquet serialization
- Support for hashing of `numpy` arrays and `pandas` DataFrames
- Support for `dask.Delayed` objects

## Installation

```shell
pip install apipe
```

## Examples

### Simple function caching

```python
import time
import apipe
import numpy as np
from loguru import logger

@apipe.eager_cached()
def load_data(table: str):
    time.sleep(1)
    arr = np.ones(5)
    logger.debug(f"transferred array data from table={table}")
    return arr

logger.info("start loading data")

# --- First pass: transfer data and save on disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {load_data()}")

# --- Second pass: load data from disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {load_data()}")
```


### Data pipeline with lazy execution and caching

```python
import apipe
import pandas as pd
import numpy as np
from loguru import logger

# --- Define data transformations via step functions (similar to dask.delayed)

@apipe.delayed_cached()  # lazy computation + caching on disk
def load_1():
    df = pd.DataFrame({"a": [1., 2.], "b": [0.1, np.nan]})
    logger.debug("Loaded {} records".format(len(df)))
    return df

@apipe.delayed_cached()  # lazy computation + caching on disk
def load_2(timestamp):
    df = pd.DataFrame({"a": [0.9, 3.], "b": [0.001, 1.]})
    logger.debug("Loaded {} records".format(len(df)))
    return df

@apipe.delayed_cached()  # lazy computation + caching on disk
def compute(x, y, eps):
    assert x.shape == y.shape
    diff = ((x - y).abs() / (y.abs()+eps)).mean().mean()
    logger.debug("Difference is computed")
    return diff

# --- Define pipeline dependencies
ts = pd.Timestamp(2019, 1, 1)
eps = 0.01
s1 = load_1()
s2 = load_2(ts)
diff = compute(s1, s2, eps)

# --- Trigger pipeline execution (first pass: compute everything and save on disk)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))

# --- Trigger pipeline execution (second pass: load from disk the end result only)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))
```

See more examples in a [notebook](https://github.com/mysterious-ben/ds-examples/blob/master/dataflows/dask_delayed_with_caching.ipynb).
            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/mysterious-ben/apipe",
    "name": "apipe",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.9",
    "maintainer_email": null,
    "keywords": "python, pipeline, dask, pandas",
    "author": "Mysterious Ben",
    "author_email": "datascience@tuta.io",
    "download_url": "https://files.pythonhosted.org/packages/2b/c9/6a8be4aa4929e817b741ae82062e957f4aa073127f7177f1d7d95658c815/apipe-0.1.9.tar.gz",
    "platform": null,
    "description": "# A-Pipe\n\n**A-Pipe** allows to create data pipelines with lazy computation and caching.\n\n**Features:**\n- Lazy computation and cache loading\n- Pickle and parquet serialization\n- Support for hashing of `numpy` arrays and `pandas` DataFrames\n- Support for `dask.Delayed` objects\n\n## Installation\n\n```shell\npip install apipe\n```\n\n## Examples\n\n### Simple function caching\n\n```python\nimport time\nimport apipe\nimport numpy as np\nfrom loguru import logger\n\n@apipe.eager_cached()\ndef load_data(table: str):\n    time.sleep(1)\n    arr = np.ones(5)\n    logger.debug(f\"transferred array data from table={table}\")\n    return arr\n\nlogger.info(\"start loading data\")\n\n# --- First pass: transfer data and save on disk\ndata = load_data(\"weather-ldn\")\nlogger.info(f\"finished loading data: {load_data()}\")\n\n# --- Second pass: load data from disk\ndata = load_data(\"weather-ldn\")\nlogger.info(f\"finished loading data: {load_data()}\")\n```\n\n\n### Data pipeline with lazy execution and caching\n\n```python\nimport apipe\nimport pandas as pd\nimport numpy as np\nfrom loguru import logger\n\n# --- Define data transformations via step functions (similar to dask.delayed)\n\n@apipe.delayed_cached()  # lazy computation + caching on disk\ndef load_1():\n    df = pd.DataFrame({\"a\": [1., 2.], \"b\": [0.1, np.nan]})\n    logger.debug(\"Loaded {} records\".format(len(df)))\n    return df\n\n@apipe.delayed_cached()  # lazy computation + caching on disk\ndef load_2(timestamp):\n    df = pd.DataFrame({\"a\": [0.9, 3.], \"b\": [0.001, 1.]})\n    logger.debug(\"Loaded {} records\".format(len(df)))\n    return df\n\n@apipe.delayed_cached()  # lazy computation + caching on disk\ndef compute(x, y, eps):\n    assert x.shape == y.shape\n    diff = ((x - y).abs() / (y.abs()+eps)).mean().mean()\n    logger.debug(\"Difference is computed\")\n    return diff\n\n# --- Define pipeline dependencies\nts = pd.Timestamp(2019, 1, 1)\neps = 0.01\ns1 = load_1()\ns2 = load_2(ts)\ndiff = compute(s1, s2, eps)\n\n# --- Trigger pipeline execution (first pass: compute everything and save on disk)\nlogger.info(\"diff: {:.3f}\".format(apipe.delayed_compute((diff, ))[0]))\n\n# --- Trigger pipeline execution (second pass: load from disk the end result only)\nlogger.info(\"diff: {:.3f}\".format(apipe.delayed_compute((diff, ))[0]))\n```\n\nSee more examples in a [notebook](https://github.com/mysterious-ben/ds-examples/blob/master/dataflows/dask_delayed_with_caching.ipynb).",
    "bugtrack_url": null,
    "license": "Apache License, Version 2.0",
    "summary": "Data pipelines with lazy computation and caching",
    "version": "0.1.9",
    "project_urls": {
        "Homepage": "https://github.com/mysterious-ben/apipe",
        "Repository": "https://github.com/mysterious-ben/apipe"
    },
    "split_keywords": [
        "python",
        " pipeline",
        " dask",
        " pandas"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "ece08628d9a07755418ad35e84317b7f5d3bcd50c285ec2ff20b09c602943fb3",
                "md5": "4bf5a008f41ad7e8d8a7d4d5cd9e9475",
                "sha256": "ff84fc3fcbd652b7d97403cbcac8969d59c954b4e3ae0f481c170bb29249f39e"
            },
            "downloads": -1,
            "filename": "apipe-0.1.9-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "4bf5a008f41ad7e8d8a7d4d5cd9e9475",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.9",
            "size": 16197,
            "upload_time": "2024-05-07T07:39:35",
            "upload_time_iso_8601": "2024-05-07T07:39:35.682514Z",
            "url": "https://files.pythonhosted.org/packages/ec/e0/8628d9a07755418ad35e84317b7f5d3bcd50c285ec2ff20b09c602943fb3/apipe-0.1.9-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "2bc96a8be4aa4929e817b741ae82062e957f4aa073127f7177f1d7d95658c815",
                "md5": "360c43718cdc855848904b93f0c77a9e",
                "sha256": "6fb8916f1d09878801daad17a58019c8ee8e220021085eb55f7393e4e88cf1b3"
            },
            "downloads": -1,
            "filename": "apipe-0.1.9.tar.gz",
            "has_sig": false,
            "md5_digest": "360c43718cdc855848904b93f0c77a9e",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.9",
            "size": 12238,
            "upload_time": "2024-05-07T07:39:37",
            "upload_time_iso_8601": "2024-05-07T07:39:37.462910Z",
            "url": "https://files.pythonhosted.org/packages/2b/c9/6a8be4aa4929e817b741ae82062e957f4aa073127f7177f1d7d95658c815/apipe-0.1.9.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-05-07 07:39:37",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "mysterious-ben",
    "github_project": "apipe",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "requirements": [],
    "lcname": "apipe"
}
        
Elapsed time: 1.86046s