dagster-ray


Name: dagster-ray
Version: 0.0.2
Home page: https://github.com/danielgafni/dagster-ray
Summary: Dagster integration library for Ray
Upload time: 2024-04-19 15:55:58
Author: Daniel Gafni
Requires Python: <3.13,>=3.8.1
License: Apache-2.0
Keywords: dagster, ray, ETL, distributed
# `dagster-ray`


[![image](https://img.shields.io/pypi/v/dagster-ray.svg)](https://pypi.python.org/pypi/dagster-ray)
[![image](https://img.shields.io/pypi/l/dagster-ray.svg)](https://pypi.python.org/pypi/dagster-ray)
[![image](https://img.shields.io/pypi/pyversions/dagster-ray.svg)](https://pypi.python.org/pypi/dagster-ray)
[![CI](https://github.com/danielgafni/dagster-ray/actions/workflows/ci.yml/badge.svg)](https://github.com/danielgafni/dagster-ray/actions/workflows/ci.yml)
[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
[![Checked with pyright](https://microsoft.github.io/pyright/img/pyright_badge.svg)](https://microsoft.github.io/pyright/)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)

[Ray](https://github.com/ray-project/ray) integration library for [Dagster](https://github.com/dagster-io/dagster).

`dagster-ray` allows running Ray computations in Dagster pipelines. It provides Dagster abstractions for multiple backends, the most important being `Resource` classes, along with helper `@op`s and `@schedule`s.

The following backends are implemented:
- local
- `KubeRay` (Kubernetes)

`dagster-ray` is tested across multiple version combinations of components such as `ray`, `dagster`, `KubeRay Operator`, and `Python`.

`dagster-ray` integrates with [Dagster+](https://dagster.io/plus) out of the box.

Documentation can be found below.

> [!NOTE]
> This project is in early development. Contributions are very welcome! See the [Development](#development) section below.

# Backends

`dagster-ray` provides a `RayResource` class, which does not implement any specific backend.
It defines the common interface for all `Ray` resources.
It can be used for type annotations in your `@op` and `@asset` definitions.

Examples:

```python
from dagster import asset
from dagster_ray import RayResource
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):
    return ray.get(ray.put(42))
```

The other resources below are the actual backends that implement the `RayResource` interface.

## Local

These resources can be used for development and testing purposes.
They provide the same interface as the other `*Ray` resources, but don't require any external infrastructure.

The public objects can be imported from the `dagster_ray.local` module.

### Resources

#### `LocalRay`

A dummy resource useful for testing and development.
It doesn't do anything, but it provides the same interface as the other `*Ray` resources.

Examples:


Using the `LocalRay` resource:

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):
    return ray.get(ray.put(42))


definitions = Definitions(resources={"ray_cluster": LocalRay()}, assets=[my_asset])
```

Conditionally using the `LocalRay` resource in development and `KubeRayCluster` in production:

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
from dagster_ray.kuberay import KubeRayCluster
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):
    return ray.get(ray.put(42))


IN_K8s = ...  # e.g. infer from an environment variable that is only set in the cluster


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster() if IN_K8s else LocalRay()},
    assets=[my_asset],
)
```

## KubeRay

This backend requires a Kubernetes cluster with the `KubeRay Operator` installed.

The KubeRay backend integrates with [Dagster+](https://dagster.io/plus) by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into default configuration values and `RayCluster` labels.

The public objects can be imported from the `dagster_ray.kuberay` module.

### Resources

#### `KubeRayCluster`

`KubeRayCluster` can be used for running Ray computations on Kubernetes.

When added as a resource dependency to an `@op` or `@asset`, the `KubeRayCluster`:
 - Starts a dedicated `RayCluster` for it
 - Connects to the cluster in client mode with `ray.init()` (unless `skip_init` is set to `True`)
 - Tears down the cluster after the step is executed (unless `skip_cleanup` is set to `True`)

`RayCluster` comes with minimal default configuration, matching `KubeRay` defaults.
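
Both escape hatches can be toggled on the resource itself. A minimal sketch, using the `skip_init` and `skip_cleanup` fields described above:

```python
from dagster_ray.kuberay import KubeRayCluster

# Keep the RayCluster running after the step finishes and skip the automatic
# ray.init(), e.g. when the connection is managed manually.
ray_cluster = KubeRayCluster(skip_init=True, skip_cleanup=True)
```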

Examples:

Basic usage (will create a single-node, non-scaling `RayCluster`):

```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.kuberay import KubeRayCluster
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):
    return ray.get(ray.put(42))


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster()}, assets=[my_asset]
)
```

Larger cluster with auto-scaling enabled:

```python
from dagster_ray.kuberay import KubeRayCluster, RayClusterConfig

ray_cluster = KubeRayCluster(
    ray_cluster=RayClusterConfig(
        enable_in_tree_autoscaling=True,
        worker_group_specs=[
            {
                "groupName": "workers",
                "replicas": 2,
                "minReplicas": 1,
                "maxReplicas": 10,
                # ...
            }
        ],
    )
)
```

#### `KubeRayAPI`

This resource can be used to interact with the Kubernetes API Server.

Examples:

Listing currently running `RayClusters`:

```python
from dagster import job, op, Definitions
from dagster_ray.kuberay import KubeRayAPI


@op
def list_ray_clusters(
    kube_ray_api: KubeRayAPI,
):
    return kube_ray_api.kuberay.list_ray_clusters(k8s_namespace="kuberay")


# Wire the op into a job and provide the resource (illustrative wiring)
@job
def list_ray_clusters_job():
    list_ray_clusters()


definitions = Definitions(
    jobs=[list_ray_clusters_job], resources={"kube_ray_api": KubeRayAPI()}
)
```

### Jobs

#### `delete_kuberay_clusters`

This `job` can be used to delete `RayClusters` from a given list of names.

#### `cleanup_old_ray_clusters`

This `job` can be used to delete old `RayClusters` which no longer correspond to any active Dagster Runs.
They may be left behind if the automatic cluster cleanup was disabled or failed.
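
Both jobs can be included in a `Definitions` object so they can be launched from the Dagster UI or attached to schedules. A minimal sketch, assuming both jobs are imported from `dagster_ray.kuberay`:

```python
from dagster import Definitions
from dagster_ray.kuberay import cleanup_old_ray_clusters, delete_kuberay_clusters

definitions = Definitions(jobs=[delete_kuberay_clusters, cleanup_old_ray_clusters])
```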

### Schedules

Cleanup schedules can be trivially created using the `cleanup_old_ray_clusters` or `delete_kuberay_clusters` jobs.
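
For example, a custom daily schedule can be built with Dagster's `ScheduleDefinition` (a sketch; the cron expression is illustrative):

```python
from dagster import ScheduleDefinition
from dagster_ray.kuberay import cleanup_old_ray_clusters

# Run the cleanup job every day at midnight.
cleanup_schedule = ScheduleDefinition(
    job=cleanup_old_ray_clusters,
    cron_schedule="0 0 * * *",
)
```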

#### `cleanup_old_ray_clusters`
`dagster-ray` provides an example daily cleanup schedule.

## Executor
WIP

# Development

```shell
poetry install --all-extras
poetry shell
pre-commit install
```

## Testing

### KubeRay

Required tools:

- `docker`
- `kubectl`
- `helm`
- `minikube`

Running `pytest` will **automatically**:
 - build an image with the local `dagster-ray` code
 - start a `minikube` Kubernetes cluster
 - load the built `dagster-ray` image and the `kuberay-operator` image into the cluster
 - install the `KubeRay Operator` in the cluster with `helm`
 - run the tests

Thus, no manual setup is required, just the presence of the tools listed above. This makes testing a breeze!

> [!NOTE]
> Specifying a comma-separated list of `KubeRay Operator` versions in the `KUBE_RAY_OPERATOR_VERSIONS` environment variable will spawn a new test for each version.

> [!NOTE]
> It may take a while to download the `minikube` and `kuberay-operator` images and to build the local `dagster-ray` image during the first test invocation.

            
