# beam-postgres

[data:image/s3,"s3://crabby-images/edcf0/edcf0ee34c5d055509cc152d8e623c436643db6a" alt="PyPI"][pypi-project]
[data:image/s3,"s3://crabby-images/4e4ab/4e4abf729b5b0661016774be0873a919326442d3" alt="Supported Versions"][pypi-project]

Light IO transforms for Postgres read/write in Apache Beam pipelines.
## Goal
The project aims to provide highly performant and customizable transforms and is
not intended to support many different SQL database engines.
## Features
- `ReadAllFromPostgres`, `ReadFromPostgres` and `WriteToPostgres` transforms
- Records can be mapped to tuples, dictionaries, or dataclasses
- Reads and writes are performed in configurable batches
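
In the read example under Usage, the third argument to `ReadAllFromPostgres` is a psycopg row factory (`dict_row`). psycopg also provides `tuple_row` and `class_row(cls)`, so, as an assumption, passing one of those should select tuples or dataclass instances instead. The plain-Python sketch below only illustrates what each mapping produces for one result row so it runs without a database; it is not psycopg's or beam-postgres's implementation, and `SourceRecord` and the sample row are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Dict, Sequence, Tuple


@dataclass
class SourceRecord:
    """Hypothetical dataclass whose fields match `select id, data from source`."""

    id: int
    data: str


def to_tuple(columns: Sequence[str], values: Sequence[Any]) -> Tuple[Any, ...]:
    # Like psycopg's tuple_row: keep the values in column order.
    return tuple(values)


def to_dict(columns: Sequence[str], values: Sequence[Any]) -> Dict[str, Any]:
    # Like psycopg's dict_row: key each value by its column name.
    return dict(zip(columns, values))


def to_dataclass(columns: Sequence[str], values: Sequence[Any]) -> SourceRecord:
    # Like psycopg's class_row(SourceRecord): pass each column as a keyword argument.
    return SourceRecord(**to_dict(columns, values))


columns = ("id", "data")  # column names, as a cursor would report them
values = (1, "example1")  # one hypothetical result row

print(to_tuple(columns, values))
print(to_dict(columns, values))
print(to_dataclass(columns, values))
```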
## Usage
Reading records from a database table and printing them:
```python
import apache_beam as beam
from psycopg.rows import dict_row

from beam_postgres.io import ReadAllFromPostgres

with beam.Pipeline() as p:
    data = p | "Reading example records from database" >> ReadAllFromPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",  # libpq connection string
        "select id, data from source",  # query to read records with
        dict_row,  # psycopg row factory: each record becomes a dict
    )
    data | "Writing to stdout" >> beam.Map(print)
```
Writing records to a database table:
```python
from dataclasses import dataclass

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

from beam_postgres.io import WriteToPostgres


@dataclass
class Example:
    data: str


with beam.Pipeline(options=PipelineOptions()) as p:
    data = p | "Reading example records" >> beam.Create(
        [
            Example("example1"),
            Example("example2"),
        ]
    )
    data | "Writing example records to database" >> WriteToPostgres(
        "host=localhost dbname=examples user=postgres password=postgres",
        "insert into sink (data) values (%(data)s)",
    )
```
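
The `%(data)s` placeholder is a psycopg named parameter, so each record has to be turned into a mapping with a `data` key before the insert runs. Below is a minimal sketch of that conversion combined with batching, assuming records are converted with `dataclasses.asdict` and grouped by a hypothetical `parameter_batches` helper; neither is confirmed as what `WriteToPostgres` does internally. Each yielded batch has the shape psycopg's `cursor.executemany(query, params_seq)` accepts for a query with named placeholders.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict, Iterable, Iterator, List


@dataclass
class Example:
    data: str


def parameter_batches(records: Iterable[Any], batch_size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield lists of at most `batch_size` parameter dicts (hypothetical helper)."""
    batch: List[Dict[str, Any]] = []
    for record in records:
        # asdict() turns Example("example1") into {"data": "example1"},
        # which is what fills the %(data)s placeholder.
        batch.append(asdict(record))
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Flush the final, possibly smaller, batch.
        yield batch


records = [Example("example1"), Example("example2"), Example("example3")]
for batch in parameter_batches(records, batch_size=2):
    print(batch)
```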
More examples are available in the [examples directory][examples].
### Reading in batches
If a table holds more data than fits into memory, read it in batches instead.
[This example](examples/read.py#L11) reads records in batches of 1.
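
The general pattern behind batched reading is to execute the query once and pull rows with the DB-API `fetchmany(size)` method until it returns an empty list. The sketch below uses a hypothetical `read_in_batches` helper (not part of beam-postgres) and runs against SQLite's in-memory database in place of Postgres, so it needs no server. Note that with psycopg a regular client-side cursor transfers the whole result set to the client on `execute`; bounding memory there also needs a server-side cursor, created with `conn.cursor(name=...)`. How beam-postgres handles this internally is not covered here.

```python
import sqlite3
from typing import Any, Iterator, List


def read_in_batches(cursor: Any, query: str, batch_size: int) -> Iterator[List[Any]]:
    """Yield lists of at most `batch_size` rows from a DB-API 2.0 cursor."""
    cursor.execute(query)
    while True:
        # fetchmany() returns an empty list once the result set is exhausted.
        rows = cursor.fetchmany(batch_size)
        if not rows:
            break
        yield rows


# An in-memory SQLite database stands in for Postgres so the sketch runs anywhere.
conn = sqlite3.connect(":memory:")
conn.execute("create table source (id integer primary key, data text)")
conn.executemany(
    "insert into source (data) values (?)",
    [("example1",), ("example2",), ("example3",)],
)

batches = list(read_in_batches(conn.cursor(), "select id, data from source order by id", 2))
for batch in batches:
    print(batch)
```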

[pypi-project]: https://pypi.org/project/beam-postgres
[examples]: https://github.com/medzin/beam-postgres/tree/main/examples