| Field | Value |
| --- | --- |
| Name | neo4j-etl-lib |
| Version | 0.0.2 |
| Summary | Building blocks for ETL pipelines. |
| Upload time | 2025-01-31 22:51:51 |
| Author | None |
| Maintainer | None |
| Home page | None |
| Docs URL | None |
| License | None |
| Requires Python | >=3.10 |
| Keywords | etl, graph, database |
| Requirements | none recorded |
| Travis-CI | none |
| Coveralls test coverage | none |
# Python ETL Toolbox
Complete documentation can be found at https://neo-technology-field.github.io/python-etl-lib/index.html
A library of building blocks for assembling ETL pipelines.
Instead of providing yet another ETL tool, the aim is to provide quality building blocks for the usual ETL tasks. These building blocks meet the following functional requirements:
* logging (of tasks performed including times, errors, and statistics)
* error handling
* validation of data (currently via Pydantic; see the sketch after this list)
* batching and streaming
* optional recording of information about performed tasks, with means (NeoDash, console) to review past ETL runs
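As a flavour of the validation point above, here is a minimal, self-contained sketch using plain Pydantic. The `Agency` model and the sample rows are illustrative only; the library wraps this pattern inside its tasks:
```python
# Illustrative only: a standalone Pydantic validation loop, not library code.
from pydantic import BaseModel, Field, ValidationError

class Agency(BaseModel):
    id: str = Field(alias="agency_id")
    name: str = Field(alias="agency_name")

rows = [
    {"agency_id": "1", "agency_name": "Metro"},
    {"agency_id": "2"},  # missing agency_name -> rejected
]

valid, invalid = [], []
for row in rows:
    try:
        valid.append(Agency.model_validate(row))
    except ValidationError as exc:
        # Keep the offending row together with the reason it failed.
        invalid.append({"row": row, "errors": exc.errors()})

print(len(valid), len(invalid))  # 1 valid row, 1 invalid row
```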
While this library currently focuses on Neo4j databases, it can be extended to other sources and sinks as needed.
It does not provide a CLI out of the box, but it contains a set of functions to list and manage past runs (if they are stored in a database). In addition, the provided example illustrates how to assemble an ETL pipeline and run it from a CLI, as sketched below.
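As a rough illustration of that pattern, the sketch below puts a pipeline behind a small `argparse` CLI. `run_pipeline` here is a hypothetical stand-in for the assembly code shown under Usage; it is not part of the library:
```python
# Hypothetical CLI wrapper; run_pipeline() is a placeholder, not a library function.
import argparse
from pathlib import Path

def run_pipeline(input_directory: Path) -> None:
    # Placeholder: assemble and execute the TaskGroups as shown in the
    # "Usage" section below; omitted here to keep the sketch self-contained.
    print(f"running pipeline over {input_directory}")

def main() -> None:
    parser = argparse.ArgumentParser(description="Run an ETL pipeline from the CLI.")
    parser.add_argument("input_directory", type=Path,
                        help="directory containing the input CSV files")
    args = parser.parse_args()
    run_pipeline(args.input_directory)

if __name__ == "__main__":
    main()
```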
## Quick guide
### Installation
The package is available on PyPI and can be installed (for development) via:
```bash
python3 -m venv venv
source venv/bin/activate
python -m pip install pip-tools
pip-compile --extra dev pyproject.toml
pip-sync
```
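For regular (non-development) use, the released package can be installed directly from PyPI:
```bash
pip install neo4j-etl-lib
```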
### Usage
The snippet below shows a minimal ETL pipeline that loads a single CSV file (see the GTFS example for more details):
```python
import os
from pathlib import Path

from pydantic import BaseModel, Field

# Imports of CSVLoad2Neo4jTasks, ETLContext, SchemaTask and TaskGroup from
# this library are omitted here, as in the original snippet.


class LoadAgenciesTask(CSVLoad2Neo4jTasks):

    class Agency(BaseModel):
        """Pydantic model used to validate each row of the CSV file."""
        id: str = Field(alias="agency_id", default="generic")
        name: str = Field(alias="agency_name")
        url: str = Field(alias="agency_url")
        timezone: str = Field(alias="agency_timezone")
        lang: str = Field(alias="agency_lang")

    def __init__(self, context: ETLContext, file: Path):
        super().__init__(context, LoadAgenciesTask.Agency, file)

    def task_name(self) -> str:
        return f"{self.__class__.__name__}('{self.file}')"

    def _query(self):
        """Cypher statement that loads one batch of validated rows into Neo4j."""
        return """UNWIND $batch AS row
            MERGE (a:Agency {id: row.id})
            SET a.name = row.name,
                a.url = row.url,
                a.timezone = row.timezone,
                a.lang = row.lang
            """

    @classmethod
    def file_name(cls):
        return "agency.txt"


context = ETLContext(env_vars=dict(os.environ))

schema = SchemaTask(context=context)
init_group = TaskGroup(context=context, tasks=[schema], name="schema-init")

input_directory = Path("gtfs")  # placeholder: directory holding the CSV files
tasks = [
    LoadAgenciesTask(context=context, file=input_directory / LoadAgenciesTask.file_name()),
]
csv_group = TaskGroup(context=context, tasks=tasks, name="csv-loading")

all_group = TaskGroup(context=context, tasks=[init_group, csv_group], name="main")

context.reporter.register_tasks(all_group)
all_group.execute()
```
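The `_query` above uses the batched `UNWIND $batch AS row` idiom. Outside the library, the same idiom can be tried with the plain Neo4j Python driver; the sketch below only illustrates the Cypher pattern, not the library's internals, and the connection URI and credentials are placeholders:
```python
# Illustration of the batched UNWIND pattern with the plain neo4j driver.
# URI and credentials are placeholders; adjust to your environment.
from neo4j import GraphDatabase

QUERY = """
UNWIND $batch AS row
MERGE (a:Agency {id: row.id})
SET a.name = row.name, a.url = row.url
"""

batch = [
    {"id": "1", "name": "Metro", "url": "https://example.org"},
    {"id": "2", "name": "Rail", "url": "https://example.com"},
]

with GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "secret")) as driver:
    # One round trip writes the whole batch; the counters summarise the effect.
    summary = driver.execute_query(QUERY, batch=batch).summary
    print(summary.counters)
```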
See the provided [example](examples/gtfs/README.md) for a more realistic pipeline and to see what the logging and reporting look like.
With the above, any lines in the input file `agency.txt` that do not fit the Pydantic model are written to a JSON file containing the offending data and a description of why it could not be loaded.