apipe


Nameapipe JSON
Version 0.1.8 PyPI version JSON
download
home_pagehttps://github.com/mysterious-ben/apipe
SummaryData pipelines with lazy computation and caching
upload_time2024-04-24 10:28:40
maintainerNone
docs_urlNone
authorMysterious Ben
requires_python<4.0,>=3.9
licenseApache License, Version 2.0
keywords python pipeline dask pandas
VCS
bugtrack_url
requirements click cloudpickle colorama dask fsspec locket loguru numpy packaging pandas partd pyarrow python-dateutil pytz pyyaml six toolz tzdata win32-setctime xxhash
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # A-Pipe

**A-Pipe** allows to create data pipelines with lazy computation and caching.

**Features:**
- Lazy computation and cache loading
- Pickle and parquet serialization
- Support for hashing of `numpy` arrays and `pandas` DataFrames
- Support for `dask.Delayed` objects

## Installation

```shell
pip install apipe
```

## Examples

### Simple function caching

```python
import time
import apipe
import numpy as np
from loguru import logger

@apipe.eager_cached()
def load_data(table: str):
    time.sleep(1)
    arr = np.ones(5)
    logger.debug(f"transferred array data from table={table}")
    return arr

logger.info("start loading data")

# --- First pass: transfer data and save on disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {load_data()}")

# --- Second pass: load data from disk
data = load_data("weather-ldn")
logger.info(f"finished loading data: {load_data()}")
```


### Data pipeline with lazy execution and caching

```python
import apipe
import pandas as pd
import numpy as np
from loguru import logger

# --- Define data transformations via step functions (similar to dask.delayed)

@apipe.delayed_cached()  # lazy computation + caching on disk
def load_1():
    df = pd.DataFrame({"a": [1., 2.], "b": [0.1, np.nan]})
    logger.debug("Loaded {} records".format(len(df)))
    return df

@apipe.delayed_cached()  # lazy computation + caching on disk
def load_2(timestamp):
    df = pd.DataFrame({"a": [0.9, 3.], "b": [0.001, 1.]})
    logger.debug("Loaded {} records".format(len(df)))
    return df

@apipe.delayed_cached()  # lazy computation + caching on disk
def compute(x, y, eps):
    assert x.shape == y.shape
    diff = ((x - y).abs() / (y.abs()+eps)).mean().mean()
    logger.debug("Difference is computed")
    return diff

# --- Define pipeline dependencies
ts = pd.Timestamp(2019, 1, 1)
eps = 0.01
s1 = load_1()
s2 = load_2(ts)
diff = compute(s1, s2, eps)

# --- Trigger pipeline execution (first pass: compute everything and save on disk)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))

# --- Trigger pipeline execution (second pass: load from disk the end result only)
logger.info("diff: {:.3f}".format(apipe.delayed_compute((diff, ))[0]))
```

