# snowpark-checkpoints
Snowpark Python / Spark Migration Testing Tools
[Build status](https://github.com/snowflakedb/snowpark-checkpoints/actions/workflows/snowpark-checkpoints-all-tests.yml)
[Code coverage](https://codecov.io/gh/snowflakedb/snowpark-checkpoints)
[PyPI](https://pypi.org/project/snowpark-checkpoints)
[License: Apache 2.0](http://www.apache.org/licenses/LICENSE-2.0.txt)
[Code style: black](https://github.com/psf/black)
The **snowpark-checkpoints** package is a testing library that helps you validate your migrated Snowpark code and discover any behavioral differences from the original Apache PySpark code.
[Source code][source code] | [Snowpark Checkpoints Developer guide][Snowpark Checkpoints Developer guide] | [Snowpark Checkpoints API references][Snowpark Checkpoints API references]
---
##### This package is on Public Preview.
---
## Install the library
```bash
pip install snowpark-checkpoints
```
This package requires PySpark to be installed in the same environment. If you do not have it, you can install PySpark alongside Snowpark Checkpoints by running the following command:
```bash
pip install "snowpark-checkpoints[pyspark]"
```
---
## Getting started
This bundle includes:
- **snowpark-checkpoints-collectors**: Extracts information from PySpark dataframes for validation against Snowpark dataframes.
- **snowpark-checkpoints-validators**: Validates Snowpark dataframes against predefined schemas and checkpoints.
- **snowpark-checkpoints-hypothesis**: Generates Snowpark dataframes using the Hypothesis library for testing and data generation.
- **snowpark-checkpoints-configuration**: Loads `checkpoint.json` and provides a model, working automatically with collectors and validators.
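The sections below document each package in detail. As a quick orientation, here is a minimal sketch of how the collector and validator are typically used together during a migration; the data and the checkpoint name are purely illustrative, and both functions are described fully later in this README.
```python
# --- Original PySpark pipeline: collect a checkpoint ---
from pyspark.sql import SparkSession
from snowflake.snowpark_checkpoints_collector import collect_dataframe_checkpoint

spark_df = SparkSession.builder.getOrCreate().createDataFrame(
    [("apple", 21), ("lemon", 34)], schema="fruit string, age integer"
)
collect_dataframe_checkpoint(spark_df, checkpoint_name="fruit_checkpoint")

# --- Migrated Snowpark pipeline: validate against the same checkpoint ---
from snowflake.snowpark import Session
from snowflake.snowpark_checkpoints.checkpoint import validate_dataframe_checkpoint

session = Session.builder.getOrCreate()
snowpark_df = session.create_dataframe(
    [("apple", 21), ("lemon", 34)], schema=["fruit", "age"]
)
validate_dataframe_checkpoint(snowpark_df, "fruit_checkpoint")
```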
---
## snowpark-checkpoints-collectors
The **snowpark-checkpoints-collectors** package offers a function for extracting information from PySpark DataFrames. That data can then be used to validate the converted Snowpark DataFrames and confirm that behavioral equivalence has been achieved.
## Features
- Schema inference collected data mode (Schema): The default mode. It leverages Pandera schema inference to obtain the metadata and checks that will be evaluated for the specified DataFrame, and also collects custom data from the DataFrame's columns based on their PySpark types.
- DataFrame collected data mode (DataFrame): This mode collects the data of the PySpark DataFrame. The mechanism saves all data of the given DataFrame in Parquet format, then uses the default Snowflake connection to upload the Parquet files to a Snowflake temporary stage and create a table from the staged data. The file and the table share the name of the checkpoint.
## Functionalities
### Collect DataFrame Checkpoint
```python
from pyspark.sql import DataFrame as SparkDataFrame
from snowflake.snowpark_checkpoints_collector.collection_common import CheckpointMode
from typing import Optional
# Signature of the function
def collect_dataframe_checkpoint(
df: SparkDataFrame,
checkpoint_name: str,
sample: Optional[float] = None,
mode: Optional[CheckpointMode] = None,
output_path: Optional[str] = None,
) -> None:
...
```
- `df`: The input Spark dataframe to collect.
- `checkpoint_name`: Name of the checkpoint schema file or dataframe.
- `sample`: Fraction of the DataFrame to sample for schema inference; defaults to 1.0.
- `mode`: The mode for executing the collection (`CheckpointMode.SCHEMA` or `CheckpointMode.DATAFRAME`); defaults to `CheckpointMode.SCHEMA`.
- `output_path`: The output path to save the checkpoint; defaults to the current working directory.
## Usage Example
### Schema mode
```python
from pyspark.sql import SparkSession
from snowflake.snowpark_checkpoints_collector import collect_dataframe_checkpoint
from snowflake.snowpark_checkpoints_collector.collection_common import CheckpointMode
spark_session = SparkSession.builder.getOrCreate()
sample_size = 1.0
pyspark_df = spark_session.createDataFrame(
[("apple", 21), ("lemon", 34), ("banana", 50)], schema="fruit string, age integer"
)
collect_dataframe_checkpoint(
pyspark_df,
checkpoint_name="collect_checkpoint_mode_1",
sample=sample_size,
mode=CheckpointMode.SCHEMA,
)
```
### Dataframe mode
```python
from pyspark.sql import SparkSession
from snowflake.snowpark_checkpoints_collector import collect_dataframe_checkpoint
from snowflake.snowpark_checkpoints_collector.collection_common import CheckpointMode
from pyspark.sql.types import StructType, StructField, ByteType, StringType, IntegerType
spark_schema = StructType(
[
StructField("BYTE", ByteType(), True),
StructField("STRING", StringType(), True),
StructField("INTEGER", IntegerType(), True)
]
)
data = [(1, "apple", 21), (2, "lemon", 34), (3, "banana", 50)]
spark_session = SparkSession.builder.getOrCreate()
pyspark_df = spark_session.createDataFrame(data, schema=spark_schema).orderBy(
"INTEGER"
)
collect_dataframe_checkpoint(
pyspark_df,
checkpoint_name="collect_checkpoint_mode_2",
mode=CheckpointMode.DATAFRAME,
)
```
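In DataFrame mode the collected data is uploaded through the default Snowflake connection, and a table with the same name as the checkpoint is created (see Features above). A minimal, hedged sketch for spot-checking that table afterwards with Snowpark; the database and schema come from your default connection:
```python
from snowflake.snowpark import Session

# Assumes the same default Snowflake connection used by the collector above.
session = Session.builder.getOrCreate()

# Per the DataFrame mode description, the table shares the checkpoint name.
session.table("collect_checkpoint_mode_2").show()
```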
---
# snowpark-checkpoints-validators
**snowpark-checkpoints-validators** is a package designed to validate Snowpark DataFrames against predefined schemas and checkpoints. This package ensures data integrity and consistency by performing schema and data validation checks at various stages of a Snowpark pipeline.
## Features
- Validate Snowpark DataFrames against predefined Pandera schemas.
- Perform custom checks and skip specific checks as needed.
- Generate validation results and log them for further analysis.
- Support for sampling strategies to validate large datasets efficiently.
- Integration with PySpark for cross-validation between Snowpark and PySpark DataFrames.
## Functionalities
### Validate DataFrame Schema from File
The `validate_dataframe_checkpoint` function validates a Snowpark DataFrame against a checkpoint schema file or dataframe.
```python
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.job_context import SnowparkJobContext
from snowflake.snowpark_checkpoints.utils.constant import (
CheckpointMode,
)
from snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy
from typing import Any, Optional
# Signature of the function
def validate_dataframe_checkpoint(
df: SnowparkDataFrame,
checkpoint_name: str,
job_context: Optional[SnowparkJobContext] = None,
mode: Optional[CheckpointMode] = CheckpointMode.SCHEMA,
custom_checks: Optional[dict[Any, Any]] = None,
skip_checks: Optional[dict[Any, Any]] = None,
sample_frac: Optional[float] = 1.0,
sample_number: Optional[int] = None,
sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
output_path: Optional[str] = None,
):
...
```
- `df`: Snowpark dataframe to validate.
- `checkpoint_name`: Name of the checkpoint schema file or dataframe.
- `job_context`: Snowpark job context.
- `mode`: Checkpoint mode (schema or data).
- `custom_checks`: Custom checks to perform.
- `skip_checks`: Checks to skip.
- `sample_frac`: Fraction of the dataframe to sample.
- `sample_number`: Number of rows to sample.
- `sampling_strategy`: Sampling strategy to use.
- `output_path`: Output path for the checkpoint report.
### Usage Example
```python
from snowflake.snowpark import Session
from snowflake.snowpark_checkpoints.utils.constant import (
CheckpointMode,
)
from snowflake.snowpark_checkpoints.checkpoint import validate_dataframe_checkpoint
from snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy
from snowflake.snowpark_checkpoints.job_context import SnowparkJobContext
from pyspark.sql import SparkSession
session = Session.builder.getOrCreate()
job_context = SnowparkJobContext(
session, SparkSession.builder.getOrCreate(), "job_context", True
)
df = session.read.format("csv").load("data.csv")
validate_dataframe_checkpoint(
df,
"schema_checkpoint",
job_context=job_context,
mode=CheckpointMode.SCHEMA,
sample_frac=0.1,
sampling_strategy=SamplingStrategy.RANDOM_SAMPLE
)
```
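Validation results are also persisted for later analysis (see Features above). As noted in the Run Demos requirements at the end of this README, a table called SNOWPARK_CHECKPOINTS_REPORT is created in the connection's database and schema once checkpoints have run; a hedged sketch for inspecting it with the same Snowpark session:
```python
# Assumes checkpoint validations have already executed against this connection.
session.table("SNOWPARK_CHECKPOINTS_REPORT").show()
```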
### Check with Spark Decorator
The `check_with_spark` decorator takes any Snowpark DataFrame arguments passed to the decorated function, samples them, and converts them to PySpark DataFrames. It then executes the provided Spark function on those inputs and compares the outputs of the two implementations.
```python
from snowflake.snowpark_checkpoints.job_context import SnowparkJobContext
from snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy
from typing import Callable, Optional, TypeVar
fn = TypeVar("F", bound=Callable)
# Signature of the decorator
def check_with_spark(
job_context: Optional[SnowparkJobContext],
spark_function: fn,
checkpoint_name: str,
sample_number: Optional[int] = 100,
sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
output_path: Optional[str] = None,
) -> Callable[[fn], fn]:
...
```
- `job_context`: Snowpark job context.
- `spark_function`: PySpark function to execute.
- `checkpoint_name`: Name of the check.
- `sample_number`: Number of rows to sample.
- `sampling_strategy`: Sampling strategy to use.
- `output_path`: Output path for the checkpoint report.
### Usage Example
```python
from snowflake.snowpark import Session
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.spark_migration import check_with_spark
from snowflake.snowpark_checkpoints.job_context import SnowparkJobContext
from pyspark.sql import DataFrame as SparkDataFrame, SparkSession
session = Session.builder.getOrCreate()
job_context = SnowparkJobContext(
session, SparkSession.builder.getOrCreate(), "job_context", True
)
def my_spark_scalar_fn(df: SparkDataFrame):
return df.count()
@check_with_spark(
job_context=job_context,
spark_function=my_spark_scalar_fn,
checkpoint_name="count_checkpoint",
)
def my_snowpark_scalar_fn(df: SnowparkDataFrame):
return df.count()
df = job_context.snowpark_session.create_dataframe(
[[1, 2], [3, 4]], schema=["a", "b"]
)
count = my_snowpark_scalar_fn(df)
```
### Pandera Snowpark Decorators
The decorators `@check_input_schema` and `@check_output_schema` allow for sampled schema validation of Snowpark dataframes in the input arguments or in the return value.
```python
from snowflake.snowpark_checkpoints.spark_migration import SamplingStrategy
from snowflake.snowpark_checkpoints.job_context import SnowparkJobContext
from pandera import DataFrameSchema
from typing import Optional
# Signature of the decorator
def check_input_schema(
pandera_schema: DataFrameSchema,
checkpoint_name: str,
sample_frac: Optional[float] = 1.0,
sample_number: Optional[int] = None,
sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
job_context: Optional[SnowparkJobContext] = None,
output_path: Optional[str] = None,
):
...
# Signature of the decorator
def check_output_schema(
pandera_schema: DataFrameSchema,
checkpoint_name: str,
sample_frac: Optional[float] = 1.0,
sample_number: Optional[int] = None,
sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
job_context: Optional[SnowparkJobContext] = None,
output_path: Optional[str] = None,
):
...
```
- `pandera_schema`: Pandera schema to validate.
- `checkpoint_name`: Name of the checkpoint schema file or DataFrame.
- `sample_frac`: Fraction of the DataFrame to sample.
- `sample_number`: Number of rows to sample.
- `sampling_strategy`: Sampling strategy to use.
- `job_context`: Snowpark job context.
- `output_path`: Output path for the checkpoint report.
### Usage Example
#### Check Input Schema Example
```python
from pandas import DataFrame as PandasDataFrame
from pandera import DataFrameSchema, Column, Check
from snowflake.snowpark import Session
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.checkpoint import check_input_schema
from numpy import int8
df = PandasDataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
in_schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check(lambda x: 0 <= x <= 10, element_wise=True)),
"COLUMN2": Column(float, Check(lambda x: x < -1.2, element_wise=True)),
}
)
@check_input_schema(in_schema, "input_schema_checkpoint")
def preprocessor(dataframe: SnowparkDataFrame):
dataframe = dataframe.withColumn(
"COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
)
return dataframe
session = Session.builder.getOrCreate()
sp_dataframe = session.create_dataframe(df)
preprocessed_dataframe = preprocessor(sp_dataframe)
```
#### Check Output Schema Example
```python
from pandas import DataFrame as PandasDataFrame
from pandera import DataFrameSchema, Column, Check
from snowflake.snowpark import Session
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.checkpoint import check_output_schema
from numpy import int8
df = PandasDataFrame(
{
"COLUMN1": [1, 4, 0, 10, 9],
"COLUMN2": [-1.3, -1.4, -2.9, -10.1, -20.4],
}
)
out_schema = DataFrameSchema(
{
"COLUMN1": Column(int8, Check.between(0, 10, include_max=True, include_min=True)),
"COLUMN2": Column(float, Check.less_than_or_equal_to(-1.2)),
"COLUMN3": Column(float, Check.less_than(10)),
}
)
@check_output_schema(out_schema, "output_schema_checkpoint")
def preprocessor(dataframe: SnowparkDataFrame):
return dataframe.with_column(
"COLUMN3", dataframe["COLUMN1"] + dataframe["COLUMN2"]
)
session = Session.builder.getOrCreate()
sp_dataframe = session.create_dataframe(df)
preprocessed_dataframe = preprocessor(sp_dataframe)
```
---
# snowpark-checkpoints-hypothesis
**snowpark-checkpoints-hypothesis** is a [Hypothesis](https://hypothesis.readthedocs.io/en/latest/) extension for generating Snowpark DataFrames. This project provides strategies to facilitate testing and data generation for Snowpark DataFrames using the Hypothesis library.
## Installation
You can install this package using either **pip** or **conda**:
```shell
pip install snowpark-checkpoints-hypothesis
--or--
conda install snowpark-checkpoints-hypothesis
```
## Usage
The typical workflow for using the Hypothesis library to generate Snowpark dataframes is as follows:
1. Create a standard Python test function with the different assertions or conditions your code should satisfy for all inputs.
2. Add the Hypothesis `@given` decorator to your test function and pass the `dataframe_strategy` function as an argument.
3. Run the test. When the test is executed, Hypothesis will automatically provide the generated inputs as arguments to the test.
### Example 1: Generate Snowpark DataFrames from a JSON schema file
You can use the `dataframe_strategy` function to create Snowpark DataFrames from a JSON schema file generated by the `collect_dataframe_checkpoint` function of the [snowpark-checkpoints-collectors](https://pypi.org/project/snowpark-checkpoints-collectors/) package:
```python
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
@given(
df=dataframe_strategy(
schema="path/to/schema.json",
session=Session.builder.getOrCreate(),
size=10,
)
)
def test_my_function(df: DataFrame):
# Test your function here
...
```
### Example 2: Generate Snowpark DataFrames from a Pandera DataFrameSchema object
You can also use the `dataframe_strategy` function to create Snowpark DataFrames from a Pandera DataFrameSchema object:
```python
import pandera as pa
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
@given(
df=dataframe_strategy(
schema=pa.DataFrameSchema(
{
"A": pa.Column(pa.Int, checks=pa.Check.in_range(0, 10)),
"B": pa.Column(pa.Bool),
}
),
session=Session.builder.getOrCreate(),
size=10,
)
)
def test_my_function(df: DataFrame):
# Test your function here
...
```
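For a slightly fuller picture of step 1 of the workflow above, here is a sketch of a test with concrete assertions. The `add_double_column` function under test is hypothetical, and the column names assume Snowpark's default uppercase identifiers:
```python
import pandera as pa
from hypothesis import given
from snowflake.hypothesis_snowpark import dataframe_strategy
from snowflake.snowpark import DataFrame, Session
from snowflake.snowpark.functions import col


def add_double_column(df: DataFrame) -> DataFrame:
    # Hypothetical function under test: appends a column with twice the value of A.
    return df.with_column("A_DOUBLE", col("A") * 2)


@given(
    df=dataframe_strategy(
        schema=pa.DataFrameSchema(
            {"A": pa.Column(pa.Int, checks=pa.Check.in_range(0, 10))}
        ),
        session=Session.builder.getOrCreate(),
        size=10,
    )
)
def test_add_double_column(df: DataFrame):
    result = add_double_column(df)
    # Conditions that should hold for every generated input.
    assert "A_DOUBLE" in result.columns
    assert result.count() == df.count()
```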
## Development
### Set up a development environment
To set up a development environment, follow the steps below:
1. Create a virtual environment using **venv** or **conda**. Replace \<env-name\> with the name of your environment.
Using **venv**:
```shell
python3.11 -m venv <env-name>
source <env-name>/bin/activate
```
Using **conda**:
```shell
conda create -n <env-name> python=3.11
conda activate <env-name>
```
2. Configure your IDE to use the previously created virtual environment:
* [Configuring a Python interpreter in PyCharm](https://www.jetbrains.com/help/pycharm/configuring-python-interpreter.html)
* [Configuring a Python interpreter in VS Code](https://code.visualstudio.com/docs/python/environments#_manually-specify-an-interpreter)
3. Install the project dependencies:
```shell
pip install hatch
pip install -e .
```
### Running Tests
To run the tests, use the following command:
```shell
hatch run test:check
```
---
# snowpark-checkpoints-configuration
**snowpark-checkpoints-configuration** is a module for loading `checkpoint.json` and providing a model.
It works automatically with *snowpark-checkpoints-collector* and *snowpark-checkpoints-validators*, and it tries to read the configuration file from the current working directory.
## Usage
To load a file explicitly, import `CheckpointMetadata` and create an instance as shown below:
```python
from snowflake.snowpark_checkpoints_configuration import CheckpointMetadata
my_checkpoint_metadata = CheckpointMetadata("path/to/checkpoint.json")
checkpoint_model = my_checkpoint_metadata.get_checkpoint("my_checkpoint_name")
...
```
---
## Run Demos
### Requirements
- Python >= 3.9
- OpenJDK 21.0.2
- Snow CLI: The default connection needs to have a database and a schema. After running the app, a table called SNOWPARK_CHECKPOINTS_REPORT will be created.
### Steps
1. Create a Python environment with Python 3.9 or higher in the Demos dir.
2. Build the Python snowpark-checkpoints and snowpark-checkpoints-collector packages:
```cmd
cd package_dir
pip install -e .
python3 -m pip install --upgrade build
python3 -m build
```
3. In the Demos dir, run:
```cmd
pip install "snowpark-checkpoints"
```
4. First, run the PySpark demo:
```cmd
python demo_pyspark_pipeline.py
```
   This will generate the JSON schema files. Then, run the Snowpark demo:
```cmd
python demo_snowpark_pipeline.py
```
## Contributing
Please refer to [CONTRIBUTING.md][contributing].
------
[source code]: https://github.com/snowflakedb/snowpark-checkpoints
[Snowpark Checkpoints Developer guide]: https://docs.snowflake.com/en/developer-guide/snowpark/python/snowpark-checkpoints-library
[Snowpark Checkpoints API references]: https://docs.snowflake.com/en/developer-guide/snowpark-checkpoints-api/reference/latest/index
[contributing]: https://github.com/snowflakedb/snowpark-checkpoints/blob/main/CONTRIBUTING.md