prefect-ray

| Field | Value |
| --- | --- |
| Name | prefect-ray |
| Version | 0.3.4 |
| Summary | Prefect integrations with the Ray execution framework. |
| Upload time | 2024-04-25 21:20:13 |
| Requires Python | >=3.8 |
| License | Apache License 2.0 |
| Keywords | prefect |
| Author | Prefect Technologies, Inc. (help@prefect.io) |
| Homepage | https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-ray |

# prefect-ray

<p align="center">
    <!--- Insert a cover image here -->
    <!--- <br> -->
    <a href="https://pypi.python.org/pypi/prefect-ray/" alt="PyPI version">
        <img alt="PyPI" src="https://img.shields.io/pypi/v/prefect-ray?color=26272B&labelColor=090422"></a>
    <a href="https://pepy.tech/badge/prefect-ray/" alt="Downloads">
        <img src="https://img.shields.io/pypi/dm/prefect-ray?color=26272B&labelColor=090422" /></a>
</p>

## Welcome!
Visit the full docs [here](https://PrefectHQ.github.io/prefect-ray) to see additional examples and the API reference.

`prefect-ray` contains Prefect integrations with the [Ray](https://www.ray.io/) execution framework, a flexible distributed computing framework for Python.

It provides a `RayTaskRunner` that enables Prefect flows to execute tasks in parallel using Ray.

## Getting Started

### Python setup

Requires an installation of Python 3.8 or newer.

We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.

This integration is designed to work with Prefect 2.0+. For more information about how to use Prefect, please refer to the [Prefect documentation](https://docs.prefect.io/).

### Installation

Install `prefect-ray` with `pip`:

```bash
pip install prefect-ray
```

Users running Apple Silicon (such as M1 Macs) should check the [Ray installation docs](https://docs.ray.io/en/master/ray-overview/installation.html#m1-mac-apple-silicon-support) for more details.

## Running tasks on Ray

The `RayTaskRunner` is a [Prefect task runner](https://docs.prefect.io/concepts/task-runners/) that submits tasks to [Ray](https://www.ray.io/) for parallel execution. 

By default, a temporary Ray instance is created for the duration of the flow run.

For example, this flow counts from 0 to 9 in parallel.

```python
import time

from prefect import flow, task
from prefect_ray import RayTaskRunner

@task
def shout(number):
    time.sleep(0.5)
    print(f"#{number}")

@flow(task_runner=RayTaskRunner)
def count_to(highest_number):
    for number in range(highest_number):
        shout.submit(number)

if __name__ == "__main__":
    count_to(10)

# example output (the order varies between runs)
#3
#7
#2
#6
#4
#0
#1
#5
#8
#9
```

If you already have a Ray instance running, you can provide the connection URL via an `address` argument.

To configure your flow to use the `RayTaskRunner`:

1. Make sure the `prefect-ray` collection is installed as described earlier: `pip install prefect-ray`.
2. In your flow code, import `RayTaskRunner` from `prefect_ray.task_runners`.
3. Assign it as the task runner when the flow is defined using the `task_runner=RayTaskRunner` argument.

For example, this flow uses the `RayTaskRunner` with a local, temporary Ray instance created by Prefect at flow run time.

```python
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner())
def my_flow():
    ... 
```

This flow uses the `RayTaskRunner` configured to access an existing Ray instance at `ray://192.0.2.255:8786`.

```python
from prefect import flow
from prefect_ray.task_runners import RayTaskRunner

@flow(task_runner=RayTaskRunner(address="ray://192.0.2.255:8786"))
def my_flow():
    ... 
```

`RayTaskRunner` accepts the following optional parameters:

| Parameter | Description |
| --- | --- |
| address | Address of a currently running Ray instance, starting with the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI. |
| init_kwargs | Additional kwargs to use when calling `ray.init`. |

Note that Ray Client uses the [ray://](https://docs.ray.io/en/master/cluster/ray-client.html) URI to indicate the address of a Ray instance. If you don't provide the `address` of a Ray instance, Prefect creates a temporary instance automatically.
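
A malformed address is easy to catch before the flow starts. The sketch below uses only the standard library to check that a string has the `ray://` scheme, a host, and a port. The helper name `check_ray_address` is hypothetical and is not part of `prefect-ray` or Ray; it only inspects the string and does not try to connect.

```python
from typing import Tuple
from urllib.parse import urlsplit


def check_ray_address(address: str) -> Tuple[str, int]:
    """Return (host, port) for a ray:// address, or raise ValueError.

    Hypothetical helper for illustration; not part of prefect-ray or Ray.
    """
    parts = urlsplit(address)
    if parts.scheme != "ray":
        raise ValueError(f"expected a ray:// address, got {address!r}")
    if not parts.hostname:
        raise ValueError(f"no host found in {address!r}")
    # Accessing .port raises ValueError by itself if the port is not numeric.
    if parts.port is None:
        raise ValueError(f"no port found in {address!r}")
    return parts.hostname, parts.port


print(check_ray_address("ray://192.0.2.255:8786"))
# ('192.0.2.255', 8786)
```

A check like this could run before constructing `RayTaskRunner(address=...)`, so that a missing scheme or port surfaces as a clear `ValueError` rather than a connection timeout.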

!!! warning "Ray environment limitations"
    Ray support for non-x86/64 architectures such as ARM/M1 processors is limited when installing from `pip` alone, and Ray will be skipped during installation of Prefect. It is possible to manually install the blocking component with `conda`. See the [Ray documentation](https://docs.ray.io/en/latest/ray-overview/installation.html#m1-mac-apple-silicon-support) for instructions.

    See the [Ray installation documentation](https://docs.ray.io/en/latest/ray-overview/installation.html) for further compatibility information.

## Running tasks on a Ray remote cluster

When using the `RayTaskRunner` with a remote Ray cluster, you may run into issues that are not seen when using a local Ray instance. To resolve these issues, we recommend taking the following steps when working with a remote Ray cluster:

1. By default, Prefect will not persist any data to the filesystem of the remote Ray worker. However, if you want to take advantage of Prefect's caching ability, you will need to configure remote result storage to persist results across task runs.

We recommend using the [Prefect UI to configure a storage block](https://docs.prefect.io/concepts/blocks/) to use for remote results storage.

Here's an example of a flow that uses caching and remote result storage:
```python
from typing import List

from prefect import flow, get_run_logger, task
from prefect.filesystems import S3
from prefect.tasks import task_input_hash
from prefect_ray.task_runners import RayTaskRunner


# The result of this task will be cached in the configured result storage
@task(cache_key_fn=task_input_hash)
def say_hello(name: str) -> str:
    logger = get_run_logger()
    # This log statement will print only on the first run. Subsequent runs will be cached.
    logger.info(f"hello {name}!")
    return name


@flow(
    task_runner=RayTaskRunner(
        address="ray://<instance_public_ip_address>:10001",
    ),
    # Using an S3 block that has already been created via the Prefect UI
    result_storage="s3/my-result-storage",
)
def greetings(names: List[str]) -> None:
    for name in names:
        say_hello.submit(name)


if __name__ == "__main__":
    greetings(["arthur", "trillian", "ford", "marvin"])
```

2. If you get an error stating that the module 'prefect' cannot be found, ensure `prefect` is installed on the remote cluster, with:
```bash
pip install prefect
```

3. If you get an error with a message similar to "File system created with scheme 's3' could not be created", ensure the required Python modules are installed on **both local and remote machines**. The required prerequisite modules can be found in the [Prefect documentation](https://docs.prefect.io/guides/deployment/storage-guide). For example, if using S3 for the remote storage:
```bash
pip install s3fs
```

4. If you are seeing timeout or other connection errors, double check the address provided to the `RayTaskRunner`. The address should look similar to: `address='ray://<head_node_ip_address>:10001'`:
```python
RayTaskRunner(address="ray://1.23.199.255:10001")
```
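
Items 2 and 3 above both come down to a package missing from one environment. As a minimal sketch, the standard-library check below reports which top-level modules cannot be imported in the environment where it runs. The helper name `missing_modules` is hypothetical, and the module list is an assumption based on the S3 example above; adjust it to your storage backend. Because it only inspects the current interpreter, it would need to be run on the local machine and on the cluster nodes separately.

```python
import importlib.util
from typing import Iterable, List


def missing_modules(names: Iterable[str]) -> List[str]:
    """Return the top-level module names that cannot be imported here.

    Hypothetical helper for illustration; not part of Prefect or prefect-ray.
    """
    return [name for name in names if importlib.util.find_spec(name) is None]


# "prefect" is always needed; "s3fs" only if results are stored on S3.
missing = missing_modules(["prefect", "s3fs"])
if missing:
    print("Install before running the flow:", ", ".join(missing))
else:
    print("All required modules are importable.")
```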

## Specifying remote options

The `remote_options` context manager controls the Ray remote options applied to tasks submitted inside its `with` block.

For example, we can set the number of CPUs and GPUs to use for the `process` task:

```python
from prefect import flow, task
from prefect_ray.task_runners import RayTaskRunner
from prefect_ray.context import remote_options

@task
def process(x):
    return x + 1


@flow(task_runner=RayTaskRunner())
def my_flow():
    # equivalent to setting @ray.remote(num_cpus=4, num_gpus=2)
    with remote_options(num_cpus=4, num_gpus=2):
        process.submit(42)
```

            
