| Field | Value |
|-----------------|----------------------------------------------|
| Name | etl-pipeline-runner |
| Version | 1.1.1 |
| Summary | A package to run ETL Pipeline |
| Author email | Md Badiuzzaman Pranto <prantoamt@gmail.com> |
| Upload time | 2023-12-14 21:48:00 |
| Requires Python | >=3.10 |
| License | MIT |
| Requirements | No requirements were recorded. |
## etl-pipeline-runner
Extract Transform Load (ETL) pipeline runner is a simple yet effective Python
package for running ETL pipelines in data science projects. The sole purpose of this
package is to:
```Extract data from a source --> Transform the data as needed --> Load the data into a database```

## Installation
Install the library with pip:
```
pip install etl-pipeline-runner
```
## Usage
### Run an ETL pipeline that extracts data from Kaggle and stores it in a SQLite database
----------
Data source: https://www.kaggle.com/datasets/edenbd/150k-lyrics-labeled-with-spotify-valence
Destination: the ``song_lyrics`` table of the ``project.sqlite`` database. Suppose the database is located, or will be created, in the ``data`` directory under the current working directory.
#### Example code:
1. Import the following services from ``etl_pipeline_runner``. The example also uses ``os``, ``numpy`` and ``pandas``, so import those as well:
```
import os

import numpy as np
import pandas as pd

from etl_pipeline_runner.services import (
    ETLPipeline,
    DataExtractor,
    CSVHandler,
    SQLiteLoader,
    ETLQueue,
)
```
2. Create a function that defines the transformation you want to perform on the dataset before loading it into the database.
The function signature must match the following (here ``pd`` refers to pandas):
```
def transform_songs(data_frame: pd.DataFrame):
    # Drop the unnamed index column and rename "seq" to "lyrics".
    data_frame = data_frame.drop(columns=data_frame.columns[0])
    data_frame = data_frame.rename(columns={"seq": "lyrics"})
    return data_frame
```
3. Create an object of the SQLiteLoader.
```
DATA_DIRECTORY = os.path.join(os.getcwd(), "data")

songs_loader = SQLiteLoader(
    db_name="project.sqlite",
    table_name="song_lyrics",
    if_exists=SQLiteLoader.REPLACE,
    index=False,
    method=None,
    output_directory=DATA_DIRECTORY,
)
```
4. Create an object of the CSVHandler.
```
songs_dtype = {
    "#": "Int64",
    "artist": str,
    "seq": str,
    "song": str,
    "label": np.float64,
}

songs_csv_handler = CSVHandler(
    file_name="labeled_lyrics_cleaned.csv",
    sep=",",
    names=None,
    dtype=songs_dtype,
    transformer=transform_songs,
    loader=songs_loader,
)
```
5. Create an object of the DataExtractor.
```
songs_extractor = DataExtractor(
    data_name="Song lyrics",
    url="https://www.kaggle.com/datasets/edenbd/150k-lyrics-labeled-with-spotify-valence",
    type=DataExtractor.KAGGLE_ARCHIVE,
    file_handlers=(songs_csv_handler,),
)
```
6. Create an object of ETLPipeline.
```
songs_pipeline = ETLPipeline(
    extractor=songs_extractor,
)
```
7. Finally run the pipeline:
```
if __name__ == "__main__":
    ETLQueue(etl_pipelines=(songs_pipeline,)).run()
```
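Putting the steps together, here is a minimal end-to-end sketch that assembles the snippets above into a single script (the file and directory names are the ones used in this walkthrough):
```
import os

import numpy as np
import pandas as pd

from etl_pipeline_runner.services import (
    ETLPipeline,
    DataExtractor,
    CSVHandler,
    SQLiteLoader,
    ETLQueue,
)

DATA_DIRECTORY = os.path.join(os.getcwd(), "data")


def transform_songs(data_frame: pd.DataFrame):
    # Drop the unnamed index column and rename "seq" to "lyrics".
    data_frame = data_frame.drop(columns=data_frame.columns[0])
    data_frame = data_frame.rename(columns={"seq": "lyrics"})
    return data_frame


songs_loader = SQLiteLoader(
    db_name="project.sqlite",
    table_name="song_lyrics",
    if_exists=SQLiteLoader.REPLACE,
    index=False,
    method=None,
    output_directory=DATA_DIRECTORY,
)

songs_csv_handler = CSVHandler(
    file_name="labeled_lyrics_cleaned.csv",
    sep=",",
    names=None,
    dtype={"#": "Int64", "artist": str, "seq": str, "song": str, "label": np.float64},
    transformer=transform_songs,
    loader=songs_loader,
)

songs_extractor = DataExtractor(
    data_name="Song lyrics",
    url="https://www.kaggle.com/datasets/edenbd/150k-lyrics-labeled-with-spotify-valence",
    type=DataExtractor.KAGGLE_ARCHIVE,
    file_handlers=(songs_csv_handler,),
)

songs_pipeline = ETLPipeline(extractor=songs_extractor)

if __name__ == "__main__":
    ETLQueue(etl_pipelines=(songs_pipeline,)).run()
```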
## Setting up credentials for the Kaggle data source
If your data source is Kaggle, you need an API key to download the dataset.
etl-pipeline-runner uses [opendatasets](https://github.com/JovianHQ/opendatasets) for downloading datasets from Kaggle.
The following steps will guide you through setting up Kaggle credentials.
1. Go to https://kaggle.com/me/account (sign in if required).
2. Scroll down to the "API" section and click "Create New API Token".
3. This will download a file named ``kaggle.json`` with the following contents:
```
{"username":"YOUR_KAGGLE_USERNAME","key":"YOUR_KAGGLE_KEY"}
```
4. You can either place the ``kaggle.json`` file in your project's root directory or enter your username and key in the terminal when prompted.
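If you prefer not to type the credentials interactively, a small sketch like the following can write ``kaggle.json`` into the working directory before the pipeline runs. It assumes you keep the username and key in environment variables named ``KAGGLE_USERNAME`` and ``KAGGLE_KEY``; those variable names are a convention of this sketch, not something the package requires:
```
import json
import os

# Assumption: credentials are exported as environment variables.
credentials = {
    "username": os.environ["KAGGLE_USERNAME"],
    "key": os.environ["KAGGLE_KEY"],
}

# The README above states that kaggle.json placed in the project's
# root/working directory is picked up, so write it there.
with open("kaggle.json", "w") as f:
    json.dump(credentials, f)
```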
## Services explained
1. SQLiteLoader
Parameters description:
| Parameter | Description |
|-------------------------------------|-------------------------------------------------------------------------------------------------------------|
| db_name: str | Name of the database. |
| table_name: str | Table name where data will be stored. |
| if_exists: str | Action if the table already exists. Possible options: ``SQLiteLoader.REPLACE``, ``SQLiteLoader.APPEND``, ``SQLiteLoader.FAIL``.|
| index: bool | Write the DataFrame index as a column; uses ``index_label`` as the column name in the table (from the pandas docs). |
| method: Callable | Controls the SQL insertion clause used (from the pandas docs). |
| output_directory: str | Path where the database is located or will be created. |
2. CSVHandler
Parameters description:
| Parameter | Description |
|-------------------------------------|-------------------------------------------------------------------|
| file_name: str | Name of the CSV file. **It must match the actual filename.** |
| sep: str | Separator used in the CSV file. |
| names: list | Names of the columns if the CSV file does not contain a header row. |
| dtype: dict | Types of the columns in the CSV file. |
| compression: str | Options: ``CSVHandler.ZIP_COMPRESSION``, ``CSVHandler.GZIP_COMPRESSION``, ``CSVHandler.BZIP2_COMPRESSION``, ``CSVHandler.ZSTD_COMPRESSION``, ``CSVHandler.XZ_COMPRESSION``, ``CSVHandler.TAR_COMPRESSION``. |
| encoding: str | Encoding of the file. Default: ``utf-8``. |
| loader: SQLiteLoader | An object of SQLiteLoader. |
| transformer: Callable | Function that defines the transformation on the data. |
3. DataExtractor
Parameters description:
| Parameter | Description |
|-----------------------------------------------|-------------------------------------------------------------------------------------------------------|
| data_name: str | Name of the data. (Could be anything of your choice). |
| url: str | URL of the data source. |
| type: str | Type of the source. Possible options: ``DataExtractor.KAGGLE_ARCHIVE``, ``DataExtractor.CSV``. |
| file_handlers: Tuple[CSVHandler] | Handler objects that handle the files extracted from the URL. |
4. ETLPipeline
Parameters description:
| Parameter | Description |
|---------------------------------------|------------------------------------|
| extractor: DataExtractor | An object of DataExtractor service.|
5. ETLQueue
Parameters description:
| Parameter | Description |
|---------------------------------------|-----------------------------------|
| etl_pipelines: Tuple | Tuple of ETLPipeline objects. |
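Several of the parameters listed above (``names``, ``compression``, ``encoding``, ``SQLiteLoader.APPEND``, ``DataExtractor.CSV``) are not exercised in the walkthrough. Below is a hedged sketch of a second pipeline that uses them, assuming the objects from the walkthrough (``DATA_DIRECTORY``, ``songs_pipeline``) are already defined; the URL and file name are placeholders, and the exact behavior of ``DataExtractor.CSV`` is assumed from the option name rather than documented here:
```
def transform_ratings(data_frame: pd.DataFrame):
    # No-op transformation: load the data as-is.
    return data_frame


ratings_loader = SQLiteLoader(
    db_name="project.sqlite",
    table_name="ratings",
    if_exists=SQLiteLoader.APPEND,  # keep adding rows on every run
    index=False,
    method=None,
    output_directory=DATA_DIRECTORY,
)

ratings_csv_handler = CSVHandler(
    file_name="ratings.csv.gz",              # placeholder file name
    sep=";",
    names=["user_id", "song_id", "rating"],  # file has no header row
    dtype={"user_id": "Int64", "song_id": "Int64", "rating": np.float64},
    compression=CSVHandler.GZIP_COMPRESSION,
    encoding="utf-8",
    transformer=transform_ratings,
    loader=ratings_loader,
)

ratings_extractor = DataExtractor(
    data_name="Song ratings",
    url="https://example.com/ratings.csv.gz",  # placeholder URL
    type=DataExtractor.CSV,
    file_handlers=(ratings_csv_handler,),
)

ratings_pipeline = ETLPipeline(extractor=ratings_extractor)

# Queue multiple pipelines; they run one after another.
if __name__ == "__main__":
    ETLQueue(etl_pipelines=(songs_pipeline, ratings_pipeline)).run()
```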
## Contributing
This is an open source project and contributions are welcome. Please create an issue first and make a feature branch
associated with the issue.
## Local Development Setup
1. Clone the repository:
```
git clone git@github.com:prantoamt/etl-pipeline-runner.git
```
2. Install the pdm package manager for your local environment: https://pdm-project.org/latest/
3. Go to the project directory and install the requirements using pdm:
```
pdm install
```
4. Open the project in VS Code, make your changes, and create a pull request with a proper description.