prefect-sqlalchemy


Nameprefect-sqlalchemy JSON
Version 0.4.1 PyPI version JSON
download
home_pageNone
SummaryPrefect integrations for working with databases
upload_time2024-04-25 16:20:17
maintainerNone
docs_urlNone
authorNone
requires_python>=3.8
licenseApache License 2.0
keywords prefect
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # prefect-sqlalchemy

<p align="center">
    <a href="https://pypi.python.org/pypi/prefect-sqlalchemy/" alt="PyPI version">
        <img alt="PyPI" src="https://img.shields.io/pypi/v/prefect-sqlalchemy?color=0052FF&labelColor=090422"></a>
    <a href="https://pepy.tech/badge/prefect-sqlalchemy/" alt="Downloads">
        <img src="https://img.shields.io/pypi/dm/prefect-sqlalchemy?color=0052FF&labelColor=090422" /></a>
</p>

Visit the full docs [here](https://PrefectHQ.github.io/prefect-sqlalchemy) to see additional examples and the API reference.

The prefect-sqlalchemy collection makes it easy to connect to a database in your Prefect flows. Check out the examples below to get started!

## Getting started

### Integrate with Prefect flows

Prefect and SQLAlchemy are a data powerhouse duo. With Prefect, your workflows are orchestratable and observable, and with SQLAlchemy, your databases are a snap to handle! Get ready to experience the ultimate data "flow-chemistry"!

To set up a table, use the `execute` and `execute_many` methods. Then, use the `fetch_many` method to retrieve data in a stream until there's no more data.

By using the `SqlAlchemyConnector` as a context manager, you can make sure that the SQLAlchemy engine and any connected resources are closed properly after you're done with them.

Be sure to install [prefect-sqlalchemy](#installation) and [save your credentials to a Prefect block](#saving-credentials-to-block) to run the examples below!

!!! note "Async support"

    `SqlAlchemyConnector` also supports async workflows! Just be sure to save, load, and use an async driver as in the example below.

    ```python
    from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, AsyncDriver

    connector = SqlAlchemyConnector(
        connection_info=ConnectionComponents(
            driver=AsyncDriver.SQLITE_AIOSQLITE,
            database="DATABASE-PLACEHOLDER.db"
        )
    )

    connector.save("BLOCK_NAME-PLACEHOLDER")
    ```

=== "Sync"

    ```python
    from prefect import flow, task
    from prefect_sqlalchemy import SqlAlchemyConnector


    @task
    def setup_table(block_name: str) -> None:
        with SqlAlchemyConnector.load(block_name) as connector:
            connector.execute(
                "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
            )
            connector.execute(
                "INSERT INTO customers (name, address) VALUES (:name, :address);",
                parameters={"name": "Marvin", "address": "Highway 42"},
            )
            connector.execute_many(
                "INSERT INTO customers (name, address) VALUES (:name, :address);",
                seq_of_parameters=[
                    {"name": "Ford", "address": "Highway 42"},
                    {"name": "Unknown", "address": "Highway 42"},
                ],
            )

    @task
    def fetch_data(block_name: str) -> list:
        all_rows = []
        with SqlAlchemyConnector.load(block_name) as connector:
            while True:
                # Repeated fetch* calls using the same operation will
                # skip re-executing and instead return the next set of results
                new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
                if len(new_rows) == 0:
                    break
                all_rows.append(new_rows)
        return all_rows

    @flow
    def sqlalchemy_flow(block_name: str) -> list:
        setup_table(block_name)
        all_rows = fetch_data(block_name)
        return all_rows


    sqlalchemy_flow("BLOCK-NAME-PLACEHOLDER")
    ```

=== "Async"

    ```python
    from prefect import flow, task
    from prefect_sqlalchemy import SqlAlchemyConnector
    import asyncio

    @task
    async def setup_table(block_name: str) -> None:
        async with await SqlAlchemyConnector.load(block_name) as connector:
            await connector.execute(
                "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
            )
            await connector.execute(
                "INSERT INTO customers (name, address) VALUES (:name, :address);",
                parameters={"name": "Marvin", "address": "Highway 42"},
            )
            await connector.execute_many(
                "INSERT INTO customers (name, address) VALUES (:name, :address);",
                seq_of_parameters=[
                    {"name": "Ford", "address": "Highway 42"},
                    {"name": "Unknown", "address": "Highway 42"},
                ],
            )

    @task
    async def fetch_data(block_name: str) -> list:
        all_rows = []
        async with await SqlAlchemyConnector.load(block_name) as connector:
            while True:
                # Repeated fetch* calls using the same operation will
                # skip re-executing and instead return the next set of results
                new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
                if len(new_rows) == 0:
                    break
                all_rows.append(new_rows)
        return all_rows

    @flow
    async def sqlalchemy_flow(block_name: str) -> list:
        await setup_table(block_name)
        all_rows = await fetch_data(block_name)
        return all_rows


    asyncio.run(sqlalchemy_flow("BLOCK-NAME-PLACEHOLDER"))
    ```

## Resources

For more tips on how to use tasks and flows provided in a Prefect integration library, check out the [Prefect docs on using integrations](https://docs.prefect.io/integrations/usage/).

### Installation

Install `prefect-sqlalchemy` with `pip`:

```bash
pip install prefect-sqlalchemy
```

Requires an installation of Python 3.8 or higher.

We recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.

The tasks in this library are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the [Prefect documentation](https://docs.prefect.io/).

### Saving credentials to a block

To use the `load` method on Blocks, you must have a block document [saved through code](https://docs.prefect.io/concepts/blocks/#saving-blocks) or saved through the UI.

Below is a walkthrough on saving block documents through code; simply create a short script, replacing the placeholders.

```python
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="USERNAME-PLACEHOLDER",
        password="PASSWORD-PLACEHOLDER",
        host="localhost",
        port=5432,
        database="DATABASE-PLACEHOLDER",
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
```

Congrats! You can now easily load the saved block, which holds your credentials:

```python
from prefect_sqlalchemy import SqlAlchemyConnector

SqlAlchemyConnector.load("BLOCK_NAME-PLACEHOLDER")
```

The required keywords depend upon the desired driver. For example, SQLite requires only the `driver` and `database` arguments:

```python
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.SQLITE_PYSQLITE,
        database="DATABASE-PLACEHOLDER.db"
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
```

!!! info "Registering blocks"

    Register blocks in this module to
    [view and edit them](https://orion-docs.prefect.io/ui/blocks/)
    on Prefect Cloud:

    ```bash
    prefect block register -m prefect_sqlalchemy
    ```

### Feedback

If you encounter any bugs while using `prefect-sqlalchemy`, please open an issue in the [prefect](https://github.com/PrefectHQ/prefect) repository.

If you have any questions or issues while using `prefect-sqlalchemy`, you can find help in the [Prefect Community Slack ](https://prefect.io/slack).


### Contributing

If you'd like to help contribute to fix an issue or add a feature to `prefect-sqlalchemy`, please [propose changes through a pull request from a fork of the repository](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork).

Here are the steps:

1. [Fork the repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#forking-a-repository)
2. [Clone the forked repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#cloning-your-forked-repository)
3. Install the repository and its dependencies:
```
pip install -e ".[dev]"
```
4. Make desired changes
5. Add tests
6. Install `pre-commit` to perform quality checks prior to commit:
```
pre-commit install
```
7. `git commit`, `git push`, and create a pull request

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "prefect-sqlalchemy",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "prefect",
    "author": null,
    "author_email": "\"Prefect Technologies, Inc.\" <help@prefect.io>",
    "download_url": "https://files.pythonhosted.org/packages/44/84/d99bc107ed90b298894ed13d64221871348ceb85d474777c20c68a1ee230/prefect_sqlalchemy-0.4.1.tar.gz",
    "platform": null,
    "description": "# prefect-sqlalchemy\n\n<p align=\"center\">\n    <a href=\"https://pypi.python.org/pypi/prefect-sqlalchemy/\" alt=\"PyPI version\">\n        <img alt=\"PyPI\" src=\"https://img.shields.io/pypi/v/prefect-sqlalchemy?color=0052FF&labelColor=090422\"></a>\n    <a href=\"https://pepy.tech/badge/prefect-sqlalchemy/\" alt=\"Downloads\">\n        <img src=\"https://img.shields.io/pypi/dm/prefect-sqlalchemy?color=0052FF&labelColor=090422\" /></a>\n</p>\n\nVisit the full docs [here](https://PrefectHQ.github.io/prefect-sqlalchemy) to see additional examples and the API reference.\n\nThe prefect-sqlalchemy collection makes it easy to connect to a database in your Prefect flows. Check out the examples below to get started!\n\n## Getting started\n\n### Integrate with Prefect flows\n\nPrefect and SQLAlchemy are a data powerhouse duo. With Prefect, your workflows are orchestratable and observable, and with SQLAlchemy, your databases are a snap to handle! Get ready to experience the ultimate data \"flow-chemistry\"!\n\nTo set up a table, use the `execute` and `execute_many` methods. Then, use the `fetch_many` method to retrieve data in a stream until there's no more data.\n\nBy using the `SqlAlchemyConnector` as a context manager, you can make sure that the SQLAlchemy engine and any connected resources are closed properly after you're done with them.\n\nBe sure to install [prefect-sqlalchemy](#installation) and [save your credentials to a Prefect block](#saving-credentials-to-block) to run the examples below!\n\n!!! note \"Async support\"\n\n    `SqlAlchemyConnector` also supports async workflows! Just be sure to save, load, and use an async driver as in the example below.\n\n    ```python\n    from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, AsyncDriver\n\n    connector = SqlAlchemyConnector(\n        connection_info=ConnectionComponents(\n            driver=AsyncDriver.SQLITE_AIOSQLITE,\n            database=\"DATABASE-PLACEHOLDER.db\"\n        )\n    )\n\n    connector.save(\"BLOCK_NAME-PLACEHOLDER\")\n    ```\n\n=== \"Sync\"\n\n    ```python\n    from prefect import flow, task\n    from prefect_sqlalchemy import SqlAlchemyConnector\n\n\n    @task\n    def setup_table(block_name: str) -> None:\n        with SqlAlchemyConnector.load(block_name) as connector:\n            connector.execute(\n                \"CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);\"\n            )\n            connector.execute(\n                \"INSERT INTO customers (name, address) VALUES (:name, :address);\",\n                parameters={\"name\": \"Marvin\", \"address\": \"Highway 42\"},\n            )\n            connector.execute_many(\n                \"INSERT INTO customers (name, address) VALUES (:name, :address);\",\n                seq_of_parameters=[\n                    {\"name\": \"Ford\", \"address\": \"Highway 42\"},\n                    {\"name\": \"Unknown\", \"address\": \"Highway 42\"},\n                ],\n            )\n\n    @task\n    def fetch_data(block_name: str) -> list:\n        all_rows = []\n        with SqlAlchemyConnector.load(block_name) as connector:\n            while True:\n                # Repeated fetch* calls using the same operation will\n                # skip re-executing and instead return the next set of results\n                new_rows = connector.fetch_many(\"SELECT * FROM customers\", size=2)\n                if len(new_rows) == 0:\n                    break\n                all_rows.append(new_rows)\n        return all_rows\n\n    @flow\n    def sqlalchemy_flow(block_name: str) -> list:\n        setup_table(block_name)\n        all_rows = fetch_data(block_name)\n        return all_rows\n\n\n    sqlalchemy_flow(\"BLOCK-NAME-PLACEHOLDER\")\n    ```\n\n=== \"Async\"\n\n    ```python\n    from prefect import flow, task\n    from prefect_sqlalchemy import SqlAlchemyConnector\n    import asyncio\n\n    @task\n    async def setup_table(block_name: str) -> None:\n        async with await SqlAlchemyConnector.load(block_name) as connector:\n            await connector.execute(\n                \"CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);\"\n            )\n            await connector.execute(\n                \"INSERT INTO customers (name, address) VALUES (:name, :address);\",\n                parameters={\"name\": \"Marvin\", \"address\": \"Highway 42\"},\n            )\n            await connector.execute_many(\n                \"INSERT INTO customers (name, address) VALUES (:name, :address);\",\n                seq_of_parameters=[\n                    {\"name\": \"Ford\", \"address\": \"Highway 42\"},\n                    {\"name\": \"Unknown\", \"address\": \"Highway 42\"},\n                ],\n            )\n\n    @task\n    async def fetch_data(block_name: str) -> list:\n        all_rows = []\n        async with await SqlAlchemyConnector.load(block_name) as connector:\n            while True:\n                # Repeated fetch* calls using the same operation will\n                # skip re-executing and instead return the next set of results\n                new_rows = await connector.fetch_many(\"SELECT * FROM customers\", size=2)\n                if len(new_rows) == 0:\n                    break\n                all_rows.append(new_rows)\n        return all_rows\n\n    @flow\n    async def sqlalchemy_flow(block_name: str) -> list:\n        await setup_table(block_name)\n        all_rows = await fetch_data(block_name)\n        return all_rows\n\n\n    asyncio.run(sqlalchemy_flow(\"BLOCK-NAME-PLACEHOLDER\"))\n    ```\n\n## Resources\n\nFor more tips on how to use tasks and flows provided in a Prefect integration library, check out the [Prefect docs on using integrations](https://docs.prefect.io/integrations/usage/).\n\n### Installation\n\nInstall `prefect-sqlalchemy` with `pip`:\n\n```bash\npip install prefect-sqlalchemy\n```\n\nRequires an installation of Python 3.8 or higher.\n\nWe recommend using a Python virtual environment manager such as pipenv, conda, or virtualenv.\n\nThe tasks in this library are designed to work with Prefect 2. For more information about how to use Prefect, please refer to the [Prefect documentation](https://docs.prefect.io/).\n\n### Saving credentials to a block\n\nTo use the `load` method on Blocks, you must have a block document [saved through code](https://docs.prefect.io/concepts/blocks/#saving-blocks) or saved through the UI.\n\nBelow is a walkthrough on saving block documents through code; simply create a short script, replacing the placeholders.\n\n```python\nfrom prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver\n\nconnector = SqlAlchemyConnector(\n    connection_info=ConnectionComponents(\n        driver=SyncDriver.POSTGRESQL_PSYCOPG2,\n        username=\"USERNAME-PLACEHOLDER\",\n        password=\"PASSWORD-PLACEHOLDER\",\n        host=\"localhost\",\n        port=5432,\n        database=\"DATABASE-PLACEHOLDER\",\n    )\n)\n\nconnector.save(\"BLOCK_NAME-PLACEHOLDER\")\n```\n\nCongrats! You can now easily load the saved block, which holds your credentials:\n\n```python\nfrom prefect_sqlalchemy import SqlAlchemyConnector\n\nSqlAlchemyConnector.load(\"BLOCK_NAME-PLACEHOLDER\")\n```\n\nThe required keywords depend upon the desired driver. For example, SQLite requires only the `driver` and `database` arguments:\n\n```python\nfrom prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver\n\nconnector = SqlAlchemyConnector(\n    connection_info=ConnectionComponents(\n        driver=SyncDriver.SQLITE_PYSQLITE,\n        database=\"DATABASE-PLACEHOLDER.db\"\n    )\n)\n\nconnector.save(\"BLOCK_NAME-PLACEHOLDER\")\n```\n\n!!! info \"Registering blocks\"\n\n    Register blocks in this module to\n    [view and edit them](https://orion-docs.prefect.io/ui/blocks/)\n    on Prefect Cloud:\n\n    ```bash\n    prefect block register -m prefect_sqlalchemy\n    ```\n\n### Feedback\n\nIf you encounter any bugs while using `prefect-sqlalchemy`, please open an issue in the [prefect](https://github.com/PrefectHQ/prefect) repository.\n\nIf you have any questions or issues while using `prefect-sqlalchemy`, you can find help in the [Prefect Community Slack ](https://prefect.io/slack).\n\n\n### Contributing\n\nIf you'd like to help contribute to fix an issue or add a feature to `prefect-sqlalchemy`, please [propose changes through a pull request from a fork of the repository](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork).\n\nHere are the steps:\n\n1. [Fork the repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#forking-a-repository)\n2. [Clone the forked repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#cloning-your-forked-repository)\n3. Install the repository and its dependencies:\n```\npip install -e \".[dev]\"\n```\n4. Make desired changes\n5. Add tests\n6. Install `pre-commit` to perform quality checks prior to commit:\n```\npre-commit install\n```\n7. `git commit`, `git push`, and create a pull request\n",
    "bugtrack_url": null,
    "license": "Apache License 2.0",
    "summary": "Prefect integrations for working with databases",
    "version": "0.4.1",
    "project_urls": {
        "Homepage": "https://github.com/PrefectHQ/prefect/tree/main/src/integrations/prefect-sqlalchemy"
    },
    "split_keywords": [
        "prefect"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "ecc3d08622cd14ede0b2d3593ed89d80f20afd60d61fcf00123c8d186f7f6bd8",
                "md5": "4c017bf437008a9f484d216e9e2b4297",
                "sha256": "0ab21168ad5a7562d87392a11f20118843aa99dd0ed09c57d8ca0e28732f0909"
            },
            "downloads": -1,
            "filename": "prefect_sqlalchemy-0.4.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "4c017bf437008a9f484d216e9e2b4297",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 15595,
            "upload_time": "2024-04-25T16:20:15",
            "upload_time_iso_8601": "2024-04-25T16:20:15.300345Z",
            "url": "https://files.pythonhosted.org/packages/ec/c3/d08622cd14ede0b2d3593ed89d80f20afd60d61fcf00123c8d186f7f6bd8/prefect_sqlalchemy-0.4.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "4484d99bc107ed90b298894ed13d64221871348ceb85d474777c20c68a1ee230",
                "md5": "7534cd830ba53747e2367bed5e7931c7",
                "sha256": "489984880ca1e3b71f41768d18c4d272d61675523097805d888055b77e38e238"
            },
            "downloads": -1,
            "filename": "prefect_sqlalchemy-0.4.1.tar.gz",
            "has_sig": false,
            "md5_digest": "7534cd830ba53747e2367bed5e7931c7",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 21397,
            "upload_time": "2024-04-25T16:20:17",
            "upload_time_iso_8601": "2024-04-25T16:20:17.046520Z",
            "url": "https://files.pythonhosted.org/packages/44/84/d99bc107ed90b298894ed13d64221871348ceb85d474777c20c68a1ee230/prefect_sqlalchemy-0.4.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-04-25 16:20:17",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "PrefectHQ",
    "github_project": "prefect",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "circle": true,
    "requirements": [],
    "lcname": "prefect-sqlalchemy"
}
        
Elapsed time: 0.24745s