Name: repartipy
Version: 0.1.8
Home page: https://github.com/sakjung/repartipy
Summary: Helper for handling PySpark DataFrame partition size 📑🎛️
Upload time: 2024-03-08 04:47:37
Author: sakjung
Requires Python: >=3.7
License: Apache-2.0
Keywords: apachespark, spark, pyspark

# RepartiPy

RepartiPy helps you handle PySpark DataFrame partition sizes precisely.

## Possible Use Cases
- Repartition your DataFrame precisely, **without knowing the whole DataFrame size** (i.e. `Dynamic Repartition`)
- Estimate your DataFrame size **with more accuracy**

## Why RepartiPy
 
Although Spark's [SizeEstimator](https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/util/SizeEstimator.html) can be used to estimate a DataFrame's size, it is sometimes inaccurate. 
RepartiPy instead uses **Spark's execution plan statistics** as a more reliable workaround.
It offers two approaches to achieve this:

- `repartipy.SizeEstimator`
- `repartipy.SamplingSizeEstimator`

### repartipy.SizeEstimator
Recommended when your executor memory is sufficient to cache the whole DataFrame. 
`SizeEstimator` simply caches the whole DataFrame in memory and extracts the execution plan statistics.
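
For illustration only, the plan statistics that `SizeEstimator` relies on can also be read by hand through PySpark's internal JVM handles. The following is a minimal sketch of the idea (it uses unofficial internals and is not RepartiPy's actual implementation):

```python
# Sketch only: cache the DataFrame, materialize it, then read sizeInBytes
# from the optimized logical plan via PySpark's internal JVM handle.
df.cache()
df.count()  # materialize the cache so plan statistics reflect the cached data

stats = df._jdf.queryExecution().optimizedPlan().stats()
size_in_bytes = int(stats.sizeInBytes().toString())  # scala.math.BigInt -> int
print(size_in_bytes)

df.unpersist()
```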

### repartipy.SamplingSizeEstimator
Recommended when your executor memory is ***NOT*** sufficient to cache the whole DataFrame.
`SamplingSizeEstimator` uses a 'write to disk and re-read (HDFS)' approach behind the scenes for two reasons:

1. Prevent a double read from the source (e.g. S3), which might be inefficient -> better performance
2. Reduce partition skew by deliberately reading the data again (leveraging [MaxPartitionBytes](https://spark.apache.org/docs/latest/sql-performance-tuning.html#other-configuration-options)) -> better sampling results

Therefore, **you must have HDFS configured on your cluster and enough disk space.** 

Results may be less accurate than `SizeEstimator` due to sampling. 
If you need more accurate results, tune the `sample_count` option accordingly (see the sketch below).
Additionally, this approach will be slower than `SizeEstimator`, as `SamplingSizeEstimator` requires disk I/O and additional logic.
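
Two knobs matter here: `sample_count` (RepartiPy's own option, shown in the examples below) and `spark.sql.files.maxPartitionBytes` (a standard Spark SQL setting, 128 MiB by default). The snippet below is only an illustrative starting point, assuming `spark` and `df` are set up as in the Prerequisite section; whether lowering `maxPartitionBytes` actually improves your sampling is workload-dependent:

```python
import repartipy

# Standard Spark SQL option (default 128 MiB); a smaller value means more,
# smaller partitions on the re-read, which can make samples more uniform.
# The 64 MiB value is only an illustrative assumption.
spark.conf.set("spark.sql.files.maxPartitionBytes", str(64 * 1024 * 1024))

# A larger sample_count generally trades speed for accuracy.
with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=20) as se:
    df_size_in_bytes = se.estimate()
```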

# How To Use
## Setup
```shell
pip install repartipy
```
### Prerequisite
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
input_data = [
        (1, "Seoul"),
        (2, "Busan"),
    ]
df = spark.createDataFrame(data=input_data, schema=["id", "location"])
```

### get_desired_partition_count()
***Calculate the ideal number of partitions for a DataFrame*** 

The estimator suggests a `desired_partition_count` such that each partition holds roughly `desired_partition_size_in_bytes` (default: 1 GiB) after repartitioning.
`reproduce()` returns exactly the same `df`, but rebuilt internally by the estimator for better performance. 
`SizeEstimator` reproduces `df` from **Memory** (Cache). 
`SamplingSizeEstimator` reproduces `df` from **Disk** (HDFS).
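
Conceptually, the suggested count boils down to dividing the estimated DataFrame size by the target partition size and rounding up. A rough back-of-the-envelope sketch (RepartiPy's exact rounding and edge cases may differ):

```python
import math

# Hypothetical numbers for illustration only.
df_size_in_bytes = 5 * 1024**3             # suppose the DataFrame is ~5 GiB
desired_partition_size_in_bytes = 1024**3  # 1 GiB target per partition

desired_partition_count = max(1, math.ceil(df_size_in_bytes / desired_partition_size_in_bytes))
print(desired_partition_count)  # 5 -> each partition ends up at ~1 GiB
```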

#### with SizeEstimator
```python
import repartipy

one_gib_in_bytes = 1073741824

with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or 
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")
```
#### with SamplingSizeEstimator
```python
import repartipy
    
one_gib_in_bytes = 1073741824

with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or 
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")
```

### estimate()
***Estimate size of a DataFrame***
#### with SizeEstimator
```python
import repartipy

with repartipy.SizeEstimator(spark=spark, df=df) as se:
    df_size_in_bytes = se.estimate()
```
#### with SamplingSizeEstimator
```python
import repartipy

with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    df_size_in_bytes = se.estimate()
```
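
As a small follow-up, the raw byte count from `estimate()` can be turned into a readable figure before deciding how to write the data, for example:

```python
# df_size_in_bytes comes from either estimator above.
size_in_mib = df_size_in_bytes / 1024**2
size_in_gib = df_size_in_bytes / 1024**3
print(f"DataFrame size ~ {size_in_mib:,.1f} MiB ({size_in_gib:.2f} GiB)")
```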

# Benchmark

Overall, there appears to be a slight performance cost to using RepartiPy. 
This benchmark compares the **running time of Spark jobs** in the following two cases to give a rough estimate:
- **Static** Repartition (repartition without RepartiPy)
```python
# e.g.
df.repartition(123).write.save("your/write/path")
```
- **Dynamic** Repartition (repartition with RepartiPy)
```python
# e.g.
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)
    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
```
All other conditions remain the same **except the use of RepartiPy**.

> **Note**
> 
> The benchmark results are for rough reference only, not absolute.
> Actual performance can vary depending on your own circumstances (e.g. your data, your Spark code, your cluster resources, ...).

## SizeEstimator
- DataFrame Size ~= 256 MiB (decompressed size)

|              | Static  | Dynamic |
|:------------:|:-------:|:-------:|
| Running Time | 8.5 min | 8.6 min |
 

## SamplingSizeEstimator
- DataFrame Size ~= 241 GiB (decompressed size)

|              | Static | Dynamic |
|:------------:|:------:|:-------:|
| Running Time | 14 min | 16 min  |

            
