# RepartiPy
RepartiPy helps you handle PySpark DataFrame partition sizes with precision.
## Possible Use Cases
- Repartition your DataFrame precisely, **without knowing the whole DataFrame size** (i.e. `Dynamic Repartition`)
- Estimate your DataFrame size **with more accuracy**
## Why RepartiPy
Although Spark's built-in [SizeEstimator](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/util/SizeEstimator.html) can be used to estimate a DataFrame's size, it is sometimes inaccurate.
RepartiPy instead uses **Spark's execution plan statistics** to provide a more reliable estimate.
It offers two approaches:
- `repartipy.SizeEstimator`
- `repartipy.SamplingSizeEstimator`
### repartipy.SizeEstimator
Recommended when your executors have enough memory to cache the whole DataFrame.
`SizeEstimator` simply caches the whole DataFrame in memory and extracts the execution plan statistics.
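For intuition, here is a minimal sketch of the underlying "cache the DataFrame and read the plan statistics" idea in plain PySpark. It only illustrates the general technique and is not RepartiPy's actual implementation:
```python
# Sketch of the general idea only -- not RepartiPy's actual implementation.
df.cache()
df.count()  # materialize the cache so the statistics reflect the real data

# Spark's execution plan statistics, read through the underlying JVM DataFrame
size_in_bytes = int(
    df._jdf.queryExecution().optimizedPlan().stats().sizeInBytes().toString()
)

df.unpersist()
```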
### repartipy.SamplingSizeEstimator
Recommended when your executors do ***NOT*** have enough memory to cache the whole DataFrame.
`SamplingSizeEstimator` uses a 'write to disk (HDFS) and re-read' approach behind the scenes, for two reasons:
1. It avoids reading the source (e.g. S3) twice, which could be inefficient -> better performance
2. It reduces partition skew by deliberately re-reading the data (leveraging [MaxPartitionBytes](https://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options)) -> better sampling results
Therefore, **your cluster must have HDFS configured and enough disk space.**
Because of sampling, the result may be less accurate than `SizeEstimator`'s.
If you need more accurate results, tune the `sample_count` option accordingly.
This approach is also slower than `SizeEstimator`, as `SamplingSizeEstimator` requires disk I/O and additional logic.
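To see why `sample_count` matters, consider the back-of-the-envelope extrapolation that any sampling-based size estimate boils down to. The numbers and variable names below are hypothetical, purely for illustration:
```python
# Hypothetical numbers for illustration -- not RepartiPy internals.
total_partition_count = 200                   # partitions in the full DataFrame
sample_count = 10                             # partitions actually measured
measured_sample_size_in_bytes = 12 * 1024**3  # ~12 GiB measured for the sample

# Scale the measured sample up to the whole DataFrame.
estimated_total_size_in_bytes = int(
    measured_sample_size_in_bytes * total_partition_count / sample_count
)  # ~240 GiB; a larger sample_count makes this extrapolation less noisy
```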
# How To Use
## Setup
```shell
pip install repartipy
```
### Prerequisite
```python
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
input_data = [
    (1, "Seoul"),
    (2, "Busan"),
]
df = spark.createDataFrame(data=input_data, schema=["id", "location"])
```
### get_desired_partition_count()
***Calculate the ideal number of partitions for a DataFrame***
The estimator suggests a `desired_partition_count` such that each partition holds roughly `desired_partition_size_in_bytes` (default: 1 GiB) after repartitioning.
`reproduce()` returns a DataFrame identical to `df`, but reconstructed internally by the estimator for better performance.
`SizeEstimator` reproduces `df` from **Memory** (Cache).
`SamplingSizeEstimator` reproduces `df` from **Disk** (HDFS).
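As a rough mental model, the suggested count is essentially the estimated DataFrame size divided by the target partition size (the exact rounding is up to the library):
```python
import math

# Back-of-the-envelope version of the suggestion; exact rounding may differ in RepartiPy.
estimated_df_size_in_bytes = 10 * 1024**3   # hypothetical ~10 GiB estimate
desired_partition_size_in_bytes = 1024**3   # 1 GiB target (the default)

desired_partition_count = math.ceil(
    estimated_df_size_in_bytes / desired_partition_size_in_bytes
)  # -> 10 partitions of roughly 1 GiB each
```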
#### with SizeEstimator
```python
import repartipy
one_gib_in_bytes = 1073741824
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)

    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")
```
#### with SamplingSizeEstimator
```python
import repartipy
one_gib_in_bytes = 1073741824
with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)

    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")
```
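Whether to call `repartition()` or `coalesce()` on the reproduced DataFrame is the usual Spark trade-off: `repartition()` performs a full shuffle and can either increase or decrease the number of partitions, while `coalesce()` can only decrease it but avoids a full shuffle.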
### estimate()
***Estimate size of a DataFrame***
#### with SizeEstimator
```python
import repartipy
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    df_size_in_bytes = se.estimate()
```
#### with SamplingSizeEstimator
```python
import repartipy
with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    df_size_in_bytes = se.estimate()
```
# Benchmark
Overall, there appears to be only a slight performance overhead when using RepartiPy.
This benchmark compares the **running time of Spark jobs** in the following two cases to give a rough estimate:
- **Static** Repartition (repartition without RepartiPy)
```python
# e.g.
df.repartition(123).write.save("your/write/path")
```
- **Dynamic** Repartition (repartition with RepartiPy)
```python
# e.g.
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
```
All the other conditions remain the same **except the usage of RepartiPy**.
> **Note**
>
> Benchmark results are provided for rough reference only, not as absolute figures.
> Actual performance can vary depending on your own circumstances (e.g. your data, your Spark code, your cluster resources, ...).
## SizeEstimator
- DataFrame Size ~= 256 MiB (decompressed size)

| | Static | Dynamic |
|:------------:|:-------:|:-------:|
| Running Time | 8.5 min | 8.6 min |
## SamplingSizeEstimator
- DataFrame Size ~= 241 GiB (decompressed size)

| | Static | Dynamic |
|:------------:|:------:|:-------:|
| Running Time | 14 min | 16 min |