prefect-snowflake

- Name: prefect-snowflake
- Version: 0.28.0
- Summary: Prefect integrations for interacting with Snowflake
- Uploaded: 2024-09-03 18:25:21
- Requires Python: >=3.9
- License: Apache License 2.0
- Keywords: prefect

# prefect-snowflake

<p align="center">
    <a href="https://pypi.python.org/pypi/prefect-snowflake/" alt="PyPI version">
        <img alt="PyPI" src="https://img.shields.io/pypi/v/prefect-snowflake?color=26272B&labelColor=090422"></a>
    <a href="https://pepy.tech/badge/prefect-snowflake/" alt="Downloads">
        <img src="https://img.shields.io/pypi/dm/prefect-snowflake?color=26272B&labelColor=090422" /></a>
</p>

## Welcome!

The prefect-snowflake collection makes it easy to connect to a Snowflake database in your Prefect flows. Check out the examples below to get started!

## Getting Started

### Integrate with Prefect flows

Prefect works with Snowflake by orchestrating your data pipelines: you define the steps as Prefect tasks and flows, and Prefect takes care of running and monitoring them.

The goal is fewer errors, more confidence in your data, and faster insights.

To set up a table, use the `execute` and `execute_many` methods. Then use the `fetch_many` method to retrieve the data in batches until no rows remain.
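The same create, insert, and fetch-in-batches sequence can be sketched with Python's built-in `sqlite3` module. Here it stands in for Snowflake only so the pattern runs locally without an account, so treat this as an illustration, not prefect-snowflake code. Two assumptions to note: `sqlite3` uses `:name` placeholders instead of the `%(name)s` style in the Snowflake examples, and the helper names `setup_table` and `fetch_in_batches` are hypothetical.

```python
import sqlite3
from typing import Any, Dict, List, Sequence, Tuple


def setup_table(conn: sqlite3.Connection) -> None:
    # Counterpart of connector.execute(...): run a single DDL statement.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar)"
    )
    # Counterpart of connector.execute_many(...): one statement, many parameter sets.
    # sqlite3 uses :name placeholders rather than Snowflake's %(name)s style.
    rows: Sequence[Dict[str, Any]] = [
        {"name": "Ford", "address": "Highway 42"},
        {"name": "Unknown", "address": "Space"},
        {"name": "Me", "address": "Myway 88"},
    ]
    conn.executemany(
        "INSERT INTO customers (name, address) VALUES (:name, :address)", rows
    )
    conn.commit()


def fetch_in_batches(conn: sqlite3.Connection, size: int = 2) -> List[Tuple[Any, ...]]:
    # Execute the query once, then read `size` rows at a time until
    # fetchmany() returns an empty list, mirroring the fetch_many loop.
    cursor = conn.execute("SELECT name, address FROM customers ORDER BY rowid")
    all_rows: List[Tuple[Any, ...]] = []
    try:
        while True:
            batch = cursor.fetchmany(size)
            if not batch:
                break
            all_rows.extend(batch)
    finally:
        cursor.close()
    return all_rows


if __name__ == "__main__":
    connection = sqlite3.connect(":memory:")
    try:
        setup_table(connection)
        print(fetch_in_batches(connection, size=2))
    finally:
        connection.close()
```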

By using the `SnowflakeConnector` as a context manager, you can make sure that the Snowflake connection and cursors are closed properly after you're done with them.
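To see why the `with` form matters, here is a minimal, hypothetical connector class (`TrackedConnector` is not part of prefect-snowflake) that records every cursor it opens and closes them, along with the connection, in `__exit__`. It wraps `sqlite3` as a local stand-in for Snowflake. The only assumption is that the real block follows the same general idea; its exact cleanup logic may differ.

```python
import sqlite3
from types import TracebackType
from typing import List, Optional, Type


class TrackedConnector:
    """Hypothetical connector that closes its cursors and connection on exit."""

    def __init__(self, database: str = ":memory:") -> None:
        self._database = database
        self._connection: Optional[sqlite3.Connection] = None
        self._cursors: List[sqlite3.Cursor] = []

    @property
    def is_open(self) -> bool:
        return self._connection is not None

    def cursor(self) -> sqlite3.Cursor:
        # Open the connection lazily and remember every cursor handed out.
        if self._connection is None:
            self._connection = sqlite3.connect(self._database)
        cur = self._connection.cursor()
        self._cursors.append(cur)
        return cur

    def close(self) -> None:
        # Close cursors first, then the connection itself.
        for cur in self._cursors:
            cur.close()
        self._cursors.clear()
        if self._connection is not None:
            self._connection.close()
            self._connection = None

    def __enter__(self) -> "TrackedConnector":
        return self

    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        # Called on normal exit *and* when the body raises, so nothing leaks.
        # Returning None lets any exception keep propagating.
        self.close()


if __name__ == "__main__":
    connector = TrackedConnector()
    try:
        with connector:
            connector.cursor().execute("SELECT 1")
            raise RuntimeError("simulated task failure")
    except RuntimeError:
        pass
    print(connector.is_open)  # False: cleanup ran despite the error
```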

Be sure to [install prefect-snowflake](#installation) and [save your credentials to a block](#saving-credentials-to-block) before running the examples below!

=== "Sync"

```python
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
def setup_table(block_name: str) -> None:
    with SnowflakeConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)  # flatten each batch into a single list of rows
    return all_rows

@flow
def snowflake_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows

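if __name__ == "__main__":
    # The flow requires the name of a saved SnowflakeConnector block
    snowflake_flow("example")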
```
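The comment in `fetch_data` describes cursor reuse: calling `fetch_many` again with the same query continues from where the previous call stopped instead of re-running the query. Below is a minimal sketch of that idea, assuming cursors are cached by the query text plus its parameters. `CachingFetcher` is hypothetical, uses `sqlite3` in place of Snowflake, and takes parameters as a tuple for simplicity, so treat it as an illustration rather than prefect-snowflake's implementation.

```python
import sqlite3
from typing import Any, Dict, List, Optional, Tuple


class CachingFetcher:
    """Hypothetical helper: one open cursor per (operation, parameters) pair."""

    def __init__(self, conn: sqlite3.Connection) -> None:
        self._conn = conn
        self._cursors: Dict[Tuple[str, Tuple[Any, ...]], sqlite3.Cursor] = {}

    def fetch_many(
        self,
        operation: str,
        parameters: Optional[Tuple[Any, ...]] = None,
        size: int = 1,
    ) -> List[Tuple[Any, ...]]:
        key = (operation, tuple(parameters or ()))
        cursor = self._cursors.get(key)
        if cursor is None:
            # First call for this query: execute it and keep the cursor.
            cursor = self._conn.execute(operation, key[1])
            self._cursors[key] = cursor
        # Later calls skip execution and keep reading from the same cursor.
        return cursor.fetchmany(size)

    def reset(self) -> None:
        # Close and forget every cached cursor so the next call re-executes.
        for cursor in self._cursors.values():
            cursor.close()
        self._cursors.clear()


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE customers (name TEXT)")
    conn.executemany("INSERT INTO customers VALUES (?)", [("Ford",), ("Unknown",), ("Me",)])
    fetcher = CachingFetcher(conn)
    query = "SELECT name FROM customers ORDER BY rowid"
    print(fetcher.fetch_many(query, size=2))  # [('Ford',), ('Unknown',)]
    print(fetcher.fetch_many(query, size=2))  # [('Me',)]
    print(fetcher.fetch_many(query, size=2))  # []
    conn.close()
```

An empty list signals that the cached cursor is exhausted, which is exactly the condition the `fetch_data` loop uses to stop.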

=== "Async"

```python
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector
import asyncio

@task
async def setup_table(block_name: str) -> None:
    with await SnowflakeConnector.load(block_name) as connector:
        await connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )

@task
async def fetch_data(block_name: str) -> list:
    all_rows = []
    with await SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)  # flatten each batch into a single list of rows
    return all_rows

@flow
async def snowflake_flow(block_name: str) -> list:
    await setup_table(block_name)
    all_rows = await fetch_data(block_name)
    return all_rows

asyncio.run(snowflake_flow("example"))
```
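In the async variant, each database call is awaited so the event loop can do other work while Snowflake responds. One common way to get that behavior from a blocking database driver is to run each call on a worker thread with `asyncio.to_thread` (available since Python 3.9). The sketch below does that with `sqlite3` as a local stand-in. Whether prefect-snowflake uses this particular mechanism internally is an assumption, and `make_db` and `fetch_all_async` are hypothetical names.

```python
import asyncio
import sqlite3
from typing import Any, List, Tuple


def make_db() -> sqlite3.Connection:
    # check_same_thread=False lets worker threads started by asyncio.to_thread
    # use this connection; the calls below are awaited one at a time, so they
    # never run concurrently.
    conn = sqlite3.connect(":memory:", check_same_thread=False)
    conn.execute("CREATE TABLE customers (name varchar, address varchar)")
    conn.executemany(
        "INSERT INTO customers (name, address) VALUES (?, ?)",
        [("Ford", "Highway 42"), ("Unknown", "Space"), ("Me", "Myway 88")],
    )
    conn.commit()
    return conn


async def fetch_all_async(conn: sqlite3.Connection, size: int = 2) -> List[Tuple[Any, ...]]:
    # Run the blocking execute() off the event loop.
    cursor = await asyncio.to_thread(
        conn.execute, "SELECT name, address FROM customers ORDER BY rowid"
    )
    all_rows: List[Tuple[Any, ...]] = []
    while True:
        # Each blocking fetch also runs in a worker thread.
        batch = await asyncio.to_thread(cursor.fetchmany, size)
        if not batch:
            break
        all_rows.extend(batch)
    cursor.close()
    return all_rows


if __name__ == "__main__":
    connection = make_db()
    try:
        print(asyncio.run(fetch_all_async(connection)))
    finally:
        connection.close()
```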

### Access underlying Snowflake connection

If the block's built-in methods don't meet your requirements, you can access the underlying Snowflake connection with `get_connection()` and use it with the Snowflake Python connector's own tools, such as `write_pandas`.

```python
import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas

@flow
def snowflake_write_pandas_flow():
    connector = SnowflakeConnector.load("my-block")
    with connector.get_connection() as connection:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
        with connection.cursor() as cursor:
            cursor.execute(statement)

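        # Case sensitivity matters here: write_pandas quotes column names by
        # default, so they must match the uppercase names Snowflake stores for
        # the unquoted NAME and NUMBER columns created above.
        df = pd.DataFrame([('Marvin', 42), ('Ford', 88)], columns=['NAME', 'NUMBER'])
        success, num_chunks, num_rows, _ = write_pandas(
            conn=connection,
            df=df,
            table_name=table_name,
            database=connector.database,
            schema=connector.schema_,  # note the "_" suffix
        )


if __name__ == "__main__":
    snowflake_write_pandas_flow()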
```
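The case-sensitivity comment refers to how Snowflake resolves identifiers: unquoted names are stored and matched in uppercase, while double-quoted names are matched exactly as written. Because `write_pandas` quotes column names by default (its `quote_identifiers` parameter defaults to `True`), DataFrame columns must match the table's stored column names exactly. The helpers below are a hypothetical, simplified model of that rule (they ignore escaped quotes and other edge cases) that could be used to check column names before writing.

```python
from typing import Iterable, List


def resolve_identifier(name: str) -> str:
    """Hypothetical, simplified model of Snowflake identifier resolution.

    Unquoted identifiers are case-insensitive and resolve to uppercase;
    double-quoted identifiers keep their exact case. Escaped quotes ("")
    inside quoted names are not handled in this sketch.
    """
    if len(name) >= 2 and name.startswith('"') and name.endswith('"'):
        return name[1:-1]
    return name.upper()


def mismatched_columns(df_columns: Iterable[str], table_columns: Iterable[str]) -> List[str]:
    """Return DataFrame columns that would not match when quoted as-is."""
    # write_pandas quotes DataFrame column names by default, so each one is
    # compared verbatim against the table's resolved column names.
    resolved = {resolve_identifier(c) for c in table_columns}
    return [c for c in df_columns if c not in resolved]


if __name__ == "__main__":
    ddl_columns = ["NAME", "NUMBER"]  # from "NAME STRING, NUMBER INT"
    print(mismatched_columns(["NAME", "NUMBER"], ddl_columns))  # []
    print(mismatched_columns(["name", "number"], ddl_columns))  # ['name', 'number']
```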

## Resources

For more tips on how to use tasks and flows in an integration, check out [Using Collections](https://docs.prefect.io/integrations/usage/)!

### Installation

Install `prefect-snowflake` with `pip`:

```bash
pip install prefect-snowflake
```

A list of available blocks in `prefect-snowflake` and their setup instructions can be found [here](https://PrefectHQ.github.io/prefect-snowflake/blocks_catalog).

Requires an installation of Python 3.9+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the [Prefect documentation](https://docs.prefect.io/).

### Saving credentials to block

Note that to use the `load` method on blocks, you must first have a block document [saved through code](https://docs.prefect.io/concepts/blocks/#saving-blocks) or through the UI.

Below is a walkthrough on saving a `SnowflakeCredentials` block through code.

1. Head over to https://app.snowflake.com/.
2. Log in to your Snowflake account (for example, nh12345.us-east-2.aws) with your username and password.
3. Use those credentials to replace the placeholders below.

```python
from prefect_snowflake import SnowflakeCredentials

credentials = SnowflakeCredentials(
    account="ACCOUNT-PLACEHOLDER",  # resembles nh12345.us-east-2.aws
    user="USER-PLACEHOLDER",
    password="PASSWORD-PLACEHOLDER"
)
credentials.save("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
```
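The `account` value is the part of your Snowflake account URL before `.snowflakecomputing.com`; for example, `nh12345.us-east-2.aws` for `https://nh12345.us-east-2.aws.snowflakecomputing.com`. If you copied the full URL, a small helper like `account_from_url` below can reduce it to the identifier. The helper is hypothetical, not part of prefect-snowflake, and assumes the standard `<account>.snowflakecomputing.com` host format.

```python
from urllib.parse import urlparse

SNOWFLAKE_SUFFIX = ".snowflakecomputing.com"


def account_from_url(url: str) -> str:
    """Hypothetical helper: extract the account identifier from an account URL."""
    # Accept both "https://host/..." and a bare "host/..." string.
    host = urlparse(url).hostname if "://" in url else url.split("/")[0]
    if not host:
        raise ValueError(f"Could not find a host name in {url!r}")
    host = host.lower()
    if not host.endswith(SNOWFLAKE_SUFFIX):
        raise ValueError(f"{url!r} does not look like a Snowflake account URL")
    return host[: -len(SNOWFLAKE_SUFFIX)]


if __name__ == "__main__":
    print(account_from_url("https://nh12345.us-east-2.aws.snowflakecomputing.com"))
```

The result can then be passed as `account=` when constructing `SnowflakeCredentials`.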

Then, to create a `SnowflakeConnector` block:

1. After logging in, click on any worksheet.
2. On the left side, select a database and schema.
3. On the top right, select a warehouse.
4. Create a short script, replacing the placeholders below.

```python
from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector

credentials = SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")

connector = SnowflakeConnector(
    credentials=credentials,
    database="DATABASE-PLACEHOLDER",
    schema="SCHEMA-PLACEHOLDER",
    warehouse="COMPUTE_WH",
)
connector.save("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
```

Congrats! You can now easily load the saved block, which holds your credentials and connection info:

```python
from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector

credentials = SnowflakeCredentials.load("CREDENTIALS-BLOCK-NAME-PLACEHOLDER")
connector = SnowflakeConnector.load("CONNECTOR-BLOCK-NAME-PLACEHOLDER")
```

!!! info "Registering blocks"

Register blocks in this module to
[view and edit them](https://docs.prefect.io/ui/blocks/)
on Prefect Cloud:

```bash
prefect block register -m prefect_snowflake
```


### Feedback

If you encounter any bugs while using `prefect-snowflake`, feel free to open an issue in the [prefect-snowflake](https://github.com/PrefectHQ/prefect-snowflake) repository.

If you have any questions or issues while using `prefect-snowflake`, you can find help in either the [Prefect Discourse forum](https://discourse.prefect.io/) or the [Prefect Slack community](https://prefect.io/slack).

Feel free to star or watch [`prefect-snowflake`](https://github.com/PrefectHQ/prefect-snowflake) for updates too!

### Contributing

If you'd like to fix an issue or add a feature to `prefect-snowflake`, please [propose changes through a pull request from a fork of the repository](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork).

Here are the steps:

1. [Fork the repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#forking-a-repository)
2. [Clone the forked repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#cloning-your-forked-repository)
3. Install the repository and its dependencies:
   ```bash
   pip install -e ".[dev]"
   ```
4. Make desired changes
5. Add tests
6. Add an entry to [CHANGELOG.md](https://github.com/PrefectHQ/prefect-snowflake/blob/main/CHANGELOG.md)
7. Install `pre-commit` to run quality checks before each commit:
   ```bash
   pre-commit install
   ```
8. `git commit`, `git push`, and create a pull request

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "prefect-snowflake",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.9",
    "maintainer_email": null,
    "keywords": "prefect",
    "author": null,
    "author_email": "\"Prefect Technologies, Inc.\" <help@prefect.io>",
    "download_url": "https://files.pythonhosted.org/packages/31/9a/6bc048cb4b795a8619eb2f91aee451df7e80b563ee4ffca7ba6f5697c2bb/prefect_snowflake-0.28.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "Apache License 2.0",
    "summary": "Prefect integrations for interacting with Snowflake",
    "version": "0.28.0",
    "project_urls": {
        "Homepage": "https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-snowflake"
    },
    "split_keywords": [
        "prefect"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "2cafa8461ad8ccdd7a4fd3f7d69309d89851f6d7ba3b1104303a9712ae311a77",
                "md5": "ebc855a50fdecfe50dca48f96f117530",
                "sha256": "7dbda35fa210c031e5ba5d9c60b62b52d600a124dfe15ea6e86b210c017802a7"
            },
            "downloads": -1,
            "filename": "prefect_snowflake-0.28.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "ebc855a50fdecfe50dca48f96f117530",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9",
            "size": 19331,
            "upload_time": "2024-09-03T18:25:20",
            "upload_time_iso_8601": "2024-09-03T18:25:20.114072Z",
            "url": "https://files.pythonhosted.org/packages/2c/af/a8461ad8ccdd7a4fd3f7d69309d89851f6d7ba3b1104303a9712ae311a77/prefect_snowflake-0.28.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "319a6bc048cb4b795a8619eb2f91aee451df7e80b563ee4ffca7ba6f5697c2bb",
                "md5": "69d545da263f2e491dbe1bc26e8ebbd8",
                "sha256": "41416102be836d92f640f0aabc3d80c5a603b994036d19132cbebb1ff2da24d3"
            },
            "downloads": -1,
            "filename": "prefect_snowflake-0.28.0.tar.gz",
            "has_sig": false,
            "md5_digest": "69d545da263f2e491dbe1bc26e8ebbd8",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9",
            "size": 29655,
            "upload_time": "2024-09-03T18:25:21",
            "upload_time_iso_8601": "2024-09-03T18:25:21.423334Z",
            "url": "https://files.pythonhosted.org/packages/31/9a/6bc048cb4b795a8619eb2f91aee451df7e80b563ee4ffca7ba6f5697c2bb/prefect_snowflake-0.28.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-03 18:25:21",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "PrefectHQ",
    "github_project": "prefect",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "requirements": [],
    "lcname": "prefect-snowflake"
}
        