# beam-postgres
[![PyPI](https://img.shields.io/pypi/v/beam-postgres.svg)][pypi-project]
[![Supported Versions](https://img.shields.io/pypi/pyversions/beam-postgres.svg)][pypi-project]
Light IO transforms for Postgres read/write in Apache Beam pipelines.
## Goal
The project aims to provide highly performant and customizable transforms and is
not intended to support many different SQL database engines.
## Features
- `ReadAllFromPostgres`, `ReadFromPostgres` and `WriteToPostgres` transforms
- Records can be mapped to tuples, dictionaries or dataclasses
- Reads and writes are performed in configurable batches
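
To make the three record shapes concrete, here is a minimal plain-Python sketch of one row expressed as a tuple, a dictionary, and a dataclass. It does not use beam-postgres; the column names and the `Example` class are assumptions chosen only for illustration.

```python
from dataclasses import dataclass


@dataclass
class Example:
    # Hypothetical record type; field names mirror the assumed column names.
    id: int
    data: str


# Assumed column names and one row as a driver would return it by default.
columns = ("id", "data")
row = (1, "example1")

as_tuple = row                       # positional access: as_tuple[1]
as_dict = dict(zip(columns, row))    # access by column name: as_dict["data"]
as_dataclass = Example(**as_dict)    # attribute access: as_dataclass.data
```

The tuple form is the most compact, while the dictionary and dataclass forms make downstream transforms easier to read because fields are addressed by name.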
## Usage
Printing data from the database table:
```python
import apache_beam as beam
from psycopg.rows import dict_row

from beam_postgres.io import ReadAllFromPostgres

with beam.Pipeline() as p:
    # dict_row makes each fetched record a dictionary keyed by column name.
    data = p | "Reading example records from database" >> ReadAllFromPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        "select id, data from source",
        dict_row,
    )
    data | "Writing to stdout" >> beam.Map(print)
```
Writing data to the database table:
```python
from dataclasses import dataclass

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_postgres.io import WriteToPostgres


@dataclass
class Example:
    data: str


with beam.Pipeline(options=PipelineOptions()) as p:
    data = p | "Reading example records" >> beam.Create(
        [
            Example("example1"),
            Example("example2"),
        ]
    )
    data | "Writing example records to database" >> WriteToPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        "insert into sink (data) values (%s)",
    )
```
See the [examples directory][examples] for more examples.
### Reading in batches
When a table holds more data than fits into memory, read it in batches instead
of loading everything at once. See the [example code](examples/read.py#L11),
which reads records in batches of 1.
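
The general idea behind batched reads can be sketched as follows. This is not the library's implementation: `iter_batches` and `FakeCursor` are hypothetical names introduced here, and the only assumption about a real cursor is that it offers the DB-API `fetchmany(size)` method, as psycopg cursors do.

```python
from typing import Any, Iterator, List


def iter_batches(cursor: Any, batch_size: int) -> Iterator[List[Any]]:
    """Yield lists of at most batch_size rows until the cursor is exhausted."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    while True:
        # Only one batch is held in memory at a time.
        rows = cursor.fetchmany(batch_size)
        if not rows:
            return
        yield list(rows)


class FakeCursor:
    """Stand-in for a database cursor so the sketch runs without a database."""

    def __init__(self, rows):
        self._rows = list(rows)

    def fetchmany(self, size):
        # Hand out the next `size` rows and keep the remainder for later calls.
        batch, self._rows = self._rows[:size], self._rows[size:]
        return batch


batches = list(iter_batches(FakeCursor(range(5)), batch_size=2))
```

A smaller batch size lowers peak memory use at the cost of more round trips to the database, so the right value depends on row size and network latency.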
[pypi-project]: https://pypi.org/project/beam-postgres
[examples]: https://github.com/medzin/beam-postgres/tree/main/examples