beam-postgres


- Name: beam-postgres
- Version: 0.4.1
- Summary: Light IO transforms for Postgres read/write in Apache Beam pipelines.
- Upload time: 2024-04-04 14:26:58
- Author: Adam Medziński
- Requires Python: >=3.8
- License: Apache-2.0
- Keywords: apache beam, beam, postgres, postgresql

# beam-postgres

[![PyPI](https://img.shields.io/pypi/v/beam-postgres.svg)][pypi-project]
[![Supported Versions](https://img.shields.io/pypi/pyversions/beam-postgres.svg)][pypi-project]

Light IO transforms for Postgres read/write in Apache Beam pipelines.

## Goal

The project aims to provide highly performant and customizable transforms and is
not intended to support many different SQL database engines.

## Features

- `ReadAllFromPostgres`, `ReadFromPostgres` and `WriteToPostgres` transforms
- Records can be mapped to tuples, dictionaries or dataclasses
- Reads and writes are in configurable batches
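
To make the three record shapes concrete, here is a minimal plain-Python sketch of how one result row could be presented as a tuple, a dictionary or a dataclass. It is an illustration only, not the library's implementation: the helper names (`as_tuple`, `as_dict`, `as_dataclass`), the column names and the sample row are all hypothetical. In the usage example in this README, this role is played by a psycopg row factory (`dict_row`).

```python
from dataclasses import dataclass

# Hypothetical column names and row, standing in for a database result.
COLUMNS = ("id", "data")
ROW = (1, "example1")


@dataclass
class Example:
    id: int
    data: str


def as_tuple(columns, row):
    """Keep the row as a plain tuple (positional access)."""
    return tuple(row)


def as_dict(columns, row):
    """Key each value by its column name."""
    return dict(zip(columns, row))


def as_dataclass(cls, columns, row):
    """Build a dataclass instance from column-name keyword arguments."""
    return cls(**dict(zip(columns, row)))


print(as_tuple(COLUMNS, ROW))
print(as_dict(COLUMNS, ROW))
print(as_dataclass(Example, COLUMNS, ROW))
```

Tuples are the lightest option, dictionaries are self-describing, and dataclasses add field names and type hints that downstream transforms can rely on.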

## Usage

Printing data from the database table:

```python
import apache_beam as beam
from psycopg.rows import dict_row

from beam_postgres.io import ReadAllFromPostgres

with beam.Pipeline() as p:
    data = p | "Reading example records from database" >> ReadAllFromPostgres(
        # Connection string in libpq key=value format
        "host=localhost dbname=examples user=postgres password=postgres",
        # Query whose result rows become the elements of the PCollection
        "select id, data from source",
        # psycopg row factory: each row is returned as a dict keyed by column name
        dict_row,
    )
    data | "Writing to stdout" >> beam.Map(print)
```

Writing data to the database table:

```python
from dataclasses import dataclass

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_postgres.io import WriteToPostgres


# Record type for the rows written to the sink table
@dataclass
class Example:
    data: str


with beam.Pipeline(options=PipelineOptions()) as p:
    # Build an in-memory PCollection of records to write
    data = p | "Reading example records" >> beam.Create(
        [
            Example("example1"),
            Example("example2"),
        ]
    )
    data | "Writing example records to database" >> WriteToPostgres(
        # Connection string in libpq key=value format
        "host=localhost dbname=examples user=postgres password=postgres",
        # Insert statement with one %s placeholder for the `data` field
        "insert into sink (data) values (%s)",
    )
```
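
The write example pairs a dataclass with a single-placeholder insert statement. The sketch below illustrates the general idea of turning dataclass records into positional parameters and sending them in fixed-size batches. It is an assumption-laden illustration, not how `WriteToPostgres` works internally: it uses the standard-library `sqlite3` module as a stand-in for Postgres so that it runs without a server, and the `batched` helper and the batch size of 2 are hypothetical.

```python
import sqlite3
from dataclasses import astuple, dataclass
from itertools import islice


@dataclass
class Example:
    data: str


def batched(items, size):
    """Yield lists of at most `size` items from any iterable."""
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


records = [Example(f"example{i}") for i in range(1, 6)]

# sqlite3 (standard library) stands in for Postgres here; it uses `?`
# placeholders where psycopg uses `%s`.
conn = sqlite3.connect(":memory:")
conn.execute("create table sink (data text)")

batch_sizes = []
for batch in batched(records, 2):
    # astuple turns Example("example1") into ("example1",), which matches
    # the single placeholder in the statement.
    params = [astuple(record) for record in batch]
    conn.executemany("insert into sink (data) values (?)", params)
    conn.commit()  # one commit per batch
    batch_sizes.append(len(batch))

stored = [row[0] for row in conn.execute("select data from sink order by rowid")]
print(batch_sizes)
print(stored)
```

Larger batches mean fewer round trips to the database, at the cost of holding more records in memory at once.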

See the [examples directory][examples] for more examples.

### Reading in batches

When a table holds more data than fits in memory, read it in batches. See the
example code [here](examples/read.py#L11), which reads records in batches of 1.
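
As a rough illustration of what batched reading means at the database-driver level, the sketch below pulls rows from a cursor a few at a time with the DB-API `fetchmany` call instead of loading the whole result at once. It is not taken from the linked example and does not show the transform's own parameters: `sqlite3` from the standard library stands in for Postgres, and the `read_in_batches` helper, the table contents and the batch size of 3 are hypothetical.

```python
import sqlite3


def read_in_batches(cursor, batch_size):
    """Yield lists of up to `batch_size` rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


# In-memory database with seven sample rows.
conn = sqlite3.connect(":memory:")
conn.execute("create table source (id integer primary key, data text)")
conn.executemany(
    "insert into source (id, data) values (?, ?)",
    [(i, f"example{i}") for i in range(1, 8)],
)

cursor = conn.execute("select id, data from source order by id")

batch_sizes = []
for batch in read_in_batches(cursor, 3):
    # Process each batch here; only one batch is held in memory at a time.
    batch_sizes.append(len(batch))

print(batch_sizes)
```

A batch size of 1, as in the linked example, minimizes memory use per step; larger sizes trade memory for fewer fetch calls.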

[pypi-project]: https://pypi.org/project/beam-postgres
[examples]: https://github.com/medzin/beam-postgres/tree/main/examples

            

Raw data

{
    "_id": null,
    "home_page": null,
    "name": "beam-postgres",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "apache beam, beam, postgres, postgresql",
    "author": "Adam Medzi\u0144ski",
    "author_email": null,
    "download_url": "https://files.pythonhosted.org/packages/f3/e7/97478823e7a5d0c7c65fc00eb11121cc12f4e23b1709554831b159818fdc/beam-postgres-0.4.1.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": "Light IO transforms for Postgres read/write in Apache Beam pipelines.",
    "version": "0.4.1",
    "project_urls": {
        "Homepage": "https://github.com/medzin/beam-postgres"
    },
    "split_keywords": [
        "apache beam",
        " beam",
        " postgres",
        " postgresql"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "a73da72457549c731ac9cc86761317a0c1b1a6bdb84c4e05ae1c44517c37bbed",
                "md5": "54937a5ca289e38f3f401cf8336a6d80",
                "sha256": "1d73cf35506548c665101c7f19c21ce0152ea795cedb1dc5392588a6ff2840e7"
            },
            "downloads": -1,
            "filename": "beam_postgres-0.4.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "54937a5ca289e38f3f401cf8336a6d80",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 10537,
            "upload_time": "2024-04-04T14:26:56",
            "upload_time_iso_8601": "2024-04-04T14:26:56.424231Z",
            "url": "https://files.pythonhosted.org/packages/a7/3d/a72457549c731ac9cc86761317a0c1b1a6bdb84c4e05ae1c44517c37bbed/beam_postgres-0.4.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "f3e797478823e7a5d0c7c65fc00eb11121cc12f4e23b1709554831b159818fdc",
                "md5": "a02d1e16f26b63491e9c17904bacac83",
                "sha256": "f5010985e98f864e7f34568bae4961cb5e063788322fa058c692ce1801923b5e"
            },
            "downloads": -1,
            "filename": "beam-postgres-0.4.1.tar.gz",
            "has_sig": false,
            "md5_digest": "a02d1e16f26b63491e9c17904bacac83",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 10618,
            "upload_time": "2024-04-04T14:26:58",
            "upload_time_iso_8601": "2024-04-04T14:26:58.307988Z",
            "url": "https://files.pythonhosted.org/packages/f3/e7/97478823e7a5d0c7c65fc00eb11121cc12f4e23b1709554831b159818fdc/beam-postgres-0.4.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-04-04 14:26:58",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "medzin",
    "github_project": "beam-postgres",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "tox": true,
    "lcname": "beam-postgres"
}
        