snowpark-checkpoints

Name: snowpark-checkpoints
Version: 0.1.0rc1
Summary: Snowflake Snowpark Checkpoints
Upload time: 2025-01-15 19:39:58
Requires Python: >=3.9, <3.13
License: Apache-2.0
Source: https://github.com/snowflakedb/snowpark-checkpoints
Keywords: snowflake, snowpark, analytics, cloud, data, data-analysis, data-analytics, data-engineering, data-management, data-processing, data-science, data-visualization, data-warehouse, database
# snowpark-checkpoints

Snowpark Python / Spark Migration Testing Tools

---
**NOTE**

This package is in Private Preview.

---

# Data Collection from Spark Pipelines

The `snowpark-checkpoints-collector` package collects schema and check information
from a Spark pipeline and records the results in a set of JSON files, one per
intermediate DataFrame. These files can be inspected manually and handed over to
the team implementing the Snowpark pipeline. The `snowpark-checkpoints-collector`
package is designed to have minimal dependencies, and the generated files are meant
to be reviewable by security teams.

On the Snowpark side, the `snowpark-checkpoints` package can use these files to perform schema and data validation checks against Snowpark DataFrames at the same intermediate logical "checkpoints".

## collect_dataframe_schema

```python
from snowflake.snowpark_checkpoints_collector import collect_dataframe_schema

collect_dataframe_schema(
    df: SparkDataFrame,
    checkpoint_name: str,
    sample=0.1,
)
```

- `df`: the Spark DataFrame to collect the schema from.
- `checkpoint_name`: the name of the "checkpoint". Generated JSON files will have the name `snowpark-[checkpoint_name]-schema.json`.
- `sample`: sample size of the Spark DataFrame used to generate the schema.
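
A minimal collection sketch, assuming a local PySpark session; the DataFrame contents and checkpoint names are illustrative:

```python
from pyspark.sql import SparkSession

from snowflake.snowpark_checkpoints_collector import collect_dataframe_schema

spark = SparkSession.builder.appName("demo_pipeline").getOrCreate()

# Illustrative input; any intermediate Spark DataFrame can be checkpointed.
df = spark.createDataFrame(
    [("Alice", 30), ("Bob", 42)], schema="name string, age int"
)

# Record schema and check information for this DataFrame; a file named
# "snowpark-initial_load-schema.json" is generated for the checkpoint.
collect_dataframe_schema(df, "initial_load", sample=0.1)

# Checkpoints can be placed after each transformation of interest.
adults = df.filter(df.age >= 18)
collect_dataframe_schema(adults, "adults", sample=0.1)
```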

### Validate DataFrame Schema from File

The `validate_dataframe_checkpoint` function validates a Snowpark DataFrame against a checkpoint schema file or DataFrame.

```python
from snowflake.snowpark_checkpoints.checkpoint import validate_dataframe_checkpoint

validate_dataframe_checkpoint(
    df: SnowparkDataFrame,
    checkpoint_name: str,
    job_context: Optional[SnowparkJobContext] = None,
    mode: Optional[CheckpointMode] = CheckpointMode.SCHEMA,
    custom_checks: Optional[dict[Any, Any]] = None,
    skip_checks: Optional[dict[Any, Any]] = None,
    sample_frac: Optional[float] = 1.0,
    sample_number: Optional[int] = None,
    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
    output_path: Optional[str] = None,
)
```

- `df`: Snowpark DataFrame to validate.
- `checkpoint_name`: Name of the checkpoint schema file or DataFrame.
- `job_context`: Snowpark job context.
- `mode`: Checkpoint mode (schema or data).
- `custom_checks`: Custom checks to perform.
- `skip_checks`: Checks to skip.
- `sample_frac`: Fraction of the DataFrame to sample.
- `sample_number`: Number of rows to sample.
- `sampling_strategy`: Sampling strategy to use.
- `output_path`: Output path for the checkpoint report.
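
A minimal validation sketch on the Snowpark side, assuming a configured Snowflake connection; the connection parameters and table name are placeholders, and the checkpoint name matches one produced by the collector:

```python
from snowflake.snowpark import Session

from snowflake.snowpark_checkpoints.checkpoint import validate_dataframe_checkpoint

# Placeholder connection configuration; supply your own account parameters.
session = Session.builder.configs({"connection_name": "default"}).create()

# Hypothetical source table standing in for the migrated pipeline's data.
df = session.table("DEMO_DB.DEMO_SCHEMA.CUSTOMERS")

# Validate the DataFrame against the schema captured for the "initial_load"
# checkpoint (CheckpointMode.SCHEMA is the default mode).
validate_dataframe_checkpoint(
    df,
    checkpoint_name="initial_load",
    sample_frac=0.1,
)
```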

### Check with Spark Decorator

The `check_with_spark` decorator samples any Snowpark DataFrame arguments passed to the decorated function, converts them to PySpark DataFrames, executes the provided Spark function on them, and compares the outputs of the two implementations.

```python
from snowflake.snowpark_checkpoints.spark_migration import check_with_spark

@check_with_spark(
    job_context: Optional[SnowparkJobContext],
    spark_function: Callable,
    checkpoint_name: str,
    sample_number: Optional[int] = 100,
    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
    check_dtypes: Optional[bool] = False,
    check_with_precision: Optional[bool] = False,
    output_path: Optional[str] = None,
)
def snowpark_fn(df: SnowparkDataFrame):
    ...
```

- `job_context`: Snowpark job context.
- `spark_function`: PySpark function to execute.
- `checkpoint_name`: Name of the check.
- `sample_number`: Number of rows to sample.
- `sampling_strategy`: Sampling strategy to use.
- `check_dtypes`: Check data types.
- `check_with_precision`: Check with precision.
- `output_path`: Output path for the checkpoint report.
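
A usage sketch, not taken from the package docs: it assumes a `SnowparkJobContext` named `job_context` has already been created, and the filtering logic is illustrative:

```python
from pyspark.sql import DataFrame as SparkDataFrame

from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.spark_migration import check_with_spark

# Reference implementation written against PySpark.
def spark_filter_adults(df: SparkDataFrame) -> SparkDataFrame:
    return df.filter(df.age >= 18)

# The decorated Snowpark implementation: its DataFrame arguments are sampled,
# converted to PySpark, run through spark_filter_adults, and the two outputs
# are compared.
@check_with_spark(
    job_context=job_context,  # assumed to be created earlier in the script
    spark_function=spark_filter_adults,
    checkpoint_name="filter_adults",
    sample_number=100,
)
def filter_adults(df: SnowparkDataFrame) -> SnowparkDataFrame:
    return df.filter(df["age"] >= 18)
```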

## Pandera Snowpark Decorators

The `@check_input_schema` and `@check_output_schema` decorators allow
for sampled schema validation of Snowpark DataFrames in the input arguments or
in the return value.

```python
from snowflake.snowpark_checkpoints.checkpoint import check_input_schema, check_output_schema

@check_input_schema(
    pandera_schema: DataFrameSchema,
    checkpoint_name: Optional[str] = None,
    sample_frac: Optional[float] = 1.0,
    sample_number: Optional[int] = 100,
    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
    job_context: Optional[SnowparkJobContext] = None,
    output_path: Optional[str] = None,
)
def snowpark_fn(df: SnowparkDataFrame):
    ...

@check_output_schema(
    pandera_schema: DataFrameSchema,
    checkpoint_name: Optional[str] = None,
    sample_frac: Optional[float] = 1.0,
    sample_number: Optional[int] = None,
    sampling_strategy: Optional[SamplingStrategy] = SamplingStrategy.RANDOM_SAMPLE,
    job_context: Optional[SnowparkJobContext] = None,
    output_path: Optional[str] = None,
)
def snowpark_fn(df: SnowparkDataFrame):
    ...
```
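
A sketch of the output-schema decorator with an illustrative Pandera schema; the column names and checks are assumptions, not part of the package:

```python
import pandera as pa

from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark_checkpoints.checkpoint import check_output_schema

# Illustrative Pandera schema: the returned DataFrame must contain these
# columns with the given types, and "age" must be non-negative.
out_schema = pa.DataFrameSchema(
    {
        "name": pa.Column(str),
        "age": pa.Column(int, pa.Check.ge(0)),
    }
)

# A sample of the function's return value is validated against out_schema.
@check_output_schema(out_schema, checkpoint_name="adults_output", sample_frac=0.5)
def filter_adults(df: SnowparkDataFrame) -> SnowparkDataFrame:
    return df.filter(df["age"] >= 18)
```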

## Run Demos

### Requirements

- Python >= 3.9
- OpenJDK 21.0.2
- Snow CLI: the default connection must specify a database and a schema. After running the demos, a table called SNOWPARK_CHECKPOINTS_REPORT is created.

### Steps

1. Create a Python environment with Python 3.9 or higher in the Demos dir.
2. Build the snowpark-checkpoints and snowpark-checkpoints-collector Python packages. For each package:

```cmd
cd package_dir
pip install -e .
python3 -m pip install --upgrade build
python3 -m build
```

3. In the Demos dir, run:

   ```cmd
   pip install "snowflake-connector-python[pandas]"
   ```

4. Run the PySpark demo first; it generates the JSON schema files:

   ```cmd
   python demo_pyspark_pipeline.py
   ```

   Then run the Snowpark demo:

   ```cmd
   python demo_snowpark_pipeline.py
   ```

## References

- #spark-lift-and-shift
- #snowpark-migration-discussion
- One-Pager [Checkpoints for Spark / Snowpark](https://docs.google.com/document/d/1obeiwm2qjIA2CCCjP_2U4gaZ6wXe0NkJoLIyMFAhnOM/edit)

            
