raydp-nightly


* Name: raydp-nightly
* Version: 2024.4.25.dev0
* Home page: https://github.com/oap-project/raydp
* Summary: RayDP: Distributed Data Processing on Ray
* Upload time: 2024-04-25 00:53:09
* Author: RayDP Developers
* Requires Python: >=3.6
* License: Apache 2.0
* Keywords: raydp, spark, ray, distributed, data-processing

# RayDP
RayDP provides simple APIs for running Spark on [Ray](https://github.com/ray-project/ray) and integrating Spark with AI libraries, making it simple to build distributed data and AI pipelines in a single Python program.

# INTRODUCTION 

## Problem Statement

A large-scale AI workflow usually involves multiple systems, for example Spark for data processing and PyTorch or TensorFlow for distributed training. A common setup is to use two separate clusters and stitch together multiple programs using glue code or a workflow orchestrator such as Airflow or Kubeflow. However, in many cases this adds cost in both system efficiency and operations. The setup overhead of the workflow tasks adds latency. Data exchange among frameworks has to go through an external storage system, which also adds latency. On the operations side, managing two separate clusters introduces additional cost. Writing the pipeline with a workflow orchestrator is also usually more complex than writing a single Python program.



## Solution with Ray and RayDP
To solve the above challenges, more and more companies have adopted Ray as a single substrate for data processing, model training, serving, and more. Ray makes it simple to build the data and AI pipeline in a single Python program and scale seamlessly from a laptop to a cluster. Ray has built a rich ecosystem by providing high-quality libraries and integrating with other popular ones.

Spark, as a popular big data framework, plays an important role in data and AI pipelines. RayDP brings Spark to the Ray ecosystem by running Spark on top of Ray. With RayDP, you can write PySpark code together with other Ray libraries in the same Python program, which improves productivity and expressivity. RayDP makes it simple to build distributed end-to-end data analytics and AI pipelines. RayDP exchanges data between Spark and other frameworks through Ray's in-memory object store for best performance.


## Who will use RayDP
* ML infrastructure teams can build a modern ML platform on top of Ray, using RayDP to run Spark on Ray and unify it with other AI components.
* Data scientists can use RayDP to write PySpark code together with other AI libraries and scale seamlessly from a laptop to the cloud.
* Data engineers can use RayDP to run on-demand Spark jobs in the cloud without setting up a Spark cluster manually. The Ray cluster launcher helps start a Ray cluster in the cloud, and RayDP lets you run Spark in that cluster with autoscaling.

## Presentations
* Data + AI Summit 2021: [Build Large-Scale Data Analytics and AI Pipeline Using RayDP](https://www.youtube.com/watch?v=B4iXQtxX2cs)
* Ray Summit 2021: [RayDP: Build Large-scale End-to-end Data Analytics and AI Pipelines Using Spark and Ray](https://www.youtube.com/watch?v=ELSrR1Geqg4)


# ARCHITECTURE
RayDP provides simple APIs for running Spark on Ray and for converting a Spark DataFrame to a Ray Dataset, which can be consumed by XGBoost, Ray Train, Horovod on Ray, etc. RayDP also provides high-level scikit-learn style Estimator APIs for distributed training with PyTorch or TensorFlow.

RayDP supports Ray as a Spark resource manager and runs Spark executors in Ray actors. The communication between Spark executors still uses Spark's internal protocol. 

![image](https://user-images.githubusercontent.com/9278199/199454034-5a87fb0b-069a-47dd-97ba-58e08edd4bb9.png)
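
As a rough illustration of this resource model, the sketch below estimates what the executors alone would request for a given set of `init_spark` arguments. The helper names `parse_memory` and `executor_footprint` are hypothetical and not part of RayDP. The estimate assumes each executor actor asks for `executor_cores` CPUs and `executor_memory` of memory, and it ignores any memory overhead and any non-executor processes, so treat it as a lower bound.

```python
import re

# Binary multipliers, matching how Spark interprets suffixes such as "g" or "GB".
_UNITS = {"K": 1024, "M": 1024**2, "G": 1024**3, "T": 1024**4}


def parse_memory(size):
    """Convert a Spark-style size string such as '4GB' or '512m' into bytes."""
    match = re.fullmatch(r"\s*(\d+)\s*([KMGT])B?\s*", size.upper())
    if match is None:
        raise ValueError(f"unrecognized memory size: {size!r}")
    return int(match.group(1)) * _UNITS[match.group(2)]


def executor_footprint(num_executors, executor_cores, executor_memory):
    """Total CPUs and memory requested for the executors alone (hypothetical helper)."""
    return {
        "cpus": num_executors * executor_cores,
        "memory_bytes": num_executors * parse_memory(executor_memory),
    }


print(executor_footprint(2, 2, "4GB"))
# {'cpus': 4, 'memory_bytes': 8589934592}
```

Comparing these totals against the resources reported by the Ray cluster is a quick way to see whether a requested configuration can be scheduled at all.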


# QUICK START

## Installation

You can install the latest RayDP release using pip. RayDP requires Ray and PySpark. Please also make sure Java is installed and `JAVA_HOME` is set properly.

```shell
pip install raydp
```

Alternatively, you can install the RayDP nightly build:

```shell
pip install --pre raydp
```

NOTICE: the formerly used `raydp-nightly` package will no longer be updated.

If you'd like to build and install the latest master, use the following commands:

```shell
./build.sh
pip install dist/raydp*.whl
```
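
Because RayDP needs a working Java installation, it can help to check the environment before starting Spark. The following is a minimal sketch using only the Python standard library; `check_java_environment` is a hypothetical helper and not part of RayDP, and it only verifies that `JAVA_HOME` points to a directory and that a `java` executable is on `PATH`.

```python
import os
import shutil


def check_java_environment(env=None):
    """Return a list of problems found; an empty list means both checks passed."""
    env = os.environ if env is None else env
    problems = []

    java_home = env.get("JAVA_HOME")
    if not java_home:
        problems.append("JAVA_HOME is not set")
    elif not os.path.isdir(java_home):
        problems.append(f"JAVA_HOME does not point to a directory: {java_home}")

    # Search for the `java` launcher on the PATH taken from the same environment.
    if shutil.which("java", path=env.get("PATH", "")) is None:
        problems.append("no `java` executable found on PATH")

    return problems


for problem in check_java_environment():
    print("warning:", problem)
```

The check does not validate the Java version; it only catches the most common setup mistakes.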

## Spark on Ray

RayDP provides an API for starting a Spark job on Ray. To create a Spark session, call the `raydp.init_spark` API. After that, you can use any Spark API you want. For example:

```python
import ray
import raydp

# connect to ray cluster
ray.init(address='auto')

# create a Spark cluster with specified resource requirements
spark = raydp.init_spark(app_name='RayDP Example',
                         num_executors=2,
                         executor_cores=2,
                         executor_memory='4GB')

# normal data processing with Spark
df = spark.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look', ), ('python', )], ['word'])
df.show()
word_count = df.groupBy('word').count()
word_count.show()

# stop the spark cluster
raydp.stop_spark()
```
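
For reference, the aggregation in the example above is an ordinary word count. The plain-Python sketch below computes the same counts locally with `collections.Counter`, without Spark or Ray; it is only meant to show the expected result. Spark does not guarantee the row order of `word_count.show()`, so the sketch sorts by word before printing.

```python
from collections import Counter

# Same input words as the Spark DataFrame in the example above.
words = ["look", "spark", "tutorial", "spark", "look", "python"]

# Equivalent of df.groupBy('word').count(), computed on a single machine.
word_count = Counter(words)

for word, count in sorted(word_count.items()):
    print(word, count)
# look 2
# python 1
# spark 2
# tutorial 1
```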

Spark features such as dynamic resource allocation, the spark-submit script, etc., are also supported. Please refer to [Spark on Ray](./doc/spark_on_ray.md) for more details.
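
The example in this section calls `raydp.stop_spark()` at the end of the script, so an exception raised earlier would skip it. One way to make the shutdown unconditional is a small context manager. This is a sketch only: `spark_session` is a hypothetical helper, not a RayDP API, and it takes the init and stop functions as arguments so that it makes no assumptions about their signatures.

```python
from contextlib import contextmanager


@contextmanager
def spark_session(init, stop, **init_kwargs):
    """Create a session with `init(**init_kwargs)` and always call `stop()` on exit."""
    spark = init(**init_kwargs)
    try:
        yield spark
    finally:
        # Runs whether the body finished normally or raised an exception.
        stop()


# Intended usage with RayDP (requires a running Ray cluster):
#
#   with spark_session(raydp.init_spark, raydp.stop_spark,
#                      app_name='RayDP Example', num_executors=2,
#                      executor_cores=2, executor_memory='4GB') as spark:
#       spark.range(10).show()
```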


## Spark + AI Pipeline on Ray

RayDP provides APIs for converting a Spark DataFrame to a Ray Dataset, which can be consumed by XGBoost, Ray Train, Horovod on Ray, etc. RayDP also provides high-level scikit-learn style Estimator APIs for distributed training with PyTorch or TensorFlow. The easiest way to get started with an end-to-end Spark + AI pipeline is to run the following tutorials on Google Colab. More examples are available in the `examples` folder.
* [Spark + Ray Train Tutorial on Google Colab](https://colab.research.google.com/github/oap-project/raydp/blob/master/tutorials/raytrain_example.ipynb)
* [Spark + TorchEstimator Tutorial on Google Colab](https://colab.research.google.com/github/oap-project/raydp/blob/master/tutorials/pytorch_example.ipynb)


***Spark DataFrame & Ray Dataset conversion***

You can use `ray.data.from_spark` and `ds.to_spark` to convert between a Spark DataFrame and a Ray Dataset.
```python
import ray
import raydp

ray.init()
spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")

# Spark DataFrame to Ray Dataset
df1 = spark.range(0, 1000)
ds1 = ray.data.from_spark(df1)

# Ray Dataset to Spark DataFrame
ds2 = ray.data.from_items([{"id": i} for i in range(1000)])
df2 = ds2.to_spark(spark)
```

A Ray Dataset converted from a Spark DataFrame this way will no longer be accessible after `raydp.stop_spark()`. If you want to access the data after Spark is shut down, please use `raydp.stop_spark(cleanup_data=False)`.

Please refer to [Spark+XGBoost on Ray](./examples/xgboost_ray_nyctaxi.py) for a full example.

***Estimator API***

The Estimator APIs allow you to train a deep neural network directly on a Spark DataFrame, leveraging Ray’s ability to scale out across the cluster. The Estimator APIs are wrappers around Ray Train and hide the complexity of converting a Spark DataFrame to a PyTorch/TensorFlow dataset and distributing the training. RayDP provides `raydp.torch.TorchEstimator` for PyTorch and `raydp.tf.TFEstimator` for TensorFlow. The following is an example of using TorchEstimator.

```python
import ray
import raydp
import torch
from raydp.torch import TorchEstimator

ray.init(address="auto")
spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB")

# Spark DataFrame Code 
df = spark.read.parquet(…) 
train_df = df.withColumn(…)

# PyTorch Code 
model = torch.nn.Sequential(torch.nn.Linear(2, 1)) 
optimizer = torch.optim.Adam(model.parameters())

estimator = TorchEstimator(model=model, optimizer=optimizer, ...) 
estimator.fit_on_spark(train_df)

raydp.stop_spark()
```
Please refer to [NYC Taxi PyTorch Estimator](./examples/pytorch_nyctaxi.py) and [NYC Taxi TensorFlow Estimator](./examples/tensorflow_nyctaxi.py) for full examples.

***Fault Tolerance***

A Ray Dataset converted from a Spark DataFrame as shown above is not fault-tolerant. This is because the conversion is implemented using `ray.put` combined with Spark's `mapPartitions`, and objects created by `ray.put` are not recoverable in Ray.

RayDP now supports converting data in a way such that the resulting Ray Dataset is fault-tolerant. This feature is currently *experimental*. Here is how to use it:
```python
import ray
import raydp

ray.init(address="auto")
# set fault_tolerance_mode to True to enable the feature
# this will connect pyspark driver to ray cluster
spark = raydp.init_spark(app_name="RayDP Example",
                         num_executors=2,
                         executor_cores=2,
                         executor_memory="4GB",
                         fault_tolerance_mode=True)
# df should be large enough so that result will be put into plasma
df = spark.range(100000)
# use this API instead of ray.data.from_spark
ds = raydp.spark.from_spark_recoverable(df)
# ds is now fault-tolerant.
```
Note that `from_spark_recoverable` will persist the converted DataFrame. You can provide the storage level through the keyword parameter `storage_level`. In addition, this feature is not available in Ray client mode. If you need to use the Ray client, please wrap your application in a Ray actor, as described in the Ray client chapter.


## Getting Involved
To report bugs or request new features, please open a GitHub issue.



            