See more examples in a [notebook](https://github.com/mysterious-ben/ds-examples/blob/master/dataflows/dask_delayed_with_caching.ipynb).
            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/mysterious-ben/apipe",
    "name": "apipe",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4.0,>=3.9",
    "maintainer_email": null,
    "keywords": "python, pipeline, dask, pandas",
    "author": "Mysterious Ben",
    "author_email": "datascience@tuta.io",
    "download_url": "https://files.pythonhosted.org/packages/5c/82/eda34c6d753ff569726cbbdb6dc932e1381d5f47ec5d14a6974c8792a4b6/apipe-0.1.8.tar.gz",
    "platform": null,
    "description": "# A-Pipe\n\n**A-Pipe** allows to create data pipelines with lazy computation and caching.\n\n**Features:**\n- Lazy computation and cache loading\n- Pickle and parquet serialization\n- Support for hashing of `numpy` arrays and `pandas` DataFrames\n- Support for `dask.Delayed` objects\n\n## Installation\n\n```shell\npip install apipe\n```\n\n## Examples\n\n### Simple function caching\n\n```python\nimport time\nimport apipe\nimport numpy as np\nfrom loguru import logger\n\n@apipe.eager_cached()\ndef load_data(table: str):\n    time.sleep(1)\n    arr = np.ones(5)\n    logger.debug(f\"transferred array data from table={table}\")\n    return arr\n\nlogger.info(\"start loading data\")\n\n# --- First pass: transfer data and save on disk\ndata = load_data(\"weather-ldn\")\nlogger.info(f\"finished loading data: {load_data()}\")\n\n# --- Second pass: load data from disk\ndata = load_data(\"weather-ldn\")\nlogger.info(f\"finished loading data: {load_data()}\")\n```\n\n\n### Data pipeline with lazy execution and caching\n\n```python\nimport apipe\nimport pandas as pd\nimport numpy as np\nfrom loguru import logger\n\n# --- Define data transformations via step functions (similar to dask.delayed)\n\n@apipe.delayed_cached()  # lazy computation + caching on disk\ndef load_1():\n    df = pd.DataFrame({\"a\": [1., 2.], \"b\": [0.1, np.nan]})\n    logger.debug(\"Loaded {} records\".format(len(df)))\n    return df\n\n@apipe.delayed_cached()  # lazy computation + caching on disk\ndef load_2(timestamp):\n    df = pd.DataFrame({\"a\": [0.9, 3.], \"b\": [0.001, 1.]})\n    logger.debug(\"Loaded {} records\".format(len(df)))\n    return df\n\n@apipe.delayed_cached()  # lazy computation + caching on disk\ndef compute(x, y, eps):\n    assert x.shape == y.shape\n    diff = ((x - y).abs() / (y.abs()+eps)).mean().mean()\n    logger.debug(\"Difference is computed\")\n    return diff\n\n# --- Define pipeline dependencies\nts = pd.Timestamp(2019, 1, 1)\neps = 0.01\ns1 = load_1()\ns2 = load_2(ts)\ndiff = compute(s1, s2, eps)\n\n# --- Trigger pipeline execution (first pass: compute everything and save on disk)\nlogger.info(\"diff: {:.3f}\".format(apipe.delayed_compute((diff, ))[0]))\n\n# --- Trigger pipeline execution (second pass: load from disk the end result only)\nlogger.info(\"diff: {:.3f}\".format(apipe.delayed_compute((diff, ))[0]))\n```\n\nSee more examples in a [notebook](https://github.com/mysterious-ben/ds-examples/blob/master/dataflows/dask_delayed_with_caching.ipynb).",
    "bugtrack_url": null,
    "license": "Apache License, Version 2.0",
    "summary": "Data pipelines with lazy computation and caching",
    "version": "0.1.8",
    "project_urls": {
        "Homepage": "https://github.com/mysterious-ben/apipe",
        "Repository": "https://github.com/mysterious-ben/apipe"
    },
    "split_keywords": [
        "python",
        " pipeline",
        " dask",
        " pandas"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "5fd27821a21a7c3d4c2358c038ba983881411d126e389141b84f5572e97079b7",
                "md5": "b01925c95c885c045f000bbacf18ff67",
                "sha256": "7e51b81a4f9b5f02938692013e2031c9667c204f400cb542ffbf0c564e2d2f79"
            },
            "downloads": -1,
            "filename": "apipe-0.1.8-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "b01925c95c885c045f000bbacf18ff67",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4.0,>=3.9",
            "size": 16202,
            "upload_time": "2024-04-24T10:28:39",
            "upload_time_iso_8601": "2024-04-24T10:28:39.080602Z",
            "url": "https://files.pythonhosted.org/packages/5f/d2/7821a21a7c3d4c2358c038ba983881411d126e389141b84f5572e97079b7/apipe-0.1.8-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "5c82eda34c6d753ff569726cbbdb6dc932e1381d5f47ec5d14a6974c8792a4b6",
                "md5": "1e95d70344c8011923a64b55db6871e2",
                "sha256": "0aa7ed71a10679a07aec2c51bebca0558d413f92f0fb3d09436a9aab1930d7d3"
            },
            "downloads": -1,
            "filename": "apipe-0.1.8.tar.gz",
            "has_sig": false,
            "md5_digest": "1e95d70344c8011923a64b55db6871e2",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4.0,>=3.9",
            "size": 12254,
            "upload_time": "2024-04-24T10:28:40",
            "upload_time_iso_8601": "2024-04-24T10:28:40.282690Z",
            "url": "https://files.pythonhosted.org/packages/5c/82/eda34c6d753ff569726cbbdb6dc932e1381d5f47ec5d14a6974c8792a4b6/apipe-0.1.8.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-04-24 10:28:40",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "mysterious-ben",
    "github_project": "apipe",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "requirements": [
        {
            "name": "click",
            "specs": [
                [
                    "==",
                    "8.1.7"
                ]
            ]
        },
        {
            "name": "cloudpickle",
            "specs": [
                [
                    "==",
                    "2.2.1"
                ]
            ]
        },
        {
            "name": "colorama",
            "specs": [
                [
                    "==",
                    "0.4.6"
                ]
            ]
        },
        {
            "name": "dask",
            "specs": [
                [
                    "==",
                    "2022.12.1"
                ]
            ]
        },
        {
            "name": "fsspec",
            "specs": [
                [
                    "==",
                    "2024.3.1"
                ]
            ]
        },
        {
            "name": "locket",
            "specs": [
                [
                    "==",
                    "1.0.0"
                ]
            ]
        },
        {
            "name": "loguru",
            "specs": [
                [
                    "==",
                    "0.7.2"
                ]
            ]
        },
        {
            "name": "numpy",
            "specs": [
                [
                    "==",
                    "1.26.4"
                ]
            ]
        },
        {
            "name": "packaging",
            "specs": [
                [
                    "==",
                    "24.0"
                ]
            ]
        },
        {
            "name": "pandas",
            "specs": [
                [
                    "==",
                    "2.2.2"
                ]
            ]
        },
        {
            "name": "partd",
            "specs": [
                [
                    "==",
                    "1.4.1"
                ]
            ]
        },
        {
            "name": "pyarrow",
            "specs": [
                [
                    "==",
                    "16.0.0"
                ]
            ]
        },
        {
            "name": "python-dateutil",
            "specs": [
                [
                    "==",
                    "2.9.0.post0"
                ]
            ]
        },
        {
            "name": "pytz",
            "specs": [
                [
                    "==",
                    "2024.1"
                ]
            ]
        },
        {
            "name": "pyyaml",
            "specs": [
                [
                    "==",
                    "6.0.1"
                ]
            ]
        },
        {
            "name": "six",
            "specs": [
                [
                    "==",
                    "1.16.0"
                ]
            ]
        },
        {
            "name": "toolz",
            "specs": [
                [
                    "==",
                    "0.12.1"
                ]
            ]
        },
        {
            "name": "tzdata",
            "specs": [
                [
                    "==",
                    "2024.1"
                ]
            ]
        },
        {
            "name": "win32-setctime",
            "specs": [
                [
                    "==",
                    "1.1.0"
                ]
            ]
        },
        {
            "name": "xxhash",
            "specs": [
                [
                    "==",
                    "3.4.1"
                ]
            ]
        }
    ],
    "lcname": "apipe"
}
        
Elapsed time: 0.26838s