# `dagster-ray`
[![image](https://img.shields.io/pypi/v/dagster-ray.svg)](https://pypi.python.org/pypi/dagster-ray)
[![image](https://img.shields.io/pypi/l/dagster-ray.svg)](https://pypi.python.org/pypi/dagster-ray)
[![image](https://img.shields.io/pypi/pyversions/dagster-ray.svg)](https://pypi.python.org/pypi/dagster-ray)
[![CI](https://github.com/danielgafni/dagster-ray/actions/workflows/ci.yml/badge.svg)](https://github.com/danielgafni/dagster-ray/actions/workflows/CI.yml)
[![pre-commit](https://img.shields.io/badge/pre--commit-enabled-brightgreen?logo=pre-commit&logoColor=white)](https://github.com/pre-commit/pre-commit)
[![Checked with pyright](https://microsoft.github.io/pyright/img/pyright_badge.svg)](https://microsoft.github.io/pyright/)
[![Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff)
[Ray](https://github.com/ray-project/ray) integration library for [Dagster](https://github.com/dagster-io/dagster).
`dagster-ray` allows running Ray computations in Dagster pipelines. For each supported backend it provides Dagster abstractions, most importantly `Resource`s, along with helper `@op`s, jobs, and `@schedule`s.
The following backends are implemented:
- local
- `KubeRay` (kubernetes)
`dagster-ray` is tested across multiple version combinations of components such as `ray`, `dagster`, `KubeRay Operator`, and `Python`.
`dagster-ray` integrates with [Dagster+](https://dagster.io/plus) out of the box.
Documentation can be found below.
> [!NOTE]
> This project is in early development. Contributions are very welcome! See the [Development](#development) section below.
# Backends
`dagster-ray` provides a `RayResource` class, which does not implement any specific backend.
It defines the common interface for all `Ray` resources.
It can be used for type annotations in your `@op` and `@asset` definitions.
Examples:
```python
from dagster import asset
from dagster_ray import RayResource
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):
    return ray.get(ray.put(42))
```
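Because the asset depends only on the `RayResource` interface, the concrete backend can be swapped without touching the asset code. The pattern can be sketched in plain Python. `RayBackend`, `LocalBackend`, `RemoteBackend`, `get_address`, and `describe` are hypothetical names used for illustration; they are not part of the `dagster-ray` API.

```python
from abc import ABC, abstractmethod


class RayBackend(ABC):
    """Hypothetical common interface, playing the role RayResource plays above."""

    @abstractmethod
    def get_address(self) -> str:
        """Return the address that user code would connect to."""


class LocalBackend(RayBackend):
    def get_address(self) -> str:
        return "local"


class RemoteBackend(RayBackend):
    def __init__(self, host: str, port: int = 10001) -> None:
        self.host = host
        self.port = port

    def get_address(self) -> str:
        # "ray://" is the scheme used by Ray Client; 10001 is its default port
        return f"ray://{self.host}:{self.port}"


def describe(resource: RayBackend) -> str:
    # Code written against the interface does not care which backend it receives
    return f"{type(resource).__name__} -> {resource.get_address()}"
```

Swapping `LocalBackend()` for `RemoteBackend("head")` changes where the code connects without changing `describe`, which is the same separation the `RayResource` annotation provides.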
The other resources below are the actual backends that implement the `RayResource` interface.
## Local
These resources can be used for development and testing purposes.
They provide the same interface as the other `*Ray` resources, but don't require any external infrastructure.
The public objects can be imported from the `dagster_ray.local` module.
### Resources
#### `LocalRay`
A dummy resource which is useful for testing and development.
It doesn't do anything, but provides the same interface as the other `*Ray` resources.
Examples:
Using the `LocalRay` resource:
```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


definitions = Definitions(resources={"ray_cluster": LocalRay()}, assets=[my_asset])
```
Conditionally using the `LocalRay` resource in development and `KubeRayCluster` in production:
```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.local import LocalRay
from dagster_ray.kuberay import KubeRayCluster
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


IN_K8s = ...


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster() if IN_K8s else LocalRay()},
    assets=[my_asset],
)
```
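The `IN_K8s = ...` placeholder above is left for the reader to fill in. One possible heuristic, which is an assumption and not something `dagster-ray` provides, checks for `KUBERNETES_SERVICE_HOST`, an environment variable that Kubernetes normally injects into every container. `running_in_kubernetes` is a hypothetical helper.

```python
import os
from typing import Mapping, Optional


def running_in_kubernetes(environ: Optional[Mapping[str, str]] = None) -> bool:
    """Heuristic: Kubernetes normally sets KUBERNETES_SERVICE_HOST inside pods."""
    env = os.environ if environ is None else environ
    return bool(env.get("KUBERNETES_SERVICE_HOST"))


# Could replace the `IN_K8s = ...` placeholder in the example above
IN_K8s = running_in_kubernetes()
```

Accepting an optional `environ` mapping keeps the check testable without modifying the real process environment.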
## KubeRay
This backend requires a Kubernetes cluster with the `KubeRay Operator` installed.
This backend integrates with [Dagster+](https://dagster.io/plus) by injecting environment variables such as `DAGSTER_CLOUD_DEPLOYMENT_NAME` and tags such as `dagster/user` into the default configuration values and the `RayCluster` labels.
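As an illustration of this kind of injection, the sketch below builds `RayCluster` labels from the environment and the run tags. The label keys `dagster.io/deployment` and `dagster.io/user` and the helper names are hypothetical, not the keys `dagster-ray` actually uses. The sanitizing step follows the Kubernetes rules for label values: at most 63 characters, only alphanumerics plus `-`, `_`, and `.`, and starting and ending with an alphanumeric.

```python
import os
import re
from typing import Dict, Mapping, Optional

# Characters not allowed in a Kubernetes label value
_INVALID = re.compile(r"[^A-Za-z0-9._-]")


def to_label_value(value: str) -> str:
    """Replace invalid characters, truncate to 63 chars, trim non-alphanumeric ends."""
    cleaned = _INVALID.sub("-", value)[:63]
    return cleaned.strip("-_.")


def build_labels(
    tags: Mapping[str, str], environ: Optional[Mapping[str, str]] = None
) -> Dict[str, str]:
    env = os.environ if environ is None else environ
    labels: Dict[str, str] = {}
    deployment = env.get("DAGSTER_CLOUD_DEPLOYMENT_NAME")
    if deployment:
        labels["dagster.io/deployment"] = to_label_value(deployment)  # hypothetical key
    user = tags.get("dagster/user")
    if user:
        labels["dagster.io/user"] = to_label_value(user)  # hypothetical key
    return labels
```

Sanitizing matters here because values such as e-mail addresses in `dagster/user` contain `@`, which Kubernetes rejects in label values.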
The public objects can be imported from the `dagster_ray.kuberay` module.
### Resources
#### `KubeRayCluster`
`KubeRayCluster` can be used for running Ray computations on Kubernetes.
When added as a resource dependency to an `@op` or `@asset`, `KubeRayCluster`:
- Starts a dedicated `RayCluster` for it
- Connects to the cluster in client mode with `ray.init()` (unless `skip_init` is set to `True`)
- Tears down the cluster after the step is executed (unless `skip_cleanup` is set to `True`)
The `RayCluster` comes with a minimal default configuration that matches the `KubeRay` defaults.
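The lifecycle described in the list above roughly corresponds to a context manager wrapped around the step. The sketch below uses a hypothetical `FakeCluster` that only records events, so it runs without Kubernetes or Ray; `step_cluster` is also hypothetical. Running teardown in a `finally` block is a choice made for the sketch, not a documented guarantee of `KubeRayCluster`.

```python
from contextlib import contextmanager
from typing import Iterator, List


class FakeCluster:
    """Hypothetical stand-in that records lifecycle events instead of calling Kubernetes."""

    def __init__(self) -> None:
        self.events: List[str] = []

    def create(self) -> None:
        self.events.append("create")

    def connect(self) -> None:
        self.events.append("ray.init")

    def delete(self) -> None:
        self.events.append("delete")


@contextmanager
def step_cluster(
    cluster: FakeCluster, skip_init: bool = False, skip_cleanup: bool = False
) -> Iterator[FakeCluster]:
    cluster.create()  # 1. start a dedicated cluster for this step
    try:
        if not skip_init:
            cluster.connect()  # 2. connect in client mode
        yield cluster  # the @op/@asset body runs here
    finally:
        if not skip_cleanup:
            cluster.delete()  # 3. tear down, even if the step raised
```

Usage: `with step_cluster(FakeCluster()) as c: ...` records `create`, `ray.init`, the step's own work, and finally `delete`.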
Examples:
Basic usage (will create a single-node, non-scaling `RayCluster`):
```python
from dagster import asset, Definitions
from dagster_ray import RayResource
from dagster_ray.kuberay import KubeRayCluster
import ray


@asset
def my_asset(
    ray_cluster: RayResource,  # RayResource is only used as a type annotation
):  # this type annotation only defines the interface
    return ray.get(ray.put(42))


definitions = Definitions(
    resources={"ray_cluster": KubeRayCluster()}, assets=[my_asset]
)
```
Larger cluster with auto-scaling enabled:
```python
from dagster_ray.kuberay import KubeRayCluster, RayClusterConfig

ray_cluster = KubeRayCluster(
    ray_cluster=RayClusterConfig(
        enable_in_tree_autoscaling=True,
        worker_group_specs=[
            {
                "groupName": "workers",
                "replicas": 2,
                "minReplicas": 1,
                "maxReplicas": 10,
                # ...
            }
        ],
    )
)
```
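The worker group keys above use KubeRay's camelCase field names. Before submitting such a spec, it can help to check that the replica bounds are consistent (`minReplicas <= replicas <= maxReplicas`). This is a minimal sketch: `check_worker_group` is a hypothetical helper, its handling of missing fields is an assumption, and it does not replicate whatever validation the KubeRay operator performs itself.

```python
from typing import Any, Dict, List


def check_worker_group(spec: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a worker group spec (empty if none)."""
    problems: List[str] = []
    if not spec.get("groupName"):
        problems.append("groupName is required")
    replicas = spec.get("replicas", 0)
    # Sketch assumption: missing minReplicas means 0, missing maxReplicas means unbounded
    min_replicas = spec.get("minReplicas", 0)
    max_replicas = spec.get("maxReplicas")
    if replicas < min_replicas:
        problems.append(f"replicas ({replicas}) is below minReplicas ({min_replicas})")
    if max_replicas is not None:
        if min_replicas > max_replicas:
            problems.append(f"minReplicas ({min_replicas}) exceeds maxReplicas ({max_replicas})")
        if replicas > max_replicas:
            problems.append(f"replicas ({replicas}) exceeds maxReplicas ({max_replicas})")
    return problems
```

The `"workers"` group from the example above (2 replicas, bounds 1 to 10) passes this check with no problems reported.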
#### `KubeRayAPI`
This resource can be used to interact with the Kubernetes API Server.
Examples:
Listing currently running `RayClusters`:
```python
from dagster import op
from dagster_ray.kuberay import KubeRayAPI


@op
def list_ray_clusters(
    kube_ray_api: KubeRayAPI,
):
    return kube_ray_api.kuberay.list_ray_clusters(k8s_namespace="kuberay")
```
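If the listing returns a standard Kubernetes list object (a dict whose `items` each carry `metadata` and `status`), the result can be summarized as below. That response shape is an assumption about `list_ray_clusters`, and `summarize_ray_clusters` is a hypothetical helper; `status.state` is the field KubeRay uses to report a cluster's state (for example `ready`).

```python
from typing import Any, Dict, List, Tuple


def summarize_ray_clusters(response: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Turn a Kubernetes-style list response into (name, state) pairs."""
    summary: List[Tuple[str, str]] = []
    for item in response.get("items", []):
        name = item.get("metadata", {}).get("name", "<unnamed>")
        # Freshly created clusters may not report a status yet
        state = item.get("status", {}).get("state", "unknown")
        summary.append((name, state))
    return summary
```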
### Jobs
#### `delete_kuberay_clusters`
This `job` can be used to delete `RayClusters` from a given list of names.
#### `cleanup_old_ray_clusters`
This `job` can be used to delete old `RayClusters` that no longer correspond to any active Dagster runs.
Such clusters may be left behind if automatic cluster cleanup was disabled or failed.
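The selection step of such a cleanup can be sketched as a set difference between the run IDs recorded on each cluster and the IDs of runs that are still active. The sketch assumes each `RayCluster` carries its Dagster run ID in a label; the key `dagster.io/run_id` and the helper `find_stale_clusters` are hypothetical, not taken from `dagster-ray`.

```python
from typing import Any, Dict, Iterable, List

RUN_ID_LABEL = "dagster.io/run_id"  # hypothetical label key


def find_stale_clusters(
    clusters: Iterable[Dict[str, Any]], active_run_ids: Iterable[str]
) -> List[str]:
    """Return names of clusters whose recorded run is no longer active."""
    active = set(active_run_ids)
    stale: List[str] = []
    for cluster in clusters:
        metadata = cluster.get("metadata", {})
        run_id = metadata.get("labels", {}).get(RUN_ID_LABEL)
        # Clusters without the label were not created by a Dagster run; leave them alone
        if run_id is not None and run_id not in active:
            stale.append(metadata["name"])
    return stale
```

Skipping unlabeled clusters is a deliberately conservative choice: a cleanup job should only delete what it can attribute to a finished run.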
### Schedules
Cleanup schedules can be trivially created using the `cleanup_old_ray_clusters` or `delete_kuberay_clusters` jobs.
#### `cleanup_old_ray_clusters`
`dagster-ray` provides an example daily cleanup schedule.
## Executor
WIP
# Development
```shell
poetry install --all-extras
poetry shell
pre-commit install
```
## Testing
### KubeRay
Required tools:
- `docker`
- `kubectl`
- `helm`
- `minikube`
Running `pytest` will **automatically**:
- build an image with the local `dagster-ray` code
- start a `minikube` Kubernetes cluster
- load the built `dagster-ray` image and the pulled `kuberay-operator` image into the cluster
- install the `KubeRay Operator` in the cluster with `helm`
- run the tests
Thus, no manual setup is required, just the presence of the tools listed above. This makes testing a breeze!
> [!NOTE]
> Specifying a comma-separated list of `KubeRay Operator` versions in the `KUBE_RAY_OPERATOR_VERSIONS` environment variable will spawn a new test for each version.
> [!NOTE]
> It may take a while to download the `minikube` and `kuberay-operator` images and to build the local `dagster-ray` image during the first test invocation.
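
The comma-separated `KUBE_RAY_OPERATOR_VERSIONS` value described in the first note above could be parsed as in this sketch. `operator_versions` and its `default` parameter are hypothetical; how the test suite actually reads the variable is not shown here.

```python
import os
from typing import List, Mapping, Optional, Sequence


def operator_versions(
    environ: Optional[Mapping[str, str]] = None, default: Sequence[str] = ()
) -> List[str]:
    """Split KUBE_RAY_OPERATOR_VERSIONS on commas, ignoring blanks and stray spaces."""
    env = os.environ if environ is None else environ
    raw = env.get("KUBE_RAY_OPERATOR_VERSIONS", "")
    versions = [v.strip() for v in raw.split(",") if v.strip()]
    return versions or list(default)
```

A list like this can then be handed to `pytest.fixture(params=...)`, which runs every dependent test once per version.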