# SparkCleaner Documentation
## Overview
**Disclaimer: this software is not stable yet.**
SparkCleaner is a Python library for data cleaning using PySpark. It provides various cleaning strategies, a pipeline for applying these strategies, and a logging mechanism for tracking errors and issues.
## Cleaning Strategies
1. **DropDuplicatesStrategy**
- **Purpose**: Removes duplicate rows based on specified columns.
- **Method**: `clean(df: DataFrame) -> DataFrame`
- **Logs**: Logs errors for rows identified as duplicates.
2. **DropMissingValuesStrategy**
- **Purpose**: Removes rows with missing values in specified columns.
- **Method**: `clean(df: DataFrame) -> DataFrame`
- **Logs**: Logs errors for missing values in specified columns.
3. **FilterNegativeValuesStrategy**
   - **Purpose**: Removes rows where specified columns contain negative values.
- **Method**: `clean(df: DataFrame) -> DataFrame`
- **Logs**: Logs errors for negative values in specified columns.
4. **ValidateColumnTypesStrategy**
- **Purpose**: Ensures columns have the expected data types.
- **Method**: `clean(df: DataFrame) -> DataFrame`
- **Logs**: Logs errors for type mismatches.
5. **ValidateDatesStrategy**
- **Purpose**: Validates dates in specified columns against a given format.
- **Method**: `clean(df: DataFrame) -> DataFrame`
- **Logs**: Logs errors for invalid dates.
6. **ValidateRegexStrategy**
- **Purpose**: Ensures column values match specified regex patterns.
- **Method**: `clean(df: DataFrame) -> DataFrame`
- **Logs**: Logs errors for values not matching the regex.
7. **FilteringStrategy**
- **Purpose**: Filters rows based on specified boolean conditions.
- **Method**: `clean(df: DataFrame) -> DataFrame`
8. **FillNaStrategy**
- **Purpose**: Replaces missing values with specified default values.
- **Method**: `clean(df: DataFrame) -> DataFrame`
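
Every strategy exposes the same `clean(df: DataFrame) -> DataFrame` interface, so a strategy can also be applied on its own, outside the pipeline. A minimal sketch (constructor arguments mirror the usage example below; standalone use skips the pipeline's `__index` bookkeeping, so treat it as illustrative):

```python
from SparkCleaner import DropDuplicatesStrategy  # named import assumed to work,
                                                 # as the wildcard import below suggests

# Apply one strategy directly; clean() returns a new, cleaned DataFrame.
dedup = DropDuplicatesStrategy(columns=["id", "email"])
deduped_df = dedup.clean(df)  # df: an existing PySpark DataFrame
```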
## Cleaning Pipeline
- **Class**: `CleaningPipeline`
- **Methods**:
- `add_strategy(strategy: BaseCleaningStrategy | list[BaseCleaningStrategy])`: Adds a cleaning strategy or list of strategies.
- `set_dataframe(df: DataFrame)`: Sets the DataFrame to be cleaned and adds an `__index` column.
- `run() -> DataFrame`: Applies all strategies to the DataFrame and returns the cleaned DataFrame.
- `get_report() -> str`: Returns a JSON report of all logged errors.
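
Put together, the methods above give a short end-to-end flow. A minimal sketch (strategy constructors as in the usage example below; `df` is assumed to be an existing PySpark DataFrame):

```python
from SparkCleaner import CleaningPipeline, DropDuplicatesStrategy, DropMissingValuesStrategy

pipeline = CleaningPipeline()
pipeline.add_strategy(DropDuplicatesStrategy(columns=["id"]))        # a single strategy...
pipeline.add_strategy([DropMissingValuesStrategy(columns=["age"])])  # ...or a list of them
pipeline.set_dataframe(df=df)  # also attaches the internal __index column
cleaned = pipeline.run()       # presumably applies strategies in insertion order
print(pipeline.get_report())   # JSON string of everything the strategies logged
```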
## Logging
- **Class**: `Logger`
- **Purpose**: Records errors encountered during the cleaning process.
- **Method**: `log_error(index, column, message)`: Logs errors with details.
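
`Logger` is used internally by the strategies, but the documented `log_error(index, column, message)` signature can be exercised directly. A hypothetical sketch (whether `Logger` is exported at package level, and the shape of the resulting report, are assumptions):

```python
from SparkCleaner import Logger  # assumed to be importable by name

logger = Logger()
# index identifies the offending row (the pipeline's __index column),
# column names the field, and message describes the problem.
logger.log_error(index=3, column="date", message="invalid date: 'invalid_date'")
```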
### Usage Example
```python
from SparkCleaner import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType
data = [
    (1, "Alice", 30, "2024-07-01", "alice@example.com"),
    (2, "Bob", None, "2024-07/02", "bob@example"),        # null age, malformed date, invalid email
    (3, "Charlie", 25, "invalid_date", None),             # invalid date and null email
    (4, "David", -5, "2024-07-04", "david@example.com"),  # negative age
    (5, "Eve", 22, "2024-07-05", "eve@example.com"),
    (5, "Eve", 22, "2024-07-05", "eve@example.com"),      # exact duplicate of the previous row
    (5, "Eve", 22, "05-07-2024", "eve@example.com"),      # same id, wrongly formatted date
]
# Note: a str age such as '22' would make createDataFrame's schema inference
# fail with a type-merge error, so every age above is an int.
schema = ["id", "name", "age", "date", "email"]
spark = SparkSession.builder \
    .appName("DataCleaningTests") \
    .master("local[*]") \
    .getOrCreate()
df = spark.createDataFrame(data, schema=schema)
cleaning_pipeline = CleaningPipeline()
strategies_pipeline = [
    DropDuplicatesStrategy(columns=df.columns),
    ValidateRegexStrategy(
        columns=["email"],
        patterns={"email": r"^[\w.%+-]+@[\w.-]+\.[a-zA-Z]{2,}$"},
    ),
    DropMissingValuesStrategy(columns=["age"]),
    FilterNegativeValuesStrategy(columns=["age"]),
    ValidateColumnTypesStrategy(
        columns=df.columns,
        expected_types={
            "age": IntegerType(),
            "email": StringType(),
            # add more if needed
        },
    ),
    ValidateDatesStrategy(columns=["date"], date_format="yyyy-MM-dd"),
]

cleaning_pipeline.add_strategy(strategy=strategies_pipeline)
cleaning_pipeline.set_dataframe(df=df)

cleaned_df = cleaning_pipeline.run()
errors_report = cleaning_pipeline.get_report()
```
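
`get_report()` returns a JSON string; its exact structure is not documented above, so this follow-up only parses and pretty-prints it next to the surviving rows:

```python
import json

cleaned_df.show(truncate=False)      # rows that survived every strategy
report = json.loads(errors_report)   # errors_report is a JSON string
print(json.dumps(report, indent=2))  # shape of the entries is library-defined
```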