snowpark-checkpoints-validators


Namesnowpark-checkpoints-validators JSON
Version 0.1.3 PyPI version JSON
download
home_pageNone
SummaryMigration tools for Snowpark
upload_time2025-02-07 20:19:32
maintainerNone
docs_urlNone
authorNone
requires_python<3.12,>=3.9
licenseApache License, Version 2.0
keywords snowflake snowpark analytics cloud database db
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # snowpark-checkpoints-validators

---
##### This package is on Public Preview.
---

**snowpark-checkpoints-validators** is a package designed to validate Snowpark DataFrames against predefined schemas and checkpoints. This package ensures data integrity and consistency by performing schema and data validation checks at various stages of a Snowpark pipeline.

## Features

- Validate Snowpark DataFrames against predefined Pandera schemas.
- Perform custom checks and skip specific checks as needed.
- Generate validation results and log them for further analysis.
- Support for sampling strategies to validate large datasets efficiently.
- Integration with PySpark for cross-validation between Snowpark and PySpark DataFrames.

## Functionalities

### Validate DataFrame Schema from File

The `validate_dataframe_checkpoint` function validates a Snowpark DataFrame against a checkpoint schema file or dataframe.

```python
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.job_context import SnowparkJobContext
from snowflake.snowpark_checkpoints.utils.constant import (
    CheckpointMode,
)
from snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy
from typing import Any, Optional

# Signature of the function
def validate_dataframe_checkpoint(
    df: SnowparkDataFrame,
    checkpoint_name: str,
    job_context: Optional[SnowparkJobContext] = None,
    mode: Optional[CheckpointMode] = CheckpointMode.SCHEMA,
    custom_checks: Optional[dict[Any, Any]] = None,
    skip_checks: Optional[dict[Any, Any]] = None,
    sample_frac: Optional[float] = 1.0,
    sample_number: Optional[int] = None,
    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
    output_path: Optional[str] = None,
):
...
```

- `df`: Snowpark dataframe to validate.
- `checkpoint_name`: Name of the checkpoint schema file or dataframe.
- `job_context`: Snowpark job context.
- `mode`: Checkpoint mode (schema or data).
- `custom_checks`: Custom checks to perform.
- `skip_checks`: Checks to skip.
- `sample_frac`: Fraction of the dataframe to sample.
- `sample_number`: Number of rows to sample.
- `sampling_strategy`: Sampling strategy to use.
- `output_path`: Output path for the checkpoint report.

### Usage Example

```python
from snowflake.snowpark import Session
from snowflake.snowpark_checkpoints.utils.constant import (
    CheckpointMode,
)
from snowflake.snowpark_checkpoints.checkpoint import validate_dataframe_checkpoint
from snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy
from snowflake.snowpark_checkpoints.job_context import SnowparkJobContext
from pyspark.sql import SparkSession

session = Session.builder.getOrCreate()
job_context = SnowparkJobContext(
    session, SparkSession.builder.getOrCreate(), "job_context", True
)
df = session.read.format("csv").load("data.csv")

validate_dataframe_checkpoint(
    df,
    "schema_checkpoint",
    job_context=job_context,
    mode=CheckpointMode.SCHEMA,
    sample_frac=0.1,
    sampling_strategy=SamplingStrategy.RANDOM_SAMPLE
)
```

### Check with Spark Decorator

The `check_with_spark` decorator converts any Snowpark dataframe arguments to a function, samples them, and converts them to PySpark dataframe. It then executes a provided Spark function and compares the outputs between the two implementations.

```python
from snowflake.snowpark_checkpoints.job_context import SnowparkJobContext
from snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy
from typing import Callable, Optional, TypeVar

fn = TypeVar("F", bound=Callable)

# Signature of the decorator
def check_with_spark(
    job_context: Optional[SnowparkJobContext],
    spark_function: fn,
    checkpoint_name: str,
    sample_number: Optional[int] = 100,
    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
    output_path: Optional[str] = None,
) -> Callable[[fn], fn]:
    ...
```

- `job_context`: Snowpark job context.
- `spark_function`: PySpark function to execute.
- `checkpoint_name`: Name of the check.
- `sample_number`: Number of rows to sample.
- `sampling_strategy`: Sampling strategy to use.
- `output_path`: Output path for the checkpoint report.

