# Flake8-pyspark-with-column
[![Upload Python Package](https://github.com/SemyonSinchenko/flake8-pyspark-with-column/actions/workflows/python-publish.yml/badge.svg)](https://github.com/SemyonSinchenko/flake8-pyspark-with-column/actions/workflows/python-publish.yml) ![PyPI - Downloads](https://img.shields.io/pypi/dm/flake8-pyspark-with-column)
## Getting started
```sh
pip install flake8-pyspark-with-column
flake8 --select PSPRK001,PSPRK002,PSPRK003,PSPRK004
```
Alternatively you can add the following `tox.ini` file to the root of your project:
```ini
[flake8]
select =
    PSPRK001,
    PSPRK002,
    PSPRK003,
    PSPRK004
```
## About
A flake8 plugin that detects usage of `withColumn` in a loop or inside `reduce`. From the PySpark documentation on the `withColumn` method:
> This method introduces a projection internally. Therefore, calling it multiple times, for instance, via loops in order to add multiple columns can generate big plans which can cause performance issues and even StackOverflowException. To avoid this, use select() with multiple columns at once.
### What happens under the hood?
When you run a PySpark application, the following happens:
1. Spark creates an `Unresolved Logical Plan` as the result of parsing the query
2. Spark analyzes this plan to produce an `Analyzed Logical Plan`
3. Spark applies optimization rules to produce an `Optimized Logical Plan`
<p align="center">
<img src="https://www.databricks.com/wp-content/uploads/2018/05/Catalyst-Optimizer-diagram.png" alt="spark-flow" width="800" align="middle"/>
</p>
What is the problem with `withColumn`? Each call creates a single node in the unresolved plan, so calling `withColumn` 500 times produces an unresolved plan with 500 nodes. During analysis Spark must visit each node to check that the column exists and has the right data type. After that Spark starts applying optimization rules, but rules are applied recursively once per plan, so a chain of 500 `withColumn` calls requires 500 applications of the corresponding rule. All of that can significantly increase the time it takes to go from the `Unresolved Logical Plan` to the `Optimized Logical Plan`:
<p align="center">
 <img src="https://raw.githubusercontent.com/SemyonSinchenko/flake8-pyspark-with-column/refs/heads/main/static/with_column_performance.png" alt="benchmark" width="600" align="middle"/>
</p>
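You can observe the plan growth yourself with `DataFrame.explain`. A minimal sketch (the column names are arbitrary):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.getOrCreate()
df = spark.range(5)

# The very pattern this plugin flags: each iteration adds one Project node.
for i in range(5):
    df = df.withColumn(f"col_{i}", lit(i))

# extended=True prints the parsed (unresolved), analyzed, optimized, and
# physical plans; the parsed plan shows one nested Project per withColumn call.
df.explain(extended=True)
```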
On the other hand, both `withColumns` and `select(*cols)` create only one node in the plan, no matter how many columns we want to add.
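For instance, `withColumns` (available since Spark 3.3) takes a dict mapping column names to expressions and adds them all in a single plan node; a short sketch with made-up column names:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.range(5)

# One plan node, no matter how many entries the dict has.
df = df.withColumns({
    "doubled": col("id") * 2,
    "squared": col("id") * col("id"),
})
```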
## Rules
This plugin contains the following rules:
- `PSPRK001`: Usage of withColumn in a loop detected
- `PSPRK002`: Usage of withColumn inside reduce is detected
- `PSPRK003`: Usage of withColumnRenamed in a loop detected
- `PSPRK004`: Usage of withColumnRenamed inside reduce is detected
### Examples
Let's imagine we want to apply an ML model to our data, but the model expects double values while our table contains decimal values. The goal is to cast all `Decimal` columns to `Double`.
Implementation with `withColumn` (bad example):
```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType, DoubleType

def cast_to_double(df: DataFrame) -> DataFrame:
    # Each withColumn call adds another Project node to the plan (PSPRK001).
    for field in df.schema.fields:
        if isinstance(field.dataType, DecimalType):
            df = df.withColumn(field.name, col(field.name).cast(DoubleType()))
    return df
```
Implementation without `withColumn` (good example):
```python
def cast_to_double(df: DataFrame) -> DataFrame:
    # Build the whole projection first, then apply it with a single select:
    # one plan node regardless of how many columns are cast.
    cols_to_select = []
    for field in df.schema.fields:
        if isinstance(field.dataType, DecimalType):
            cols_to_select.append(col(field.name).cast(DoubleType()).alias(field.name))
        else:
            cols_to_select.append(col(field.name))
    return df.select(*cols_to_select)
```
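The `reduce` rules (PSPRK002 and PSPRK004) target the functional spelling of the same anti-pattern. A sketch of code the plugin would flag, reusing the imports from above (the function name is just for illustration):

```python
from functools import reduce

def cast_to_double_reduce(df: DataFrame) -> DataFrame:
    # Flagged (PSPRK002): reduce calls withColumn once per column,
    # growing the plan exactly like the loop in the bad example above.
    decimal_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, DecimalType)]
    return reduce(
        lambda acc, name: acc.withColumn(name, col(name).cast(DoubleType())),
        decimal_cols,
        df,
    )
```

The `select`-based version above is the fix for this variant as well.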
## Usage
`flake8 %your-code-here%`
<p align="center">
<img src="https://raw.githubusercontent.com/SemyonSinchenko/flake8-pyspark-with-column/refs/heads/main/static/usage.png" alt="screenshot of how it works" width="800" align="middle"/>
</p>