cloudsdp


Name: cloudsdp
Version: 0.1.11
Home page: https://github.com/nvn-nil/CloudSDP
Upload time: 2023-08-15 16:46:53
Author: Naveen Anil
Requires Python: >=3.9,<4.0
Keywords: cloud, data, processing, extraction, transformation, ingestion, etl, bigquery, google cloud, data pipeline
Requirements: No requirements were recorded.

[![codecov](https://codecov.io/gh/nvn-nil/CloudSDP/branch/main/graph/badge.svg?token=P0U7YNO17D)](https://codecov.io/gh/nvn-nil/CloudSDP)
[![test](https://github.com/nvn-nil/CloudSDP/actions/workflows/ci.yaml/badge.svg)](https://github.com/nvn-nil/CloudSDP/actions/workflows/ci.yaml)

# CloudSDP Library

The CloudSDP library is designed to simplify creating and managing serverless data pipelines between Google Cloud Run and Google BigQuery. It provides an interface to extract data from various sources, transform it, and load it into BigQuery tables on serverless infrastructure.

## Features

WIP:

- **Data Extraction and Ingestion**: Extract data from various sources, convert it into a common format, and ingest it into BigQuery tables.

TODO:

- **Data Transformation**: Perform data transformations, such as cleaning, enrichment, and normalization, before loading into BigQuery.
- **Scheduled Jobs and Triggers**: Schedule data pipeline jobs based on time triggers using Cloud Scheduler.
- **Data Pipeline Workflow**: Define and orchestrate data pipeline workflows with configurable execution order and dependencies.
- **Conflict Resolution and Error Handling**: Implement conflict resolution strategies and error handling mechanisms for reliable data processing.
- **Monitoring and Logging**: Monitor job progress, resource utilization, and performance metrics using integrated logging and monitoring tools.
- **Documentation and Examples**: Provide comprehensive documentation and code examples to guide developers in using the library effectively.

## Installation

Install the library using pip:

`pip install cloudsdp`

Or, install the library using poetry:

`poetry add cloudsdp`
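
To confirm that the install succeeded, the installed version can be read from the package metadata. The following is a minimal sketch that uses only the standard library; the helper name `installed_version` is illustrative and not part of CloudSDP.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional


def installed_version(package: str = "cloudsdp") -> Optional[str]:
    """Return the installed version of `package`, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    print(installed_version() or "cloudsdp is not installed")
```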

## QuickStart

### Data Ingestion

#### Create a dataset, ingest data, and clean up

Ingest data from a pandas DataFrame:

```py
import pandas as pd

from cloudsdp.api.bigquery import BigQuery, WriteDisposition

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    # Build a 10,000-row DataFrame with a string column and an integer column.
    data = {
        "name": [f"Name{el}" for el in range(10000)],
        "score": list(range(10000)),
    }
    df = pd.DataFrame(data)
    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "score", "field_type": "NUMERIC", "mode": "REQUIRED"},
    ]

    # Create the destination dataset and table.
    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    # Load the DataFrame into the table.
    bq.ingest_from_dataframe(
        df, dataset_name, table_name, write_disposition=WriteDisposition.WRITE_IF_TABLE_EMPTY
    )

    # Clean up: delete the dataset together with its contents.
    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
```
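
Because the table schema and the DataFrame are defined separately, it can help to compare their column names before ingesting. The sketch below is a hypothetical helper, not part of CloudSDP; it assumes the schema is a list of dicts with a `name` key, as in the example above, and accepts any iterable of column names such as `df.columns`.

```python
from typing import Dict, Iterable, List


def schema_mismatch(schema: List[Dict[str, str]], columns: Iterable[str]) -> Dict[str, List[str]]:
    """Compare schema field names with the column names actually present."""
    expected = [field["name"] for field in schema]
    actual = list(columns)
    return {
        # Fields declared in the schema but absent from the data.
        "missing": [name for name in expected if name not in actual],
        # Columns present in the data but not declared in the schema.
        "unexpected": [name for name in actual if name not in expected],
    }
```

For example, `schema_mismatch(data_schema, df.columns)` returns two empty lists when the DataFrame and the schema agree on column names.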

From a list of Python dicts:

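```py
from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    # Each dict is one row; keys must match the schema field names.
    data = [{"name": "Someone", "age": 29}, {"name": "Something", "age": 22}]

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    # Create the destination dataset and table.
    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    # Insert the rows and report any errors that come back.
    errors = bq.ingest_rows_json(data, dataset_name, table_name)
    if errors:
        # str() keeps the join safe if the errors are not plain strings.
        print("Errors", ";".join(str(error) for error in errors))

    # Clean up: delete the dataset together with its contents.
    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
```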
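
Rows that violate the schema can be caught locally before they are sent for ingestion. The following sketch is a hypothetical pre-check, not part of CloudSDP; it assumes the schema dict layout used above and only knows the two field types that appear in this example (`STRING` and `INTEGER`).

```python
from typing import Any, Dict, List

# Illustrative mapping for the field types used in this example only.
PYTHON_TYPES = {"STRING": str, "INTEGER": int}


def find_invalid_rows(rows: List[Dict[str, Any]], schema: List[Dict[str, str]]) -> List[str]:
    """Return a human-readable problem description for each schema violation."""
    problems = []
    for index, row in enumerate(rows):
        for field in schema:
            name = field["name"]
            value = row.get(name)
            if value is None:
                if field.get("mode") == "REQUIRED":
                    problems.append(f"row {index}: missing required field '{name}'")
                continue
            expected = PYTHON_TYPES.get(field["field_type"])
            # bool is a subclass of int, so it is rejected explicitly.
            if expected and (not isinstance(value, expected) or isinstance(value, bool)):
                problems.append(
                    f"row {index}: field '{name}' expected {field['field_type']}, "
                    f"got {type(value).__name__}"
                )
    return problems
```

Calling `find_invalid_rows(data, data_schema)` before `bq.ingest_rows_json(...)` returns an empty list when every row matches the schema.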

From CSV files stored in Google Cloud Storage (GCS):

```py
from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    # Create the destination dataset and table.
    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    # CSV files to load, given as gs:// URIs.
    csv_uris = ["gs://mybucket/name_age_data_1.csv", "gs://mybucket/name_age_data_2.csv"]

    # Skip the header row of each file and use the table schema defined above.
    result = bq.ingest_csvs_from_cloud_bucket(
        csv_uris, dataset_name, table_name, skip_leading_rows=1, autodetect_schema=False, timeout=120
    )
    print(result)

    # Clean up: delete the dataset together with its contents.
    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
```
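
A malformed URI in `csv_uris` is easy to overlook, so a quick local check before starting the ingestion can be useful. This is a minimal sketch with a hypothetical helper, `split_gcs_uri`, which is not part of CloudSDP; it only checks the `gs://bucket/object` shape and does not contact GCS.

```python
from typing import Tuple


def split_gcs_uri(uri: str) -> Tuple[str, str]:
    """Split a gs://bucket/object URI into (bucket, object name)."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"Not a GCS URI: {uri!r}")
    # Everything up to the first "/" is the bucket; the rest is the object name.
    bucket, _, blob = uri[len(prefix):].partition("/")
    if not bucket or not blob:
        raise ValueError(f"GCS URI must include a bucket and an object name: {uri!r}")
    return bucket, blob
```

Running `[split_gcs_uri(uri) for uri in csv_uris]` raises a `ValueError` for the first URI that does not have the expected shape.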

            

Raw data

{
    "_id": null,
    "home_page": "https://github.com/nvn-nil/CloudSDP",
    "name": "cloudsdp",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.9,<4.0",
    "maintainer_email": "",
    "keywords": "cloud,data,processing,extraction,transformation,ingestion,ETL,BigQuery,Google Cloud,data pipeline",
    "author": "Naveen Anil",
    "author_email": "naveenms01@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/94/ac/d928e9897cbc623b154bc9e416b28056e7ee97447ce8c7c60311e0161b92/cloudsdp-0.1.11.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "",
    "summary": "",
    "version": "0.1.11",
    "project_urls": {
        "Documentation": "https://nvn-nil.github.io/CloudSDP/",
        "Homepage": "https://github.com/nvn-nil/CloudSDP",
        "Repository": "https://github.com/nvn-nil/CloudSDP"
    },
    "split_keywords": [
        "cloud",
        "data",
        "processing",
        "extraction",
        "transformation",
        "ingestion",
        "etl",
        "bigquery",
        "google cloud",
        "data pipeline"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "0ba6b8da92fa8ead1f5fa57d2ad0280f63979f681c0e139b05f65d67a4fd5e49",
                "md5": "e43c437e712339c17ced42cabe603cf6",
                "sha256": "25f26768501fe00a93534c6a8c968efd0bae365375e8ce21451725003b0df38f"
            },
            "downloads": -1,
            "filename": "cloudsdp-0.1.11-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "e43c437e712339c17ced42cabe603cf6",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9,<4.0",
            "size": 7487,
            "upload_time": "2023-08-15T16:46:52",
            "upload_time_iso_8601": "2023-08-15T16:46:52.236586Z",
            "url": "https://files.pythonhosted.org/packages/0b/a6/b8da92fa8ead1f5fa57d2ad0280f63979f681c0e139b05f65d67a4fd5e49/cloudsdp-0.1.11-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "94acd928e9897cbc623b154bc9e416b28056e7ee97447ce8c7c60311e0161b92",
                "md5": "3001cc7dee58b39f05d5a85d14f5cb28",
                "sha256": "cb1c5bb83a73c986898f3e12ea6938dc591cfa74015f6ba600e0b4c706cd783c"
            },
            "downloads": -1,
            "filename": "cloudsdp-0.1.11.tar.gz",
            "has_sig": false,
            "md5_digest": "3001cc7dee58b39f05d5a85d14f5cb28",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9,<4.0",
            "size": 6314,
            "upload_time": "2023-08-15T16:46:53",
            "upload_time_iso_8601": "2023-08-15T16:46:53.953639Z",
            "url": "https://files.pythonhosted.org/packages/94/ac/d928e9897cbc623b154bc9e416b28056e7ee97447ce8c7c60311e0161b92/cloudsdp-0.1.11.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-08-15 16:46:53",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "nvn-nil",
    "github_project": "CloudSDP",
    "travis_ci": false,
    "coveralls": true,
    "github_actions": true,
    "lcname": "cloudsdp"
}
        