### Usage Example

```python
from snowflake.snowpark import Session
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.spark_migration import check_with_spark
from snowflake.snowpark_checkpoints.job_context import SnowparkJobContext
from pyspark.sql import DataFrame as SparkDataFrame, SparkSession

session = Session.builder.getOrCreate()
job_context = SnowparkJobContext(
    session, SparkSession.builder.getOrCreate(), "job_context", True
)

def my_spark_scalar_fn(df: SparkDataFrame):
    return df.count()

@check_with_spark(
    job_context=job_context,
    spark_function=my_spark_scalar_fn,
    checkpoint_name="count_checkpoint",
)
def my_snowpark_scalar_fn(df: SnowparkDataFrame):
    return df.count()

df = job_context.snowpark_session.create_dataframe(
    [[1, 2], [3, 4]], schema=["a", "b"]
)
count = my_snowpark_scalar_fn(df)
```

### Pandera Snowpark Decorators

The decorators `@check_input_schema` and `@check_output_schema` allow for sampled schema validation of Snowpark dataframes in the input arguments or in the return value.

```python
from snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy
from snowflake.snowpark_checkpoints.job_context import SnowparkJobContext
from pandera import DataFrameSchema
from typing import Optional

# Signature of the decorator
def check_input_schema(
    pandera_schema: DataFrameSchema,
    checkpoint_name: str,
    sample_frac: Optional[float] = 1.0,
    sample_number: Optional[int] = None,
    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
    job_context: Optional[SnowparkJobContext] = None,
    output_path: Optional[str] = None,
):
    ...

# Signature of the decorator
def check_output_schema(
    pandera_schema: DataFrameSchema,
    checkpoint_name: str,
    sample_frac: Optional[float] = 1.0,
    sample_number: Optional[int] = None,
    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
    job_context: Optional[SnowparkJobContext] = None,
    output_path: Optional[str] = None,
):
    ...
```

- `pandera_schema`: Pandera schema to validate.
- `checkpoint_name`: Name of the checkpoint schema file or DataFrame.
- `sample_frac`: Fraction of the DataFrame to sample.
- `sample_number`: Number of rows to sample.
- `sampling_strategy`: Sampling strategy to use.
- `job_context`: Snowpark job context.
- `output_path`: Output path for the checkpoint report.

### Usage Example

#### Check Input Schema Example
```python
from pandas import DataFrame as PandasDataFrame
from pandera import DataFrameSchema, Column, Check
from snowflake.snowpark import Session
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
from numpy import int8

df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
)

in_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check(lambda x: 0 <= x <= 10, element_wise=True)),
        "COLUMN2": Column(float, Check(lambda x: x < -1.2, element_wise=True)),
    }
)

@check_input_schema(in_schema, "input_schema_checkpoint")
def preprocessor(dataframe: SnowparkDataFrame):
    dataframe = dataframe.withColumn(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )
    return dataframe

session = Session.builder.getOrCreate()
sp_dataframe = session.create_dataframe(df)

preprocessed_dataframe = preprocessor(sp_dataframe)
```

#### Check Input Schema Example
```python
from pandas import DataFrame as PandasDataFrame
from pandera import DataFrameSchema, Column, Check
from snowflake.snowpark import Session
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
from numpy import int8

df = PandasDataFrame(
    {
        "COLUMN1": [1, 4, 0, 10, 9],
        "COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
    }
)

out_schema = DataFrameSchema(
    {
        "COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
        "COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
        "COLUMN3": Column(float, Check.less_than(10)),
    }
)

@check_output_schema(out_schema, "output_schema_checkpoint")
def preprocessor(dataframe: SnowparkDataFrame):
    return dataframe.with_column(
        "COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
    )

session = Session.builder.getOrCreate()
sp_dataframe = session.create_dataframe(df)

preprocessed_dataframe = preprocessor(sp_dataframe)
```

