# c8-source-postgres
[![PyPI version](https://badge.fury.io/py/c8-source-postgres.svg)](https://badge.fury.io/py/c8-source-postgres)
[![PyPI - Python Version](https://img.shields.io/pypi/pyversions/c8-source-postgres.svg)](https://pypi.org/project/c8-source-postgres/)
[![License: GPL v3](https://img.shields.io/badge/License-GPLv3-yellow.svg)](https://opensource.org/licenses/GPL-3.0)
A [Singer](https://www.singer.io/) tap that extracts data from a [PostgreSQL](https://www.postgresql.org/) database and
produces JSON-formatted data following
the [Singer spec](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md).
## How to use it
The recommended method of running this tap is to use it with Macrometa data connectors.
If you want to run this [Singer Tap](https://singer.io) independently, read on.
### Install and Run
First, make sure Python 3 is installed on your system, or follow the
installation instructions for [Mac](http://docs.python-guide.org/en/latest/starting/install3/osx/) or
[Ubuntu](https://www.digitalocean.com/community/tutorials/how-to-install-python-3-and-set-up-a-local-programming-environment-on-ubuntu-16-04).
It's recommended to use a virtualenv:
```bash
python3 -m venv venv
source venv/bin/activate
pip install c8-source-postgres
```
or, from a checkout of the repository (which provides the `Makefile`):
```bash
make venv
```
### Create a config.json
```
{
"host": "localhost",
"port": 5432,
"user": "postgres",
"password": "secret",
"dbname": "db"
}
```
These are the same basic configuration properties used by the PostgreSQL command-line client (`psql`).
Full list of options in `config.json`:
| Property | Type | Required? | Description |
|-------------------------------------|---------|------------|---------------------------------------------------------------|
| host | String | Yes | PostgreSQL host |
| port | Integer | Yes | PostgreSQL port |
| user | String | Yes | PostgreSQL user |
| password | String | Yes | PostgreSQL password |
| dbname | String | Yes | PostgreSQL database name |
| filter_schemas | String | No | Comma-separated list of schema names. Only these schemas are scanned, which improves the performance of data extraction. (Default: None) |
| ssl | String | No | If set to `"true"`, use SSL via the postgres sslmode `require` option. If the server does not accept SSL connections or the client certificate is not recognized, the connection will fail. (Default: None) |
| logical_poll_total_seconds | Integer | No | Stop running the tap when no data has been received from the WAL for this number of seconds. (Default: 10800) |
| break_at_end_lsn | Boolean | No | Stop running the tap if the newly received LSN is after the max LSN that was detected when the tap started. (Default: true) |
| max_run_seconds | Integer | No | Stop running the tap after this number of seconds. (Default: 43200) |
| debug_lsn | String | No | If set to `"true"`, add the `_sdc_lsn` property to the Singer messages to debug the postgres LSN position in the WAL stream. (Default: None) |
| tap_id | String | No | ID of the pipeline/tap (Default: None) |
| itersize | Integer | No | Size of the PG cursor iterator when doing `INCREMENTAL` or `FULL_TABLE` replication (Default: 20000) |
| default_replication_method | String | No | Default replication method to use when none is provided in the catalog (Values: `LOG_BASED`, `INCREMENTAL` or `FULL_TABLE`) (Default: None) |
| use_secondary | Boolean | No | Use a database replica for `INCREMENTAL` and `FULL_TABLE` replication (Default: False) |
| secondary_host | String | No | PostgreSQL Replica host (required if `use_secondary` is `True`) |
| secondary_port | Integer | No | PostgreSQL Replica port (required if `use_secondary` is `True`) |
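
The "Required?" column above can be checked before the tap is started. The following is a minimal sketch of such a check, not part of the tap itself: the helper name `missing_config_keys` is hypothetical, and the key lists are copied from the table.

```python
# Keys marked "Yes" in the "Required?" column of the table above.
REQUIRED_KEYS = ("host", "port", "user", "password", "dbname")
# Keys the table describes as required only when `use_secondary` is true.
SECONDARY_KEYS = ("secondary_host", "secondary_port")


def missing_config_keys(config: dict) -> list:
    """Return the names of required keys that are absent from `config`.

    Hypothetical helper for illustration; the tap performs its own validation.
    """
    missing = [key for key in REQUIRED_KEYS if key not in config]
    if config.get("use_secondary"):
        missing += [key for key in SECONDARY_KEYS if key not in config]
    return missing
```

Loading `config.json` with `json.load` and passing the result to `missing_config_keys` should return an empty list for the sample configuration shown earlier.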
### Run the tap in Discovery Mode
```
c8-source-postgres --config config.json --discover # Should dump a Catalog to stdout
c8-source-postgres --config config.json --discover > catalog.json # Capture the Catalog
```
### Add Metadata to the Catalog
Each entry under the Catalog's `"streams"` key will need the following metadata:
```
{
  "streams": [
    {
      "stream_name": "my_topic",
      "metadata": [{
        "breadcrumb": [],
        "metadata": {
          "selected": true,
          "replication-method": "LOG_BASED"
        }
      }]
    }
  ]
}
```
The replication method can be one of `FULL_TABLE`, `INCREMENTAL` or `LOG_BASED`.
**Note**: Log-based replication requires a few adjustments in the source postgres database. See the
log-based replication requirements later in this document for more information.
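
Editing the metadata by hand becomes tedious when the catalog contains many streams. The sketch below shows one way to apply the metadata described above to every stream in a discovered catalog. The function name `select_streams` is hypothetical, and the sketch assumes the catalog layout shown in the example (a `streams` list whose stream-level metadata entry has an empty `breadcrumb`).

```python
ALLOWED_METHODS = {"FULL_TABLE", "INCREMENTAL", "LOG_BASED"}


def select_streams(catalog: dict, replication_method: str = "LOG_BASED") -> dict:
    """Mark every stream in `catalog` as selected with `replication_method`.

    Hypothetical helper; modifies and returns the same dictionary.
    """
    if replication_method not in ALLOWED_METHODS:
        raise ValueError(f"unknown replication method: {replication_method}")
    for stream in catalog.get("streams", []):
        entries = stream.setdefault("metadata", [])
        # The stream-level entry is the one with an empty breadcrumb.
        top = next((e for e in entries if e.get("breadcrumb") == []), None)
        if top is None:
            top = {"breadcrumb": [], "metadata": {}}
            entries.append(top)
        top.setdefault("metadata", {}).update(
            {"selected": True, "replication-method": replication_method}
        )
    return catalog
```

A typical use would be to read `catalog.json` with `json.load`, pass the result through `select_streams`, and write it back with `json.dump` before running the tap in sync mode.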
### Run the tap in Sync Mode
```
c8-source-postgres --config config.json --catalog catalog.json
```
The tap will write bookmarks to stdout which can be captured and passed as an optional `--state state.json` parameter
to the tap for the next sync.
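
Under the Singer spec, bookmarks are emitted as `STATE` messages, one JSON object per line, with the bookmark payload under the `value` key. A minimal sketch of capturing the most recent one from saved tap output follows. The function name `last_state` is hypothetical, and it assumes the tap's stdout was redirected to a file.

```python
import json
from typing import Iterable, Optional


def last_state(lines: Iterable[str]) -> Optional[dict]:
    """Return the `value` of the last STATE message found in `lines`, if any."""
    state = None
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            message = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a JSON message
        if isinstance(message, dict) and message.get("type") == "STATE":
            state = message.get("value")
    return state
```

Calling `last_state` on the open output file and writing the result to `state.json` with `json.dump` gives a file that can be passed via `--state state.json` on the next run.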
### Log Based replication requirements
* A PostgreSQL database running **PostgreSQL version 9.4.x or greater**. To avoid a critical PostgreSQL bug,
use at least the following minor version of your release series:
- PostgreSQL 12.0
- PostgreSQL 11.2
- PostgreSQL 10.7
- PostgreSQL 9.6.12
- PostgreSQL 9.5.16
- PostgreSQL 9.4.21
* **A connection to the master instance**. Log-based replication will only work by connecting to the master instance.
* **wal2json plugin**: To use log-based replication for your PostgreSQL integration, you must install the wal2json plugin.
The wal2json plugin outputs JSON objects for logical decoding, which the tap then uses to perform log-based
replication.
Steps for installing the plugin vary depending on your operating system. Instructions for each operating system type
are in the wal2json’s GitHub repository:
* [Unix-based operating systems](https://github.com/eulerto/wal2json#unix-based-operating-systems)
* [Windows](https://github.com/eulerto/wal2json#windows)
* **postgres config file**: Locate the database configuration file (usually `postgresql.conf`) and define
the parameters as follows:
```
wal_level=logical
max_replication_slots=5
max_wal_senders=5
```
Restart your PostgreSQL service to ensure the changes take effect.
**Note**: For `max_replication_slots` and `max_wal_senders`, we’re defaulting to a value of 5.
This should be sufficient unless you have a large number of read replicas connected to the master instance.
* **Existing replication slot**: Log based replication requires a dedicated logical replication slot.
In PostgreSQL, a logical replication slot represents a stream of database changes that can then be replayed to a
client in the order they were made on the original server. Each slot streams a sequence of changes from a single
database.
Log in to the master instance as a superuser and, using the `wal2json` plugin, create a logical replication slot:
```
SELECT *
FROM pg_create_logical_replication_slot('macrometa_<database_name>', 'wal2json');
```
**Note**: Replication slots are specific to a given database in a cluster. If you want to connect multiple
databases - whether in one integration or several - you’ll need to create a replication slot for each database.
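
Two of the requirements above lend themselves to a quick scripted check: the minimum minor version per release series, and the replication slot statement. The sketch below is illustrative only. The function names are hypothetical, the version string is assumed to be a plain dotted number such as `11.4`, and releases newer than 12 are assumed to already include the fix.

```python
import re

# Minimum release per series, copied from the list above. Since PostgreSQL 10
# the series is the first number; before that it is the first two numbers.
MINIMUM_VERSIONS = {
    (12,): (12, 0),
    (11,): (11, 2),
    (10,): (10, 7),
    (9, 6): (9, 6, 12),
    (9, 5): (9, 5, 16),
    (9, 4): (9, 4, 21),
}


def meets_minimum_version(version: str) -> bool:
    """Check a version such as "11.4" or "9.6.10" against the list above."""
    parts = tuple(int(p) for p in version.split("."))
    series = parts[:1] if parts[0] >= 10 else parts[:2]
    if series > (12,):
        return True  # assumption: releases newer than 12 contain the fix
    minimum = MINIMUM_VERSIONS.get(series)
    if minimum is None:
        return False  # older than 9.4, or an unrecognised series
    # Pad short versions ("12" becomes (12, 0)) before comparing tuples.
    padded = parts + (0,) * (len(minimum) - len(parts))
    return padded >= minimum


def create_slot_sql(database_name: str) -> str:
    """Build the slot-creation statement shown above for `database_name`."""
    slot = f"macrometa_{database_name}"
    # PostgreSQL slot names may contain only lower-case letters, digits and
    # underscores; this sketch rejects other names instead of rewriting them.
    if not re.fullmatch(r"[a-z0-9_]+", slot):
        raise ValueError(f"invalid replication slot name: {slot}")
    return f"SELECT * FROM pg_create_logical_replication_slot('{slot}', 'wal2json');"
```

The statement returned by `create_slot_sql` still has to be run by a superuser on the master instance, once per database.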
### To run tests:
1. Install python test dependencies in a virtual env:
```
make venv
```
2. You need to have a postgres database to run the tests and export its credentials:
```
export C8_SOURCE_POSTGRES_HOST=<postgres-host>
export C8_SOURCE_POSTGRES_PORT=<postgres-port>
export C8_SOURCE_POSTGRES_SECONDARY_HOST=<postgres-replica-host>
export C8_SOURCE_POSTGRES_SECONDARY_PORT=<postgres-replica-port>
export C8_SOURCE_POSTGRES_USER=<postgres-user>
export C8_SOURCE_POSTGRES_PASSWORD=<postgres-password>
```
You can use the local docker-compose setup to spin up a test database by running `make start_db`.
Test objects will be created in the `postgres` database.
3. To run the tests:
```
make test
```
### To run pylint:
1. Install python dependencies and run python linter
```
make venv
make pylint
```