# Data CI
DataCI is a comprehensive platform for track your data-centric AI pipeline in dynamic streaming data settings. It
provides rich APIs for seamless streaming dataset management, data-centric pipeline development, versioning
and evaluation on streaming scenarios.
DataCI is featured for
- :hatching_chick: **Easy to use**: Enable DataCI management by only few lines of modification to your
existing data-centric functions and pipelines
- :card_index_dividers: **Version Control**: Track versions of streaming data, data-centric pipelines, running results
and their lineages
- :ocean: **Streaming Simulation**: Testbed for developing and evaluating data-centric pipelines under simulated
streaming settings (WIP)
This project is still under development. Please feel free to open an issue if you have any questions or suggestions.
- [Installation](#installation-construction)
- [Quick Start](#quick-start-truck)
- [Examples (for Advanced Usages)](#examples-books)
- [Citation](#citation-seedling)
- [Contributors](#contributors-sparkles)
- [License](#license-page_facing_up)
## Installation :construction:
Easy install DataCI from [PyPI](https://pypi.org/project/ml-data-ci/)
```shell
pip install ml-data-ci
```
Manual install DataCI package from source code:
```shell
pip install .
```
:warning: For pipeline orchestration backend framework (i.e., [Apache-Airflow](https://airflow.apache.org/)), you may
need to install manually since the version to meet your deployment can be different.
Here is a snapshot for [quick installation](https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html):
```shell
AIRFLOW_VERSION=2.6.1
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow[celery]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
```
Initialize DataCI by:
```shell
dataci init
```
:warning: This command will initialize the DataCI, you should run this command only once when installing.
Start DataCI server by:
```shell
dataci start
```
## Quick Start :truck:
With few lines of modification, you can easily track every version of the input, output data and code for your existing
data pipeline. You can also see the below example script [here :scroll:](./example/text_process/text_process_ci.py).
1. Write a data augmentation stage:
```python
import augly.text as textaugs
from dataci.plugins.decorators import stage
@stage
def text_augmentation(df):
aug_function = textaugs.ReplaceSimilarUnicodeChars()
df['text'] = aug_function(df['text'].tolist())
return df
```
2. Define the text processing CI pipeline:
```python
from datetime import datetime
from dataci.models import Event
from dataci.plugins.decorators import dag, Dataset
@dag(
start_date=datetime(2020, 7, 30), schedule=None,
trigger=[Event('publish', 'yelp_review@*', producer_type='dataset', status='success')],
)
def text_process_ci():
# Instead of manually track the dataset version:
# raw_dataset_train = df.read_csv(f'yelp_review_{DATASET_VERSION}/train.csv')
# input dataset versioning is automatically handled by DataCI
raw_dataset_train = Dataset.get('yelp_review@latest')
text_aug_df = text_augmentation(raw_dataset_train)
# Instead of manually track the output dataset version:
# text_aug_df.to_csv(f'text_aug_{DATASET_VERSION}/train.csv')
# output dataset versioning is automatically handled by DataCI.
Dataset(name='text_aug', dataset_files=text_aug_df)
# Build the pipeline
text_process_ci_pipeline = text_process_ci()
```
We write the workflow process in a function `text_process_ci` in a normal way, and then decorate it with `@dag` to
convert it to a DataCI tracked workflow. With the `trigger` argument, we can specify the trigger event for the
pipeline. In this example, the pipeline will be triggered when a new version of `yelp_review` dataset is published.
3. Manually run the pipeline:
```python
from dataci.models import Dataset
if __name__ == '__main__':
# Prepare the input dataset, you can either provide a list or a file path
yelp_review_dataset = Dataset('yelp_review', dataset_files=[
{'date': '2020-10-05 00:44:08', 'review_id': 'HWRpzNHPqjA4pxN5863QUA', 'stars': 5.0, 'text': "I called Anytime on Friday afternoon about the number pad lock on my front door. After several questions, the gentleman asked me if I had changed the battery.",},
{'date': '2020-10-15 04:34:49', 'review_id': '01plHaNGM92IT0LLcHjovQ', 'stars': 5.0, 'text': "Friend took me for lunch. Ordered the Chicken Pecan Tart although it was like a piece quiche, was absolutely delicious!",},
{'date': '2020-10-17 06:58:09', 'review_id': '7CDDSuzoxTr4H5N4lOi9zw', 'stars': 4.0, 'text': "I love coming here for my fruit and vegetables. It is always fresh and a great variety. The bags of already diced veggies are a huge time saver.",},
]).publish(version_tag='2020-10')
# Test the pipeline locally
text_process_ci_pipeline.test()
# Publish the pipeline: text_process_ci@v1 and its stage text_augmentation@v1 will be tracked
text_process_ci_pipeline.publish()
# Run the pipeline on the server
run_id = text_process_ci_pipeline.run()
```
The text process CI pipeline will be triggered and run. Go
to [pipeline runs dashboard](http://localhost:8080/taskinstance/list/?_flt_3_dag_id=testspace--text_process_ci--v1)
to check the pipeline run result.
Note that the you should write pipeline testing and trigger code aside from the pipeline definition code.
Otherwise, the pipeline will be triggered every time when you import the pipeline definition code.
4. Automatically trigger the CI pipeline by publish a new version of input dataset:
```python
from dataci.models import Dataset
if __name__ == '__main__':
# DataCI auto track the new version of yelp_review dataset
yelp_review_dataset = Dataset('yelp_review', dataset_files=[
{'date': '2020-11-02 00:59:29', 'review_id': 'LTr95e6eOmLc7S_1WxM88Q', 'stars': 5.0, 'text': "First of all the owner and staff went above and beyond to make us feel comfortable and protected during covid. Secondly, the bar had fantastic drinks",},
{'date': '2020-11-16 22:07:48', 'review_id': 'UhA9H0LK59qZegWOxyotcA', 'stars': 5.0, 'text': "Herbies is awesome! My brunch was perfect, service, food, and drinks! I will definitely continue coming back.",},
{'date': '2020-11-18 06:38:46', 'review_id': 'YqTMxlbebNBDcKYTIUvrdw', 'stars': 5.0, 'text': "You won't regret stopping here, hidden gem with great food and a laid back and comfortable atmosphere, a place you can gather with friends and they will treat you like family while you're there.",},
]).publish(version_tag='2020-11')
```
Before executing the above code, please wait some time for the previous manual triggered run to finish.
Upon the publish of the new version of input dataset, the text process CI pipeline is automatically
triggered and evaluate on the new version of streaming data.
Go to [pipeline runs dashboard](http://localhost:8080/taskinstance/list/?_flt_3_dag_id=default--text_process_ci--v1)
to check the pipeline run result.
## Examples :books:
We have provided several examples to demonstrate the advanced usage of DataCI:
- [Continuous Text Classification Development with Streaming Data](example/continuous_text_classification_dev/README.md)
- [DataCI Playground](./example/ci/README.md) (WIP)
## Citation :seedling:
Our tech report is available at [arXiv:2306.15538](https://arxiv.org/abs/2306.15538)
and [DMLR@ICML'23](https://dmlr.ai/assets/accepted-papers/42/CameraReady/DataCI_v3_camera_ready.pdf). Please cite as:
```bibtex
@article{zhang2023dataci,
title = {DataCI: A Platform for Data-Centric AI on Streaming Data},
author = {Zhang, Huaizheng and Huang, Yizheng and Li, Yuanming},
journal = {arXiv preprint arXiv:2306.15538},
year = {2023}
}
```
## Contributors :sparkles:
- Yuanming Li
- Huaizheng Zhang
- Yizheng Huang
## License :page_facing_up:
This repository is open-sourced under [MIT License](./LICENSE).
Raw data
{
"_id": null,
"home_page": "https://github.com/MLSysOps/DataCI",
"name": "ml-data-ci",
"maintainer": "",
"docs_url": null,
"requires_python": "",
"maintainer_email": "",
"keywords": "Data-centric AI,Streaming Data,MLOps,Pipeline Tracking,Data Versioning",
"author": "Yuanming Li",
"author_email": "yuanmingleee@gmail.com",
"download_url": "https://files.pythonhosted.org/packages/91/5f/ae1443ced9b0caea6b4271a5cd6c6f3cd6d5df1a91375be0468b73866fad/ml-data-ci-0.0.2.tar.gz",
"platform": null,
"description": "# Data CI\n\nDataCI is a comprehensive platform for track your data-centric AI pipeline in dynamic streaming data settings. It\nprovides rich APIs for seamless streaming dataset management, data-centric pipeline development, versioning\nand evaluation on streaming scenarios.\n\nDataCI is featured for\n- :hatching_chick: **Easy to use**: Enable DataCI management by only few lines of modification to your\n existing data-centric functions and pipelines\n- :card_index_dividers: **Version Control**: Track versions of streaming data, data-centric pipelines, running results\n and their lineages\n- :ocean: **Streaming Simulation**: Testbed for developing and evaluating data-centric pipelines under simulated\n streaming settings (WIP)\n\nThis project is still under development. Please feel free to open an issue if you have any questions or suggestions.\n- [Installation](#installation-construction)\n- [Quick Start](#quick-start-truck)\n- [Examples (for Advanced Usages)](#examples-books)\n- [Citation](#citation-seedling)\n- [Contributors](#contributors-sparkles)\n- [License](#license-page_facing_up)\n\n## Installation :construction:\n\nEasy install DataCI from [PyPI](https://pypi.org/project/ml-data-ci/)\n```shell\npip install ml-data-ci\n```\n\nManual install DataCI package from source code:\n```shell\npip install .\n```\n:warning: For pipeline orchestration backend framework (i.e., [Apache-Airflow](https://airflow.apache.org/)), you may\nneed to install manually since the version to meet your deployment can be different.\nHere is a snapshot for [quick installation](https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html):\n```shell\nAIRFLOW_VERSION=2.6.1\nPYTHON_VERSION=\"$(python --version | cut -d \" \" -f 2 | cut -d \".\" -f 1-2)\"\nCONSTRAINT_URL=\"https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt\"\npip install \"apache-airflow[celery]==${AIRFLOW_VERSION}\" --constraint \"${CONSTRAINT_URL}\"\n```\n\nInitialize DataCI by:\n\n```shell\ndataci init\n```\n\n:warning: This command will initialize the DataCI, you should run this command only once when installing.\n\nStart DataCI server by:\n\n```shell\ndataci start\n```\n\n## Quick Start :truck:\n\nWith few lines of modification, you can easily track every version of the input, output data and code for your existing\ndata pipeline. You can also see the below example script [here :scroll:](./example/text_process/text_process_ci.py).\n\n1. Write a data augmentation stage:\n\n ```python\n import augly.text as textaugs\n from dataci.plugins.decorators import stage\n \n \n @stage\n def text_augmentation(df):\n aug_function = textaugs.ReplaceSimilarUnicodeChars()\n df['text'] = aug_function(df['text'].tolist())\n return df\n ```\n\n2. Define the text processing CI pipeline:\n\n ```python\n from datetime import datetime\n from dataci.models import Event\n from dataci.plugins.decorators import dag, Dataset\n \n \n @dag(\n start_date=datetime(2020, 7, 30), schedule=None,\n trigger=[Event('publish', 'yelp_review@*', producer_type='dataset', status='success')],\n )\n def text_process_ci():\n # Instead of manually track the dataset version: \n # raw_dataset_train = df.read_csv(f'yelp_review_{DATASET_VERSION}/train.csv')\n # input dataset versioning is automatically handled by DataCI\n raw_dataset_train = Dataset.get('yelp_review@latest')\n text_aug_df = text_augmentation(raw_dataset_train)\n \n # Instead of manually track the output dataset version:\n # text_aug_df.to_csv(f'text_aug_{DATASET_VERSION}/train.csv')\n # output dataset versioning is automatically handled by DataCI.\n Dataset(name='text_aug', dataset_files=text_aug_df)\n \n # Build the pipeline\n text_process_ci_pipeline = text_process_ci()\n ```\n We write the workflow process in a function `text_process_ci` in a normal way, and then decorate it with `@dag` to\n convert it to a DataCI tracked workflow. With the `trigger` argument, we can specify the trigger event for the\n pipeline. In this example, the pipeline will be triggered when a new version of `yelp_review` dataset is published.\n\n3. Manually run the pipeline:\n\n ```python\n from dataci.models import Dataset\n \n \n if __name__ == '__main__':\n # Prepare the input dataset, you can either provide a list or a file path\n yelp_review_dataset = Dataset('yelp_review', dataset_files=[\n {'date': '2020-10-05 00:44:08', 'review_id': 'HWRpzNHPqjA4pxN5863QUA', 'stars': 5.0, 'text': \"I called Anytime on Friday afternoon about the number pad lock on my front door. After several questions, the gentleman asked me if I had changed the battery.\",},\n {'date': '2020-10-15 04:34:49', 'review_id': '01plHaNGM92IT0LLcHjovQ', 'stars': 5.0, 'text': \"Friend took me for lunch. Ordered the Chicken Pecan Tart although it was like a piece quiche, was absolutely delicious!\",},\n {'date': '2020-10-17 06:58:09', 'review_id': '7CDDSuzoxTr4H5N4lOi9zw', 'stars': 4.0, 'text': \"I love coming here for my fruit and vegetables. It is always fresh and a great variety. The bags of already diced veggies are a huge time saver.\",},\n ]).publish(version_tag='2020-10')\n # Test the pipeline locally\n text_process_ci_pipeline.test()\n # Publish the pipeline: text_process_ci@v1 and its stage text_augmentation@v1 will be tracked\n text_process_ci_pipeline.publish()\n # Run the pipeline on the server\n run_id = text_process_ci_pipeline.run()\n ```\n The text process CI pipeline will be triggered and run. Go\n to [pipeline runs dashboard](http://localhost:8080/taskinstance/list/?_flt_3_dag_id=testspace--text_process_ci--v1)\n to check the pipeline run result. \n Note that the you should write pipeline testing and trigger code aside from the pipeline definition code.\n Otherwise, the pipeline will be triggered every time when you import the pipeline definition code.\n\n4. Automatically trigger the CI pipeline by publish a new version of input dataset:\n\n ```python\n from dataci.models import Dataset\n \n \n if __name__ == '__main__':\n # DataCI auto track the new version of yelp_review dataset\n yelp_review_dataset = Dataset('yelp_review', dataset_files=[\n {'date': '2020-11-02 00:59:29', 'review_id': 'LTr95e6eOmLc7S_1WxM88Q', 'stars': 5.0, 'text': \"First of all the owner and staff went above and beyond to make us feel comfortable and protected during covid. Secondly, the bar had fantastic drinks\",},\n {'date': '2020-11-16 22:07:48', 'review_id': 'UhA9H0LK59qZegWOxyotcA', 'stars': 5.0, 'text': \"Herbies is awesome! My brunch was perfect, service, food, and drinks! I will definitely continue coming back.\",},\n {'date': '2020-11-18 06:38:46', 'review_id': 'YqTMxlbebNBDcKYTIUvrdw', 'stars': 5.0, 'text': \"You won't regret stopping here, hidden gem with great food and a laid back and comfortable atmosphere, a place you can gather with friends and they will treat you like family while you're there.\",},\n ]).publish(version_tag='2020-11')\n ```\n Before executing the above code, please wait some time for the previous manual triggered run to finish.\n Upon the publish of the new version of input dataset, the text process CI pipeline is automatically\n triggered and evaluate on the new version of streaming data.\n Go to [pipeline runs dashboard](http://localhost:8080/taskinstance/list/?_flt_3_dag_id=default--text_process_ci--v1)\n to check the pipeline run result.\n\n## Examples :books:\n\nWe have provided several examples to demonstrate the advanced usage of DataCI:\n\n- [Continuous Text Classification Development with Streaming Data](example/continuous_text_classification_dev/README.md)\n- [DataCI Playground](./example/ci/README.md) (WIP)\n\n## Citation :seedling:\n\nOur tech report is available at [arXiv:2306.15538](https://arxiv.org/abs/2306.15538)\nand [DMLR@ICML'23](https://dmlr.ai/assets/accepted-papers/42/CameraReady/DataCI_v3_camera_ready.pdf). Please cite as:\n\n```bibtex\n@article{zhang2023dataci,\n title = {DataCI: A Platform for Data-Centric AI on Streaming Data},\n author = {Zhang, Huaizheng and Huang, Yizheng and Li, Yuanming},\n journal = {arXiv preprint arXiv:2306.15538},\n year = {2023}\n}\n```\n\n## Contributors :sparkles:\n- Yuanming Li\n- Huaizheng Zhang\n- Yizheng Huang\n\n## License :page_facing_up:\nThis repository is open-sourced under [MIT License](./LICENSE).\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "A platform for tracking data-centric AI pipelines in dynamic streaming data",
"version": "0.0.2",
"project_urls": {
"Homepage": "https://github.com/MLSysOps/DataCI"
},
"split_keywords": [
"data-centric ai",
"streaming data",
"mlops",
"pipeline tracking",
"data versioning"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "c5bc47f21160ee95577b2b12f7fd153023f9b6352b324a66ccaeba46d8089a7a",
"md5": "bc80bb352b4c452fb6c9aaee72c5e559",
"sha256": "cefeef0693c064a2c123c927f1075da02bc3924dfcfd151f98c1f6300ccd8d16"
},
"downloads": -1,
"filename": "ml_data_ci-0.0.2-py3-none-any.whl",
"has_sig": false,
"md5_digest": "bc80bb352b4c452fb6c9aaee72c5e559",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": null,
"size": 114832,
"upload_time": "2023-09-08T15:26:59",
"upload_time_iso_8601": "2023-09-08T15:26:59.244932Z",
"url": "https://files.pythonhosted.org/packages/c5/bc/47f21160ee95577b2b12f7fd153023f9b6352b324a66ccaeba46d8089a7a/ml_data_ci-0.0.2-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "915fae1443ced9b0caea6b4271a5cd6c6f3cd6d5df1a91375be0468b73866fad",
"md5": "8815373e37bd4f853fc820f20d156300",
"sha256": "a6dacec51d283b8c02fc697398c9ae4df0443b78096a60c64f19ac3165e244dd"
},
"downloads": -1,
"filename": "ml-data-ci-0.0.2.tar.gz",
"has_sig": false,
"md5_digest": "8815373e37bd4f853fc820f20d156300",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 85224,
"upload_time": "2023-09-08T15:27:02",
"upload_time_iso_8601": "2023-09-08T15:27:02.010052Z",
"url": "https://files.pythonhosted.org/packages/91/5f/ae1443ced9b0caea6b4271a5cd6c6f3cd6d5df1a91375be0468b73866fad/ml-data-ci-0.0.2.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-09-08 15:27:02",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "MLSysOps",
"github_project": "DataCI",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"requirements": [],
"lcname": "ml-data-ci"
}