[![codecov](https://codecov.io/gh/nvn-nil/CloudSDP/branch/main/graph/badge.svg?token=P0U7YNO17D)](https://codecov.io/gh/nvn-nil/CloudSDP)
[![test](https://github.com/nvn-nil/CloudSDP/actions/workflows/ci.yaml/badge.svg)](https://github.com/nvn-nil/CloudSDP/actions/workflows/ci.yaml)
# CloudSDP Library
The CloudSDP library simplifies the creation and management of serverless data pipelines between Google Cloud Run and Google BigQuery. It provides a developer-friendly interface to extract data from various sources, transform it, and load it into BigQuery tables, all on serverless infrastructure.
## Features
WIP:
- **Data Extraction and Ingestion**: Extract data from various sources, convert it into a common format, and ingest it into BigQuery tables.
TODO:
- **Data Transformation**: Perform data transformations, such as cleaning, enrichment, and normalization, before loading into BigQuery.
- **Scheduled Jobs and Triggers**: Schedule data pipeline jobs based on time triggers using Cloud Scheduler.
- **Data Pipeline Workflow**: Define and orchestrate data pipeline workflows with configurable execution order and dependencies.
- **Conflict Resolution and Error Handling**: Implement conflict resolution strategies and error handling mechanisms for reliable data processing.
- **Monitoring and Logging**: Monitor job progress, resource utilization, and performance metrics using integrated logging and monitoring tools.
- **Documentation and Examples**: Comprehensive documentation and code examples to guide developers in using the library effectively.
## Installation
Install the library using pip:
`pip install cloudsdp`
Or, install it using Poetry:
`poetry add cloudsdp`
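CloudSDP talks to BigQuery through Google's Cloud client libraries, so Application Default Credentials must be available in the environment (for example via `gcloud auth application-default login` or the `GOOGLE_APPLICATION_CREDENTIALS` variable). A quick, CloudSDP-independent way to confirm that credentials resolve:

```py
# Sanity check: confirm Application Default Credentials resolve before
# running the examples below. Uses google-auth directly, not CloudSDP.
import google.auth

credentials, project_id = google.auth.default()
print(f"Authenticated; default project: {project_id}")
```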
## QuickStart
### Data Ingestion
#### Create a dataset, ingest data, and clean up

Ingest data from a pandas DataFrame:
```py
import pandas as pd

from cloudsdp.api.bigquery import BigQuery, WriteDisposition

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    # Build a 10,000-row DataFrame matching the schema below.
    data = {
        "name": [f"Name{el}" for el in range(10000)],
        "score": list(range(10000)),
    }
    df = pd.DataFrame(data)
    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "score", "field_type": "NUMERIC", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    # Load the DataFrame using the WRITE_IF_TABLE_EMPTY disposition.
    bq.ingest_from_dataframe(df, dataset_name, table_name, write_disposition=WriteDisposition.WRITE_IF_TABLE_EMPTY)

    # Clean up: delete the dataset and all tables in it.
    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
```
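A note on `write_disposition`: the name `WriteDisposition.WRITE_IF_TABLE_EMPTY` suggests behaviour like BigQuery's native `WRITE_EMPTY` disposition, where a load is accepted only when the destination table has no rows; that mapping is an assumption, not confirmed by the docs. Continuing from the example above, a guarded re-run might look like this:

```py
# Hedged sketch, continuing from main() above: a second load into the
# now non-empty table is expected to be rejected under
# WRITE_IF_TABLE_EMPTY. The concrete exception type is not documented,
# so we catch broadly.
try:
    bq.ingest_from_dataframe(
        df, dataset_name, table_name,
        write_disposition=WriteDisposition.WRITE_IF_TABLE_EMPTY,
    )
except Exception as exc:
    print(f"Second load rejected: {exc}")
```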
From a list of Python dicts:
```py
from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data = [{"name": "Someone", "age": 29}, {"name": "Something", "age": 22}]
    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    # ingest_rows_json returns a list of errors; an empty list means success.
    errors = bq.ingest_rows_json(data, dataset_name, table_name)
    if errors:
        print("Errors", ";".join(errors))

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
```
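If a partial failure should stop the pipeline rather than just be printed, the error list returned by `ingest_rows_json` can be turned into an exception. A minimal sketch, continuing from the example above and assuming the returned items are printable error descriptions:

```py
# Fail fast instead of continuing with partially ingested data.
errors = bq.ingest_rows_json(data, dataset_name, table_name)
if errors:
    raise RuntimeError("Row ingestion failed: " + "; ".join(map(str, errors)))
```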
From CSV files stored in Google Cloud Storage (GCS):
```py
from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    # Load one or more CSV files already uploaded to a GCS bucket;
    # skip_leading_rows=1 skips the header row in each file.
    csv_uris = ["gs://mybucket/name_age_data_1.csv", "gs://mybucket/name_age_data_2.csv"]
    result = bq.ingest_csvs_from_cloud_bucket(
        csv_uris, dataset_name, table_name, skip_leading_rows=1, autodetect_schema=False, timeout=120
    )
    print(result)

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
```
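Each quickstart example deletes its dataset as the last step, so an exception during ingestion would leave the dataset behind. A minimal sketch, using only the calls shown above, that guarantees cleanup:

```py
from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"  # placeholder, as in the examples above


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"

    bq.create_dataset(dataset_name)
    try:
        # ... create the table and ingest data as in the examples above ...
        pass
    finally:
        # Runs even if ingestion raises, so no orphaned dataset is left behind.
        bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)


if __name__ == "__main__":
    main()
```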