------

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "snowpark-checkpoints-validators",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<3.12,>=3.9",
    "maintainer_email": null,
    "keywords": "Snowflake, Snowpark, analytics, cloud, database, db",
    "author": null,
    "author_email": "\"Snowflake, Inc.\" <snowflake-python-libraries-dl@snowflake.com>",
    "download_url": "https://files.pythonhosted.org/packages/95/4f/05be58c373f12acdc82aca73dd167be32db97d834ae7cd70616ebcc2f78e/snowpark_checkpoints_validators-0.1.3.tar.gz",
    "platform": null,
    "description": "# snowpark-checkpoints-validators\n\n---\n##### This package is on Public Preview.\n---\n\n**snowpark-checkpoints-validators** is a package designed to validate Snowpark DataFrames against predefined schemas and checkpoints. This package ensures data integrity and consistency by performing schema and data validation checks at various stages of a Snowpark pipeline.\n\n## Features\n\n- Validate Snowpark DataFrames against predefined Pandera schemas.\n- Perform custom checks and skip specific checks as needed.\n- Generate validation results and log them for further analysis.\n- Support for sampling strategies to validate large datasets efficiently.\n- Integration with PySpark for cross-validation between Snowpark and PySpark DataFrames.\n\n## Functionalities\n\n### Validate DataFrame Schema from File\n\nThe `validate_dataframe_checkpoint` function validates a Snowpark DataFrame against a checkpoint schema file or dataframe.\n\n```python\nfrom snowflake.snowpark import DataFrame as SnowparkDataFrame\nfrom snowflake.snowpark_checkpoints.job_context import SnowparkJobContext\nfrom snowflake.snowpark_checkpoints.utils.constant import (\n    CheckpointMode,\n)\nfrom snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy\nfrom typing import Any, Optional\n\n# Signature of the function\ndef validate_dataframe_checkpoint(\n    df: SnowparkDataFrame,\n    checkpoint_name: str,\n    job_context: Optional[SnowparkJobContext] = None,\n    mode: Optional[CheckpointMode] = CheckpointMode.SCHEMA,\n    custom_checks: Optional[dict[Any, Any]] = None,\n    skip_checks: Optional[dict[Any, Any]] = None,\n    sample_frac: Optional[float] = 1.0,\n    sample_number: Optional[int] = None,\n    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,\n    output_path: Optional[str] = None,\n):\n...\n```\n\n- `df`: Snowpark dataframe to validate.\n- `checkpoint_name`: Name of the checkpoint schema file or dataframe.\n- `job_context`: Snowpark job context.\n- `mode`: Checkpoint mode (schema or data).\n- `custom_checks`: Custom checks to perform.\n- `skip_checks`: Checks to skip.\n- `sample_frac`: Fraction of the dataframe to sample.\n- `sample_number`: Number of rows to sample.\n- `sampling_strategy`: Sampling strategy to use.\n- `output_path`: Output path for the checkpoint report.\n\n### Usage Example\n\n```python\nfrom snowflake.snowpark import Session\nfrom snowflake.snowpark_checkpoints.utils.constant import (\n    CheckpointMode,\n)\nfrom snowflake.snowpark_checkpoints.checkpoint import validate_dataframe_checkpoint\nfrom snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy\nfrom snowflake.snowpark_checkpoints.job_context import SnowparkJobContext\nfrom pyspark.sql import SparkSession\n\nsession = Session.builder.getOrCreate()\njob_context = SnowparkJobContext(\n    session, SparkSession.builder.getOrCreate(), \"job_context\", True\n)\ndf = session.read.format(\"csv\").load(\"data.csv\")\n\nvalidate_dataframe_checkpoint(\n    df,\n    \"schema_checkpoint\",\n    job_context=job_context,\n    mode=CheckpointMode.SCHEMA,\n    sample_frac=0.1,\n    sampling_strategy=SamplingStrategy.RANDOM_SAMPLE\n)\n```\n\n### Check with Spark Decorator\n\nThe `check_with_spark` decorator converts any Snowpark dataframe arguments to a function, samples them, and converts them to PySpark dataframe. It then executes a provided Spark function and compares the outputs between the two implementations.\n\n```python\nfrom snowflake.snowpark_checkpoints.job_context import SnowparkJobContext\nfrom snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy\nfrom typing import Callable, Optional, TypeVar\n\nfn = TypeVar(\"F\", bound=Callable)\n\n# Signature of the decorator\ndef check_with_spark(\n    job_context: Optional[SnowparkJobContext],\n    spark_function: fn,\n    checkpoint_name: str,\n    sample_number: Optional[int] = 100,\n    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,\n    output_path: Optional[str] = None,\n) -> Callable[[fn], fn]:\n    ...\n```\n\n- `job_context`: Snowpark job context.\n- `spark_function`: PySpark function to execute.\n- `checkpoint_name`: Name of the check.\n- `sample_number`: Number of rows to sample.\n- `sampling_strategy`: Sampling strategy to use.\n- `output_path`: Output path for the checkpoint report.\n\n### Usage Example\n\n```python\nfrom snowflake.snowpark import Session\nfrom snowflake.snowpark import DataFrame as SnowparkDataFrame\nfrom snowflake.snowpark_checkpoints.spark_migration import check_with_spark\nfrom snowflake.snowpark_checkpoints.job_context import SnowparkJobContext\nfrom pyspark.sql import DataFrame as SparkDataFrame, SparkSession\n\nsession = Session.builder.getOrCreate()\njob_context = SnowparkJobContext(\n    session, SparkSession.builder.getOrCreate(), \"job_context\", True\n)\n\ndef my_spark_scalar_fn(df: SparkDataFrame):\n    return df.count()\n\n@check_with_spark(\n    job_context=job_context,\n    spark_function=my_spark_scalar_fn,\n    checkpoint_name=\"count_checkpoint\",\n)\ndef my_snowpark_scalar_fn(df: SnowparkDataFrame):\n    return df.count()\n\ndf = job_context.snowpark_session.create_dataframe(\n    [[1, 2], [3, 4]], schema=[\"a\", \"b\"]\n)\ncount = my_snowpark_scalar_fn(df)\n```\n\n### Pandera Snowpark Decorators\n\nThe decorators `@check_input_schema` and `@check_output_schema` allow for sampled schema validation of Snowpark dataframes in the input arguments or in the return value.\n\n```python\nfrom snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy\nfrom snowflake.snowpark_checkpoints.job_context import SnowparkJobContext\nfrom pandera import DataFrameSchema\nfrom typing import Optional\n\n# Signature of the decorator\ndef check_input_schema(\n    pandera_schema: DataFrameSchema,\n    checkpoint_name: str,\n    sample_frac: Optional[float] = 1.0,\n    sample_number: Optional[int] = None,\n    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,\n    job_context: Optional[SnowparkJobContext] = None,\n    output_path: Optional[str] = None,\n):\n    ...\n\n# Signature of the decorator\ndef check_output_schema(\n    pandera_schema: DataFrameSchema,\n    checkpoint_name: str,\n    sample_frac: Optional[float] = 1.0,\n    sample_number: Optional[int] = None,\n    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,\n    job_context: Optional[SnowparkJobContext] = None,\n    output_path: Optional[str] = None,\n):\n    ...\n```\n\n- `pandera_schema`: Pandera schema to validate.\n- `checkpoint_name`: Name of the checkpoint schema file or DataFrame.\n- `sample_frac`: Fraction of the DataFrame to sample.\n- `sample_number`: Number of rows to sample.\n- `sampling_strategy`: Sampling strategy to use.\n- `job_context`: Snowpark job context.\n- `output_path`: Output path for the checkpoint report.\n\n### Usage Example\n\n#### Check Input Schema Example\n```python\nfrom pandas import DataFrame as PandasDataFrame\nfrom pandera import DataFrameSchema, Column, Check\nfrom snowflake.snowpark import Session\nfrom snowflake.snowpark import DataFrame as SnowparkDataFrame\nfrom snowflake.snowpark_checkpoints.checkpoint import check_input_schema\nfrom numpy import int8\n\ndf = PandasDataFrame(\n    {\n        \"COLUMN1\": [1, 4, 0, 10, 9],\n        \"COLUMN2\": [-1.3, -1.4, -2.9, -10.1, -20.4],\n    }\n)\n\nin_schema = DataFrameSchema(\n    {\n        \"COLUMN1\": Column(int8, Check(lambda x: 0 <= x <= 10, element_wise=True)),\n        \"COLUMN2\": Column(float, Check(lambda x: x < -1.2, element_wise=True)),\n    }\n)\n\n@check_input_schema(in_schema, \"input_schema_checkpoint\")\ndef preprocessor(dataframe: SnowparkDataFrame):\n    dataframe = dataframe.withColumn(\n        \"COLUMN3\", dataframe[\"COLUMN1\"] + dataframe[\"COLUMN2\"]\n    )\n    return dataframe\n\nsession = Session.builder.getOrCreate()\nsp_dataframe = session.create_dataframe(df)\n\npreprocessed_dataframe = preprocessor(sp_dataframe)\n```\n\n#### Check Input Schema Example\n```python\nfrom pandas import DataFrame as PandasDataFrame\nfrom pandera import DataFrameSchema, Column, Check\nfrom snowflake.snowpark import Session\nfrom snowflake.snowpark import DataFrame as SnowparkDataFrame\nfrom snowflake.snowpark_checkpoints.checkpoint import check_output_schema\nfrom numpy import int8\n\ndf = PandasDataFrame(\n    {\n        \"COLUMN1\": [1, 4, 0, 10, 9],\n        \"COLUMN2\": [-1.3, -1.4, -2.9, -10.1, -20.4],\n    }\n)\n\nout_schema = DataFrameSchema(\n    {\n        \"COLUMN1\": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),\n        \"COLUMN2\": Column(float, Check.less_than_or_equal_to(-1.2)),\n        \"COLUMN3\": Column(float, Check.less_than(10)),\n    }\n)\n\n@check_output_schema(out_schema, \"output_schema_checkpoint\")\ndef preprocessor(dataframe: SnowparkDataFrame):\n    return dataframe.with_column(\n        \"COLUMN3\", dataframe[\"COLUMN1\"] + dataframe[\"COLUMN2\"]\n    )\n\nsession = Session.builder.getOrCreate()\nsp_dataframe = session.create_dataframe(df)\n\npreprocessed_dataframe = preprocessor(sp_dataframe)\n```\n\n------\n",
    "bugtrack_url": null,
    "license": "Apache License, Version 2.0",
    "summary": "Migration tools for Snowpark",
    "version": "0.1.3",
    "project_urls": {
        "Bug Tracker": "https://github.com/snowflakedb/snowpark-checkpoints/issues",
        "Source code": "https://github.com/snowflakedb/snowpark-checkpoints/"
    },
    "split_keywords": [
        "snowflake",
        " snowpark",
        " analytics",
        " cloud",
        " database",
        " db"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "d0610a361f2ca887978aa3a23984b927765ee4848648723f9ec3c8339707f129",
                "md5": "123a8f0a785e3144ef0fd31e555b0b50",
                "sha256": "33dd12148c07223d5f2c13c2557cbec516700d8d3e548a6be81d5246637fe25f"
            },
            "downloads": -1,
            "filename": "snowpark_checkpoints_validators-0.1.3-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "123a8f0a785e3144ef0fd31e555b0b50",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<3.12,>=3.9",
            "size": 41251,
            "upload_time": "2025-02-07T20:19:22",
            "upload_time_iso_8601": "2025-02-07T20:19:22.244270Z",
            "url": "https://files.pythonhosted.org/packages/d0/61/0a361f2ca887978aa3a23984b927765ee4848648723f9ec3c8339707f129/snowpark_checkpoints_validators-0.1.3-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "954f05be58c373f12acdc82aca73dd167be32db97d834ae7cd70616ebcc2f78e",
                "md5": "ce1f5c3519b05f279dde83e12d0c3197",
                "sha256": "e3331a967516b54fbbf91ea2667791358409704da8b1d9f054c96a5566a2f9be"
            },
            "downloads": -1,
            "filename": "snowpark_checkpoints_validators-0.1.3.tar.gz",
            "has_sig": false,
            "md5_digest": "ce1f5c3519b05f279dde83e12d0c3197",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<3.12,>=3.9",
            "size": 46611,
            "upload_time": "2025-02-07T20:19:32",
            "upload_time_iso_8601": "2025-02-07T20:19:32.363368Z",
            "url": "https://files.pythonhosted.org/packages/95/4f/05be58c373f12acdc82aca73dd167be32db97d834ae7cd70616ebcc2f78e/snowpark_checkpoints_validators-0.1.3.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2025-02-07 20:19:32",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "snowflakedb",
    "github_project": "snowpark-checkpoints",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "snowpark-checkpoints-validators"
}
        
Elapsed time: 0.40234s