Name: etl-pipeline-runner
Version: 1.1.1
Summary: A package to run ETL Pipeline
Upload time: 2023-12-14 21:48:00
Requires Python: >=3.10
License: MIT
## etl-pipeline-runner


Extract Transform Load (ETL) pipeline runner is a simple, yet effective Python
package for running ETL pipelines in data science projects. The sole purpose of this
package is to:

```Extract data from a source --> Transform the data according to the necessity --> Load data to a Database```

![](images/ETL-Pipeline.jpg?raw=true)


## Installation

Install the library with pip:


```
    pip install etl-pipeline-runner
```

## Usage
### Run an ETL pipeline that extracts data from Kaggle and stores it in a SQLite database
----------

Data source: https://www.kaggle.com/datasets/edenbd/150k-lyrics-labeled-with-spotify-valence

Destination: the ``song_lyrics`` table of the ``project.sqlite`` database. Suppose the database is located in, or will be created in, the ``data`` directory under the current working directory.

#### Example code:
1. Import the following services from ``etl_pipeline_runner``. The later steps also use ``os``, ``numpy``, and ``pandas``, so those imports are included here as well.

```
import os

import numpy as np
import pandas as pd

from etl_pipeline_runner.services import (
    ETLPipeline,
    DataExtractor,
    CSVHandler,
    SQLiteLoader,
    ETLQueue,
)
```

2. Create a function that defines the transformation you want to perform on the dataset before it is loaded into the database.
    The function signature must match the following (``pd`` refers to pandas); a toy example of the transformation follows the code block.

```
    def transform_songs(data_frame: pd.DataFrame):
        # Drop the unnamed first column and rename "seq" to "lyrics".
        data_frame = data_frame.drop(columns=data_frame.columns[0], axis=1)
        data_frame = data_frame.rename(columns={"seq": "lyrics"})
        return data_frame
```
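
For illustration, here is a toy run of ``transform_songs`` on a small hand-made DataFrame (using the ``pd`` import from step 1; the column names come from the ``songs_dtype`` mapping in step 4, while the row values are made up):

```
    sample = pd.DataFrame(
        {
            "#": [0, 1],
            "artist": ["Artist A", "Artist B"],
            "seq": ["some lyrics", "other lyrics"],
            "song": ["Song A", "Song B"],
            "label": [0.52, 0.63],
        }
    )
    # The unnamed first column ("#") is dropped and "seq" becomes "lyrics":
    print(transform_songs(sample).columns.tolist())
    # ['artist', 'lyrics', 'song', 'label']
```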

3. Create an object of the SQLiteLoader.

```
    DATA_DIRECTORY = os.path.join(os.getcwd(), "data")
    songs_loader = SQLiteLoader(
        db_name="project.sqlite",
        table_name="song_lyrics",
        if_exists=SQLiteLoader.REPLACE,
        index=False,
        method=None,
        output_directory=DATA_DIRECTORY,
    )
```

4. Create an object of the CSVHandler.

``` 
    songs_dtype = {
        "#": "Int64",
        "artist": str,
        "seq": str,
        "song": str,
        "label": np.float64,
    }

    songs_csv_handler = CSVHandler(
        file_name="labeled_lyrics_cleaned.csv",
        sep=",",
        names=None,
        dtype=songs_dtype,
        transformer=transform_songs,
        loader=songs_loader,
    )
```

5. Create an object of the DataExtractor.

```
    songs_extractor = DataExtractor(
        data_name="Song lyrics",
        url="https://www.kaggle.com/datasets/edenbd/150k-lyrics-labeled-with-spotify-valence",
        type=DataExtractor.KAGGLE_ARCHIVE,
        file_handlers=(songs_csv_handler,),
    )
```

6. Create an object of ETLPipeline.

```
    songs_pipeline = ETLPipeline(
        extractor=songs_extractor,
    )
```

7. Finally, run the pipeline:

```
    if __name__ == "__main__":
        ETLQueue(etl_pipelines=(songs_pipeline,)).run()
```
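
Running the script downloads the dataset from Kaggle (you will be asked for credentials if none are configured, see below), applies ``transform_songs``, and writes the result into ``project.sqlite`` inside the ``data`` directory.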

## Setting up credentials for the Kaggle data source
If your data source is Kaggle, you need an API key to download the dataset.
etl-pipeline-runner uses [opendatasets](https://github.com/JovianHQ/opendatasets) to download datasets from Kaggle.
The following steps will guide you through setting up your Kaggle credentials.

1. Go to https://kaggle.com/me/account (sign in if required).
2. Scroll down to the "API" section and click "Create New API Token".
3. This will download a file ``kaggle.json`` with the following contents:
```
    {"username":"YOUR_KAGGLE_USERNAME","key":"YOUR_KAGGLE_KEY"}
```
4. You can either place the downloaded ``kaggle.json`` file in the directory you run the pipeline from, or enter your username and key in the terminal when prompted.
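
If you prefer to create the file from code, the following minimal sketch (plain Python, not part of etl-pipeline-runner) writes ``kaggle.json`` into the current working directory so the download step can find it:

```
    import json
    import os
    import stat

    credentials = {"username": "YOUR_KAGGLE_USERNAME", "key": "YOUR_KAGGLE_KEY"}
    path = os.path.join(os.getcwd(), "kaggle.json")
    with open(path, "w") as file:
        json.dump(credentials, file)
    # Restrict the file to the current user, as Kaggle recommends for API tokens.
    os.chmod(path, stat.S_IRUSR | stat.S_IWUSR)
```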

## Services explained

1. SQLiteLoader

Parameters description:

| Parameter | Description |
|-----------|-------------|
| db_name: str | Name of the database. |
| table_name: str | Table name where the data will be stored. |
| if_exists: str | Action if the table already exists. Possible options: ``SQLiteLoader.REPLACE``, ``SQLiteLoader.APPEND``, ``SQLiteLoader.FAIL``. |
| index: bool | Write the DataFrame index as a column. Uses index_label as the column name in the table. (From the pandas docs.) |
| method: Callable | Controls the SQL insertion clause used. (From the pandas docs.) |
| output_directory: str | Path where the database is located or will be created. |
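
The pandas references in this table reflect the fact that the loader's write step behaves like ``pandas.DataFrame.to_sql`` against a SQLite connection. A rough sketch of that behavior, under that assumption and not the package's actual implementation, is:

```
    import os
    import sqlite3

    import pandas as pd

    def load_to_sqlite(data_frame: pd.DataFrame) -> None:
        # Roughly what the configured SQLiteLoader does with a transformed DataFrame.
        db_path = os.path.join(os.getcwd(), "data", "project.sqlite")
        with sqlite3.connect(db_path) as connection:
            data_frame.to_sql(
                name="song_lyrics",
                con=connection,
                if_exists="replace",  # corresponds to SQLiteLoader.REPLACE
                index=False,
                method=None,
            )
```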

2. CSVHandler

Parameters description:

| Parameter | Description |
|-----------|-------------|
| file_name: str | Name of the csv file. **It must match the actual filename.** |
| sep: str | Separator used in the csv file. |
| names: list | Names of the columns if the csv file does not contain them. |
| dtype: dict | Types of the columns in the csv file. |
| compression: str | Options: ``CSVHandler.ZIP_COMPRESSION``, ``CSVHandler.GZIP_COMPRESSION``, ``CSVHandler.BZIP2_COMPRESSION``, ``CSVHandler.ZSTD_COMPRESSION``, ``CSVHandler.XZ_COMPRESSION``, ``CSVHandler.TAR_COMPRESSION``. |
| encoding: str | Encoding of the file. Default: ``utf-8``. |
| loader: SQLiteLoader | Object of SQLiteLoader. |
| transformer: Callable | Function that defines the transformation on the data. |
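
The example above does not use the compression and encoding parameters. A hedged sketch of a handler for a gzipped, latin-1 encoded CSV file, reusing ``transform_songs`` and ``songs_loader`` from the example (the file name and separator are made up), could look like this:

```
    archived_csv_handler = CSVHandler(
        file_name="some_dataset.csv.gz",          # hypothetical file name
        sep=";",
        names=None,
        dtype=None,
        compression=CSVHandler.GZIP_COMPRESSION,
        encoding="latin-1",
        transformer=transform_songs,
        loader=songs_loader,
    )
```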

3. DataExtractor

Parameters description:

| Parameter | Description |
|-----------|-------------|
| data_name: str | Name of the data (could be anything of your choice). |
| url: str | URL of the data source. |
| type: str | Type of the source. Possible options: ``DataExtractor.KAGGLE_ARCHIVE``, ``DataExtractor.CSV``. |
| file_handlers: Tuple[CSVHandler] | Handler objects to handle the files extracted from the url. |
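
The example above uses ``DataExtractor.KAGGLE_ARCHIVE``. For a plain CSV file served directly over HTTP, the same service can be pointed at the file with ``type=DataExtractor.CSV``; the sketch below uses a placeholder URL and a hypothetical handler whose ``file_name`` matches the downloaded file:

```
    remote_csv_handler = CSVHandler(
        file_name="data.csv",                    # must match the downloaded file name
        sep=",",
        names=None,
        dtype=None,
        transformer=transform_songs,
        loader=songs_loader,
    )

    csv_extractor = DataExtractor(
        data_name="Some CSV data",
        url="https://example.com/data.csv",      # placeholder URL
        type=DataExtractor.CSV,
        file_handlers=(remote_csv_handler,),
    )
```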

4. ETLPipeline

Parameters description:

| Parameter | Description |
|-----------|-------------|
| extractor: DataExtractor | An object of the DataExtractor service. |

5. ETLQueue

Parameters description:

| Parameter | Description |
|-----------|-------------|
| etl_pipelines: Tuple | Tuple of ETLPipeline objects. |
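
``ETLQueue`` accepts any number of pipelines in the tuple and runs them when ``run()`` is called. For example, with ``songs_pipeline`` from above and a hypothetical second pipeline:

```
    # another_pipeline is a hypothetical second ETLPipeline object.
    ETLQueue(etl_pipelines=(songs_pipeline, another_pipeline)).run()
```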

## Contributing
This is an open source project and I welcome contributions. Please create an issue first and make a feature branch
associated with the issue.

## Local Development Setup

1. Clone the repository:

```
    git clone git@github.com:prantoamt/etl-pipeline-runner.git
```

2. Install the pdm package manager based on your local environment: https://pdm-project.org/latest/

3. Go to the project directory and install the requirements using pdm:

```
    pdm install
```

4. Open the project in VS Code, make your changes, and create a pull request with a proper description.
            
