beam-postgres


Name: beam-postgres
Version: 0.5.0
Summary: Light IO transforms for Postgres read/write in Apache Beam pipelines.
Upload time: 2024-07-20 23:05:03
Author: Adam Medziński
Requires Python: >=3.8
License: Apache-2.0
Keywords: apache beam, beam, postgres, postgresql

# beam-postgres

[![PyPI](https://img.shields.io/pypi/v/beam-postgres.svg)][pypi-project]
[![Supported Versions](https://img.shields.io/pypi/pyversions/beam-postgres.svg)][pypi-project]

Light IO transforms for Postgres read/write in Apache Beam pipelines.

## Goal

The project aims to provide highly performant, customizable transforms for
Postgres only; supporting many different SQL database engines is not a goal.

## Features

- `ReadAllFromPostgres`, `ReadFromPostgres` and `WriteToPostgres` transforms
- Records can be mapped to tuples, dictionaries or dataclasses
- Reads and writes are performed in configurable batches

## Usage

Printing data from the database table:

```python
import apache_beam as beam
from psycopg.rows import dict_row

from beam_postgres.io import ReadAllFromPostgres

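# The first argument is a libpq connection string; adjust it to your database.
with beam.Pipeline() as p:
    data = p | "Reading example records from database" >> ReadAllFromPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        "select id, data from source",
        dict_row,  # psycopg row factory: each record becomes a dict
    )
    data | "Writing to stdout" >> beam.Map(print)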
```

Writing data to the database table:

```python
from dataclasses import dataclass

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_postgres.io import WriteToPostgres


@dataclass
class Example:
    data: str


with beam.Pipeline(options=PipelineOptions()) as p:
    data = p | "Reading example records" >> beam.Create(
        [
            Example("example1"),
            Example("example2"),
        ]
    )
    data | "Writing example records to database" >> WriteToPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        # The named placeholder %(data)s matches the `data` field of Example.
        "insert into sink (data) values (%(data)s)",
    )
```
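The `%(data)s` placeholder in the insert statement above is a psycopg named placeholder, which is filled from a mapping keyed by name. As a rough illustration of why a dataclass with a `data` field lines up with that placeholder, the sketch below converts an `Example` instance into such a mapping using only the standard library. Whether `WriteToPostgres` performs this exact conversion internally is an assumption, not something the README states.

```python
from dataclasses import asdict, dataclass


@dataclass
class Example:
    data: str


# asdict() turns a dataclass instance into a plain dict keyed by field name.
# A mapping of this shape is what a named placeholder such as %(data)s expects.
# Assumption: the transform derives its parameters in a comparable way.
params = asdict(Example("example1"))
print(params)
```

If the field names of the dataclass and the placeholder names in the statement do not match, expect the write to fail, so it is worth keeping the two in sync.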

More examples are available in the [examples directory][examples].

### Reading in batches

When a table holds more data than fits into memory, read it in batches instead
of loading everything at once. See [examples/read.py](examples/read.py#L11) for
example code (it reads records in batches of 1).
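To make the idea of batched reading concrete, here is a minimal sketch of the underlying pattern at the database cursor level. It is not the beam-postgres implementation and does not use its API: it uses the standard-library `sqlite3` module as a stand-in for Postgres so that it runs without a server, and the helper name `read_in_batches` is hypothetical. The `fetchmany` call is part of the Python DB-API, which psycopg cursors also provide.

```python
import sqlite3
from typing import Iterator, List, Tuple


def read_in_batches(
    cursor: sqlite3.Cursor, batch_size: int
) -> Iterator[List[Tuple]]:
    """Yield lists of at most `batch_size` rows until the cursor is exhausted."""
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        yield batch


# In-memory SQLite table standing in for a Postgres `source` table.
conn = sqlite3.connect(":memory:")
conn.execute("create table source (id integer primary key, data text)")
conn.executemany(
    "insert into source (data) values (?)",
    [(f"example{i}",) for i in range(1, 6)],
)

cursor = conn.execute("select id, data from source order by id")
batches = list(read_in_batches(cursor, batch_size=2))
conn.close()

for batch in batches:
    print(batch)
```

Only one batch of rows is materialized per `fetchmany` call, which is what bounds memory use; the sketch collects all batches into a list purely so they can be printed after the connection is closed.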

[pypi-project]: https://pypi.org/project/beam-postgres
[examples]: https://github.com/medzin/beam-postgres/tree/main/examples

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "beam-postgres",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "apache beam, beam, postgres, postgresql",
    "author": "Adam Medzi\u0144ski",
    "author_email": null,
    "download_url": "https://files.pythonhosted.org/packages/4f/19/1527716ebffe14a84b55e90c50a4980103623d02489a1d3eca082de2c188/beam_postgres-0.5.0.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": "Light IO transforms for Postgres read/write in Apache Beam pipelines.",
    "version": "0.5.0",
    "project_urls": {
        "Homepage": "https://github.com/medzin/beam-postgres"
    },
    "split_keywords": [
        "apache beam",
        " beam",
        " postgres",
        " postgresql"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "017858b3f2add8c7a39f9dcedc4963a40b950541e3089dbd25e4c669eff8bc77",
                "md5": "2cf36b8af8b5304f519288c8d0ea019d",
                "sha256": "9d7a17a6b3b283ce5a2b5d5ec4c2f779b0bdd9ab8961299fb483dd67985fc36f"
            },
            "downloads": -1,
            "filename": "beam_postgres-0.5.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "2cf36b8af8b5304f519288c8d0ea019d",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 10552,
            "upload_time": "2024-07-20T23:05:02",
            "upload_time_iso_8601": "2024-07-20T23:05:02.233503Z",
            "url": "https://files.pythonhosted.org/packages/01/78/58b3f2add8c7a39f9dcedc4963a40b950541e3089dbd25e4c669eff8bc77/beam_postgres-0.5.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "4f191527716ebffe14a84b55e90c50a4980103623d02489a1d3eca082de2c188",
                "md5": "8c1ab88a6d4237b172e941baa7bef36b",
                "sha256": "d8172b39a3670046864ba1fd7ff10e18c232dc3816054a5ec22c85e23a9e01e1"
            },
            "downloads": -1,
            "filename": "beam_postgres-0.5.0.tar.gz",
            "has_sig": false,
            "md5_digest": "8c1ab88a6d4237b172e941baa7bef36b",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 10534,
            "upload_time": "2024-07-20T23:05:03",
            "upload_time_iso_8601": "2024-07-20T23:05:03.789719Z",
            "url": "https://files.pythonhosted.org/packages/4f/19/1527716ebffe14a84b55e90c50a4980103623d02489a1d3eca082de2c188/beam_postgres-0.5.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-07-20 23:05:03",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "medzin",
    "github_project": "beam-postgres",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "tox": true,
    "lcname": "beam-postgres"
}
        