databend-sqlalchemy

Name: databend-sqlalchemy
Version: 0.5.2
Home page: https://github.com/databendlabs/databend-sqlalchemy
Summary: SQLAlchemy adapter for Databend
Author: Databend Cloud
Upload time: 2025-07-09 08:40:06
Requires Python: >=3.8
License: Apache-2.0
databend-sqlalchemy
===================

Databend dialect for SQLAlchemy.

Installation
------------

The package is installable with pip::

    pip install databend-sqlalchemy

Usage
-----

The DSN format is similar to that of regular Postgres::

        from sqlalchemy import create_engine, text

        # username, password, host_port_name and database_name are
        # placeholders for your own connection details
        engine = create_engine(
            f"databend://{username}:{password}@{host_port_name}/{database_name}?sslmode=disable"
        )
        connection = engine.connect()
        result = connection.execute(text("SELECT 1"))
        assert len(result.fetchall()) == 1

        # A DB-API style interface is also available
        from databend_sqlalchemy import connector

        cursor = connector.connect('databend://root:@localhost:8000?sslmode=disable').cursor()
        cursor.execute('SELECT * FROM test')
        # cursor.fetchone() and cursor.fetchall() also work here
        for row in cursor:
            print(row)
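
If your connection details contain characters that need URL escaping, it may be easier to build the DSN with SQLAlchemy's standard `URL.create` helper than to format the string by hand. A minimal sketch; the credentials, host and database below are placeholders::

        from sqlalchemy import create_engine
        from sqlalchemy.engine import URL

        # Build the DSN programmatically; URL.create escapes special
        # characters in each component for us.
        url = URL.create(
            drivername="databend",
            username="root",
            password="p@ss/word",   # placeholder credentials
            host="localhost",
            port=8000,
            database="default",
            query={"sslmode": "disable"},
        )
        engine = create_engine(url)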


Merge Command Support
---------------------

Databend SQLAlchemy supports upserts via its `Merge` custom expression.
See [Merge](https://docs.databend.com/sql/sql-commands/dml/dml-merge) for full documentation.

The Merge command can be used as shown below::

        from sqlalchemy.orm import sessionmaker
        from sqlalchemy import MetaData, create_engine
        from databend_sqlalchemy.databend_dialect import Merge

        engine = create_engine(db.url, echo=False)
        session = sessionmaker(bind=engine)()
        connection = engine.connect()

        meta = MetaData()
        meta.reflect(bind=session.bind)
        t1 = meta.tables['t1']
        t2 = meta.tables['t2']

        # Build a MERGE of t2 into t1, matched on the join condition given in `on`
        merge = Merge(target=t1, source=t2, on=t1.c.t1key == t2.c.t2key)
        merge.when_matched_then_delete().where(t2.c.marked == 1)
        merge.when_matched_then_update().where(t2.c.isnewstatus == 1).values(val=t2.c.newval, status=t2.c.newstatus)
        merge.when_matched_then_update().values(val=t2.c.newval)
        merge.when_not_matched_then_insert().values(val=t2.c.newval, status=t2.c.newstatus)
        connection.execute(merge)
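
Because a merge modifies data, you will typically want it run inside a transaction. A minimal sketch using SQLAlchemy's standard `engine.begin()` context manager, reusing the `merge` object built above::

        # engine.begin() commits on success and rolls back on error
        with engine.begin() as tx_connection:
            tx_connection.execute(merge)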


Copy Into Command Support
-------------------------

Databend SQLAlchemy supports copy into operations through its `CopyIntoTable` and `CopyIntoLocation` expressions.
See [CopyIntoLocation](https://docs.databend.com/sql/sql-commands/dml/dml-copy-into-location) or [CopyIntoTable](https://docs.databend.com/sql/sql-commands/dml/dml-copy-into-table) for full documentation.

The CopyIntoTable command can be used as shown below::

        import base64

        from sqlalchemy.orm import sessionmaker
        from sqlalchemy import MetaData, create_engine
        from databend_sqlalchemy import (
            CopyIntoTable, GoogleCloudStorage, ParquetFormat, CopyIntoTableOptions,
            FileColumnClause, CSVFormat, Compression,
        )

        engine = create_engine(db.url, echo=False)
        session = sessionmaker(bind=engine)()
        connection = engine.connect()

        meta = MetaData()
        meta.reflect(bind=session.bind)
        t1 = meta.tables['t1']
        t2 = meta.tables['t2']
        gcs_private_key = 'full_gcs_json_private_key'
        case_sensitive_columns = True

        copy_into = CopyIntoTable(
            target=t1,
            from_=GoogleCloudStorage(
                uri='gcs://bucket-name/path/to/file',
                credentials=base64.b64encode(gcs_private_key.encode()).decode(),
            ),
            file_format=ParquetFormat(),
            options=CopyIntoTableOptions(
                force=True,
                column_match_mode='CASE_SENSITIVE' if case_sensitive_columns else None,
            )
        )
        result = connection.execute(copy_into)
        result.fetchall()  # always call fetchall() to ensure the cursor executes to completion

        # A more involved example with a column selection clause that can be
        # altered to perform operations on the columns during import.

        copy_into = CopyIntoTable(
            target=t2,
            from_=FileColumnClause(
                columns=', '.join([
                    f'${index + 1}'
                    for index, column in enumerate(t2.columns)
                ]),
                from_=GoogleCloudStorage(
                    uri='gcs://bucket-name/path/to/file',
                    credentials=base64.b64encode(gcs_private_key.encode()).decode(),
                )
            ),
            pattern='*.*',
            file_format=CSVFormat(
                record_delimiter='\n',
                field_delimiter=',',
                quote='"',
                escape='',
                skip_header=1,
                empty_field_as='NULL',
                compression=Compression.AUTO,
            ),
            options=CopyIntoTableOptions(
                force=True,
            )
        )
        result = connection.execute(copy_into)
        result.fetchall()  # always call fetchall() to ensure the cursor executes to completion
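
To inspect the `COPY INTO` statement that will be sent to Databend without executing it, the expression can be compiled like any other SQLAlchemy clause. A minimal sketch, assuming `copy_into` is one of the expressions built above::

        # Render the statement with the engine's Databend dialect
        print(copy_into.compile(bind=engine))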

The CopyIntoLocation command can be used as shown below::

        import base64

        from sqlalchemy.orm import sessionmaker
        from sqlalchemy import MetaData, create_engine, select
        from databend_sqlalchemy import (
            CopyIntoLocation, GoogleCloudStorage, ParquetFormat, CopyIntoLocationOptions,
        )

        engine = create_engine(db.url, echo=False)
        session = sessionmaker(bind=engine)()
        connection = engine.connect()

        meta = MetaData()
        meta.reflect(bind=session.bind)
        t1 = meta.tables['t1']
        gcs_private_key = 'full_gcs_json_private_key'

        copy_into = CopyIntoLocation(
            target=GoogleCloudStorage(
                uri='gcs://bucket-name/path/to/target_file',
                credentials=base64.b64encode(gcs_private_key.encode()).decode(),
            ),
            from_=select(t1).where(t1.c['col1'] == 1),
            file_format=ParquetFormat(),
            options=CopyIntoLocationOptions(
                single=True,
                overwrite=True,
                include_query_id=False,
                use_raw_path=True,
            )
        )
        result = connection.execute(copy_into)
        result.fetchall()  # always call fetchall() to ensure the cursor executes to completion

Table Options
---------------------

Databend SQLAlchemy supports Databend-specific table options for Engine, Cluster Keys and Transient tables.

The table options can be used as shown below::

        from sqlalchemy import Table, Column, Integer, String, cast
        from sqlalchemy import MetaData, create_engine

        engine = create_engine(db.url, echo=False)

        meta = MetaData()
        # Example of Transient Table
        t_transient = Table(
            "t_transient",
            meta,
            Column("c1", Integer),
            databend_transient=True,
        )

        # Example of Engine
        t_engine = Table(
            "t_engine",
            meta,
            Column("c1", Integer),
            databend_engine='Memory',
        )

        # Examples of Table with Cluster Keys
        c1 = Column("c1", Integer)
        t_cluster_1 = Table(
            "t_cluster_1",
            meta,
            c1,
            databend_cluster_by=[c1],
        )
        #
        c = Column("id", Integer)
        c2 = Column("Name", String)
        t_cluster_2 = Table(
            't_cluster_2',
            meta,
            c,
            c2,
            databend_cluster_by=[cast(c, String), c2],
        )

        meta.create_all(engine)
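
To inspect the DDL these options generate, SQLAlchemy's standard `CreateTable` construct can render it against the engine's dialect. A minimal sketch using the tables defined above::

        from sqlalchemy.schema import CreateTable

        # Render CREATE TABLE statements without executing them; the
        # transient table is expected to render as a
        # CREATE TRANSIENT TABLE statement.
        print(CreateTable(t_transient).compile(bind=engine))
        print(CreateTable(t_engine).compile(bind=engine))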



Compatibility
---------------

- If your Databend version is v0.9.0 or later, you need databend-sqlalchemy v0.1.0 or later.
- databend-sqlalchemy versions earlier than v0.4.0 use [databend-py](https://github.com/databendlabs/databend-py) as the internal driver; versions v0.4.0 and later use the [databend driver python binding](https://github.com/databendlabs/bendsql/blob/main/bindings/python/README.md) instead. The only difference between the two is the set of connection parameters accepted in the DSN, so refer to the connection parameters documented by the driver that matches your version.

            
