# Spark-df-Cleaner

- **Version**: 0.0.5 (PyPI)
- **Summary**: Spark DataFrame cleaner
- **Author**: Ahmad Muhammad
- **Keywords**: python, spark, data cleaning
- **Uploaded**: 2024-09-03 14:40:41
- **Requirements**: none recorded
# SparkCleaner Documentation

## Overview

**Disclaimer: this software is not yet stable.**

SparkCleaner is a Python library for data cleaning with PySpark. It provides a set of cleaning strategies, a pipeline for applying them, and a logging mechanism for tracking errors and issues.

## Cleaning Strategies

1. **DropDuplicatesStrategy**
   - **Purpose**: Removes duplicate rows based on specified columns.
   - **Method**: `clean(df: DataFrame) -> DataFrame`
   - **Logs**: Logs errors for rows identified as duplicates.

2. **DropMissingValuesStrategy**
   - **Purpose**: Removes rows with missing values in specified columns.
   - **Method**: `clean(df: DataFrame) -> DataFrame`
   - **Logs**: Logs errors for missing values in specified columns.

3. **FilterNegativeValuesStrategy**
   - **Purpose**: Filters out rows where specified columns contain negative values (a standalone sketch follows this list).
   - **Method**: `clean(df: DataFrame) -> DataFrame`
   - **Logs**: Logs errors for negative values in specified columns.

4. **ValidateColumnTypesStrategy**
   - **Purpose**: Ensures columns have the expected data types.
   - **Method**: `clean(df: DataFrame) -> DataFrame`
   - **Logs**: Logs errors for type mismatches.

5. **ValidateDatesStrategy**
   - **Purpose**: Validates dates in specified columns against a given format.
   - **Method**: `clean(df: DataFrame) -> DataFrame`
   - **Logs**: Logs errors for invalid dates.

6. **ValidateRegexStrategy**
   - **Purpose**: Ensures column values match specified regex patterns.
   - **Method**: `clean(df: DataFrame) -> DataFrame`
   - **Logs**: Logs errors for values not matching the regex.

7. **FilteringStrategy**
   - **Purpose**: Filters rows based on specified boolean conditions.
   - **Method**: `clean(df: DataFrame) -> DataFrame`

8. **FillNaStrategy**
   - **Purpose**: Replaces missing values with specified default values.
   - **Method**: `clean(df: DataFrame) -> DataFrame`
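Beyond the pipeline, a strategy can also be applied on its own through its `clean` method. The sketch below uses `FilterNegativeValuesStrategy` with the constructor shown in the usage example further down, and assumes the class is importable from the package root (as the star import in that example suggests). Note that `CleaningPipeline.set_dataframe` normally adds an `__index` column before strategies run, so standalone error logging may behave differently.

```python
from pyspark.sql import SparkSession
from SparkCleaner import FilterNegativeValuesStrategy

spark = SparkSession.builder.master("local[*]").getOrCreate()
df = spark.createDataFrame([(1, 10), (2, -3)], ["id", "amount"])

# Apply a single strategy directly, without the pipeline.
strategy = FilterNegativeValuesStrategy(columns=["amount"])
cleaned = strategy.clean(df)  # rows with a negative "amount" are filtered out
cleaned.show()
```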

## Cleaning Pipeline

- **Class**: `CleaningPipeline`
- **Methods**:
  - `add_strategy(strategy: BaseCleaningStrategy | list[BaseCleaningStrategy])`: Adds a cleaning strategy or list of strategies.
  - `set_dataframe(df: DataFrame)`: Sets the DataFrame to be cleaned and adds an `__index` column.
  - `run() -> DataFrame`: Applies all strategies to the DataFrame and returns the cleaned DataFrame.
  - `get_report() -> str`: Returns a JSON report of all logged errors.
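Since `get_report` returns a JSON string, it can be parsed with the standard library. A minimal sketch, assuming a pipeline that has already been run (as in the usage example below); the report's exact schema is not documented here, so this only pretty-prints whatever comes back:

```python
import json

report_json = cleaning_pipeline.get_report()  # JSON string of logged errors
report = json.loads(report_json)
print(json.dumps(report, indent=2))  # schema undocumented; inspect as-is
```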

## Logging
- **Class**: `Logger`
- **Purpose**: Records errors encountered during the cleaning process.
- **Method**: `log_error(index, column, message)`: Logs errors with details.
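Only `log_error` is documented, so the sketch below assumes a no-argument `Logger` constructor purely for illustration; in normal use the pipeline drives logging for you.

```python
from SparkCleaner import Logger  # assumes Logger is exported at the package root

# Assumption: Logger() takes no constructor arguments (not confirmed by the docs).
logger = Logger()
logger.log_error(index=2, column="age", message="missing value in 'age'")
```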

### Usage Example

```python
from SparkCleaner import *
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType

data = [
    (1, "Alice", 30, "2024-07-01", "alice@example.com"),
    (2, "Bob", None, "2024-07/02", "bob@example"),        # null age, malformed date, bad email
    (3, "Charlie", 25, "invalid_date", None),             # invalid date and missing email
    (4, "David", -5, "2024-07-04", "david@example.com"),  # negative age
    (5, "Eve", 22, "2024-07-05", "eve@example.com"),
    (5, "Eve", 22, "2024-07-05", "eve@example.com"),      # exact duplicate row
    # a str age here (e.g. '22') would break createDataFrame's type inference,
    # so this row only carries a wrongly formatted date
    (5, "Eve", 22, "05-07-2024", "eve@example.com"),
]
schema = ["id", "name", "age", "date", "email"]

spark = SparkSession.builder \
    .appName("DataCleaningTests") \
    .master("local[*]") \
    .getOrCreate()

df = spark.createDataFrame(data, schema=schema)
cleaning_pipeline = CleaningPipeline()
strategies_pipeline = [
    DropDuplicatesStrategy(columns=df.columns),
    ValidateRegexStrategy(
        columns=["email"],
        patterns={"email": r"^[\w.%+-]+@[\w.-]+\.[a-zA-Z]{2,}$"},
    ),
    DropMissingValuesStrategy(columns=["age"]),
    FilterNegativeValuesStrategy(columns=["age"]),
    ValidateColumnTypesStrategy(
        columns=df.columns,
        expected_types={
            "age": IntegerType(),
            "email": StringType(),
            # add more if needed
        },
    ),
    ValidateDatesStrategy(columns=["date"], date_format="yyyy-MM-dd"),
]


cleaning_pipeline.add_strategy(strategy=strategies_pipeline)
cleaning_pipeline.set_dataframe(df=df)

cleaned_df = cleaning_pipeline.run()
errors_report = cleaning_pipeline.get_report()
```
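
After the run, the cleaned DataFrame and the error report can be inspected with standard PySpark and Python calls:

```python
cleaned_df.show(truncate=False)  # rows that survived every strategy
print(errors_report)             # JSON string describing the logged issues
spark.stop()                     # release the local Spark session
```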


            
