prefect-dask

- Name: prefect-dask
- Version: 0.2.7
- Summary: Prefect integrations with the Dask execution framework.
- Author: Prefect Technologies, Inc. <help@prefect.io>
- Home page: https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-dask
- Upload time: 2024-04-25 19:21:39
- Requires Python: >=3.8
- License: Apache License 2.0
- Keywords: prefect
# `prefect-dask`

<p align="center">
    <a href="https://pypi.python.org/pypi/prefect-dask/" alt="PyPI version">
        <img alt="PyPI" src="https://img.shields.io/pypi/v/prefect-dask?color=26272B&labelColor=090422"></a>
    <a href="https://pepy.tech/badge/prefect-dask/" alt="Downloads">
        <img src="https://img.shields.io/pypi/dm/prefect-dask?color=26272B&labelColor=090422" /></a>
</p>

The `prefect-dask` collection makes it easy to add distributed processing to your flows. Check out the examples below to get started!

## Getting Started

### Integrate with Prefect flows

Perhaps you're already working with Prefect flows. Say your flow downloads many images to train your machine learning model. Unfortunately, it takes a long time to download the images because your code runs sequentially.

After installing `prefect-dask`, you can parallelize your flow in three simple steps:

1. Add the import: `from prefect_dask import DaskTaskRunner`
2. Specify the task runner in the flow decorator: `@flow(task_runner=DaskTaskRunner)`
3. Submit tasks to the flow's task runner: `a_task.submit(*args, **kwargs)`

In our test, the parallelized code runs in about a third of the time, and that's without distributing the workload over multiple machines.
Here's the before and after!

=== "Before"

    ```python hl_lines="1"
    # Completed in 15.2 seconds

    from typing import List
    from pathlib import Path

    import httpx
    from prefect import flow, task

    URL_FORMAT = (
        "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
        "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
    )

    @task
    def download_image(year: int, month: int, directory: Path) -> Path:
        # download image from URL
        url = URL_FORMAT.format(year=year, month=month)
        resp = httpx.get(url)

        # save content to directory/YYYYMM.png
        file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
        file_path.write_bytes(resp.content)
        return file_path

    @flow
    def download_nino_34_plumes_from_year(year: int) -> List[Path]:
        # create a directory to hold images
        directory = Path("data")
        directory.mkdir(exist_ok=True)

        # download all images
        file_paths = []
        for month in range(1, 12 + 1):
            file_path = download_image(year, month, directory)
            file_paths.append(file_path)
        return file_paths

    if __name__ == "__main__":
        download_nino_34_plumes_from_year(2022)
    ```

=== "After"

    ```python hl_lines="1 8 26 35"
    # Completed in 5.7 seconds

    from typing import List
    from pathlib import Path

    import httpx
    from prefect import flow, task
    from prefect_dask import DaskTaskRunner

    URL_FORMAT = (
        "https://www.cpc.ncep.noaa.gov/products/NMME/archive/"
        "{year:04d}{month:02d}0800/current/images/nino34.rescaling.ENSMEAN.png"
    )

    @task
    def download_image(year: int, month: int, directory: Path) -> Path:
        # download image from URL
        url = URL_FORMAT.format(year=year, month=month)
        resp = httpx.get(url)

        # save content to directory/YYYYMM.png
        file_path = (directory / url.split("/")[-1]).with_stem(f"{year:04d}{month:02d}")
        file_path.write_bytes(resp.content)
        return file_path

    @flow(task_runner=DaskTaskRunner(cluster_kwargs={"processes": False}))
    def download_nino_34_plumes_from_year(year: int) -> List[Path]:
        # create a directory to hold images
        directory = Path("data")
        directory.mkdir(exist_ok=True)

        # download all images
        file_paths = []
        for month in range(1, 12 + 1):
            file_path = download_image.submit(year, month, directory)
            file_paths.append(file_path)
        return file_paths

    if __name__ == "__main__":
        download_nino_34_plumes_from_year(2022)
    ```

The original flow completes in 15.2 seconds.

With just a few minor tweaks, we reduced the runtime nearly **threefold**, down to just **5.7** seconds!
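
The `DaskTaskRunner` itself is configurable. Here's a minimal sketch of two common setups using the runner's `cluster_kwargs`, `cluster_class`, and `adapt_kwargs` options (the worker counts below are illustrative values, not recommendations):

```python
from prefect import flow
from prefect_dask import DaskTaskRunner

# Temporary local cluster with a fixed number of workers
fixed_runner = DaskTaskRunner(
    cluster_kwargs={"n_workers": 4, "threads_per_worker": 2},
)

# Temporary cluster that scales up and down with the workload
adaptive_runner = DaskTaskRunner(
    cluster_class="dask.distributed.LocalCluster",
    adapt_kwargs={"minimum": 1, "maximum": 8},
)

@flow(task_runner=fixed_runner)
def my_flow():
    ...
```

You can also pass `address=...` to connect to an existing cluster, as shown in the next section.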

### Integrate with Dask client/cluster and collections

Suppose you have an existing Dask client/cluster and collection, like a `dask.dataframe.DataFrame`, and you want to add observability.

With `prefect-dask`, there's no major overhaul necessary because Prefect was designed with incremental adoption in mind! It's as easy as:

1. Adding the imports
2. Sprinkling a few `task` and `flow` decorators
3. Using the `get_dask_client` context manager on collections to distribute work across workers
4. Specifying the task runner and client's address in the flow decorator
5. Submitting the tasks to the flow's task runner

=== "Before"

    ```python
    import dask.dataframe
    import dask.datasets
    import dask.distributed



    client = dask.distributed.Client()


    def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
        df = dask.datasets.timeseries(start, end, partition_freq="4w")
        return df


    def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:

        df_yearly_avg = df.groupby(df.index.year).mean()
        return df_yearly_avg.compute()


    def dask_pipeline():
        df = read_data("1988", "2022")
        df_yearly_average = process_data(df)
        return df_yearly_average

    dask_pipeline()
    ```


=== "After"

    ```python hl_lines="4 5 9 14 16 20 22 23"
    import dask.dataframe
    import dask.datasets
    import dask.distributed
    from prefect import flow, task
    from prefect_dask import DaskTaskRunner, get_dask_client

    client = dask.distributed.Client()

    @task
    def read_data(start: str, end: str) -> dask.dataframe.DataFrame:
        df = dask.datasets.timeseries(start, end, partition_freq="4w")
        return df

    @task
    def process_data(df: dask.dataframe.DataFrame) -> dask.dataframe.DataFrame:
        with get_dask_client():
            df_yearly_avg = df.groupby(df.index.year).mean()
            return df_yearly_avg.compute()

    @flow(task_runner=DaskTaskRunner(address=client.scheduler.address))
    def dask_pipeline():
        df = read_data.submit("1988", "2022")
        df_yearly_average = process_data.submit(df)
        return df_yearly_average

    dask_pipeline()
    ```

Now, you can conveniently see when each task completed, both in the terminal and the UI!

```bash
14:10:09.845 | INFO    | prefect.engine - Created flow run 'chocolate-pony' for flow 'dask-flow'
14:10:09.847 | INFO    | prefect.task_runner.dask - Connecting to an existing Dask cluster at tcp://127.0.0.1:59255
14:10:09.857 | INFO    | distributed.scheduler - Receive client connection: Client-8c1e0f24-9133-11ed-800e-86f2469c4e7a
14:10:09.859 | INFO    | distributed.core - Starting established connection to tcp://127.0.0.1:59516
14:10:09.862 | INFO    | prefect.task_runner.dask - The Dask dashboard is available at http://127.0.0.1:8787/status
14:10:11.344 | INFO    | Flow run 'chocolate-pony' - Created task run 'read_data-5bc97744-0' for task 'read_data'
14:10:11.626 | INFO    | Flow run 'chocolate-pony' - Submitted task run 'read_data-5bc97744-0' for execution.
14:10:11.795 | INFO    | Flow run 'chocolate-pony' - Created task run 'process_data-090555ba-0' for task 'process_data'
14:10:11.798 | INFO    | Flow run 'chocolate-pony' - Submitted task run 'process_data-090555ba-0' for execution.
14:10:13.279 | INFO    | Task run 'read_data-5bc97744-0' - Finished in state Completed()
14:11:43.539 | INFO    | Task run 'process_data-090555ba-0' - Finished in state Completed()
14:11:43.883 | INFO    | Flow run 'chocolate-pony' - Finished in state Completed('All states completed.')
```
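
If your tasks are `async`, `prefect-dask` also provides an async counterpart, `get_async_dask_client`. Here's a minimal sketch (the summing task is purely illustrative):

```python
import asyncio

from prefect import flow, task
from prefect_dask import DaskTaskRunner, get_async_dask_client

@task
async def compute_sum() -> int:
    # Open an asynchronous Dask client connected to the flow's cluster
    async with get_async_dask_client() as client:
        future = client.submit(sum, range(10))
        return await client.gather(future)

@flow(task_runner=DaskTaskRunner())
async def async_dask_flow() -> int:
    return await compute_sum()

if __name__ == "__main__":
    asyncio.run(async_dask_flow())
```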

## Resources

For additional examples, check out the [Usage Guide](usage_guide)!

### Installation

Get started by installing `prefect-dask`!

=== "pip"

    ```bash
    pip install -U prefect-dask
    ```

=== "conda"

    ```bash
    conda install -c conda-forge prefect-dask
    ```

Requires an installation of Python 3.8+.

We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.
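
For example, one way to set this up with the built-in `venv` module (the environment name here is arbitrary):

```bash
python -m venv prefect-env
source prefect-env/bin/activate
pip install -U prefect-dask
```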

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the [Prefect documentation](https://docs.prefect.io/).

### Feedback

If you encounter any bugs while using `prefect-dask`, feel free to open an issue in the [prefect](https://github.com/PrefectHQ/prefect) repository.

If you have any questions or issues while using `prefect-dask`, you can find help in either the [Prefect Discourse forum](https://discourse.prefect.io/) or the [Prefect Slack community](https://prefect.io/slack).

### Contributing

If you'd like to help fix an issue or add a feature to `prefect-dask`, please [propose changes through a pull request from a fork of the repository](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork).

Here are the steps:

1. [Fork the repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#forking-a-repository)
2. [Clone the forked repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#cloning-your-forked-repository)
3. Install the repository and its dependencies:
```bash
pip install -e ".[dev]"
```
4. Make desired changes
5. Add tests
6. Install `pre-commit` to perform quality checks prior to commit:
```bash
pre-commit install
```
7. `git commit`, `git push`, and create a pull request

            
