<div align="center">
<img src="logo/fluid_ml_logo.png" width="400px">
_Develop ML pipelines fluently with no boilerplate code. Focus only on your tasks and not the boilerplate!_
---
<p align="center">
<a href="#key-features">Key Features</a> •
<a href="#getting-started">Getting Started</a> •
<a href="#functionality">Functionality</a> •
<a href="#examples">Examples</a> •
<a href="#citation">Citation</a>
</p>
[![Python Versions](https://img.shields.io/pypi/pyversions/fluidml.svg)](https://pypi.org/project/fluidml/)
[![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![CircleCI](https://circleci.com/gh/fluidml/fluidml/tree/main.svg?style=shield)](https://circleci.com/gh/fluidml/fluidml/tree/main)
[![codecov](https://codecov.io/gh/fluidml/fluidml/branch/main/graph/badge.svg?token=XG4UDWF8RE)](https://codecov.io/gh/fluidml/fluidml)
[![Documentation Status](https://readthedocs.org/projects/fluidml/badge/?version=latest)](https://fluidml.readthedocs.io/en/latest/?badge=latest)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
</div>
---
**FluidML** is a lightweight framework for developing machine learning pipelines.
<div align="center">
<img src="logo/fluidml_example.gif" width="70%" />
</div>
Developing machine learning models is a challenging process, with a wide range of sub-tasks:
data collection, pre-processing, model development, hyper-parameter tuning and deployment.
Each of these tasks is iterative in nature and typically requires many iterations to get right and to reach good performance.
Due to this, each task is generally developed sequentially, with artifacts from one task being fed as inputs to the subsequent tasks.
For instance, raw datasets are first cleaned, pre-processed, featurized and stored as iterable datasets (on disk), which are then used for model training.
However, this type of development can quickly become messy and unmaintainable for several reasons:
- pipeline code may be split across multiple scripts whose dependencies are not modeled explicitly
- each task contains boilerplate code to collect results from previous tasks (e.g. reading from disk)
- it is hard to keep track of task artifacts and their different versions
- hyper-parameter tuning adds further complexity and boilerplate code
FluidML attempts to solve all of the above issues without restricting the user's flexibility.
## Key Features
FluidML provides the following functionality out of the box:
- **Task Graphs** - Create ML pipelines as a directed task graph using simple APIs
- **Results Forwarding** - Results from tasks are automatically forwarded to downstream tasks based on dependencies
- **Parallel Processing** - Execute the task graph in parallel using multiprocessing
- **Grid Search** - Extend the task graph by enabling grid search on tasks with just one line of code
- **Result Caching** - Task results are persistently cached in a results store (e.g. a local file store or a MongoDB store) and made available to subsequent runs without re-executing the tasks
- **Flexibility** - Provides full control over your task implementations. You are free to use any framework of your choice (scikit-learn, TensorFlow, PyTorch, Keras, or any other favorite library)
---
## Getting Started
### Installation
#### 1. From Pip
Simply execute:
```bash
$ pip install fluidml
```
#### 2. From Source
1. Clone the repository,
2. Navigate into the cloned directory (contains the setup.py file),
3. Execute `$ pip install .`
**Note:** To run demo examples, execute `$ pip install fluidml[examples]` (Pip) or `$ pip install .[examples]` (Source) to install the additional requirements.
### Minimal Example
This minimal toy example showcases how to get started with FluidML.
For real machine learning examples, check the "Examples" section below.
#### 1. Define Tasks
First, we define some toy machine learning tasks. A Task can be implemented as a function or as a class inheriting from our `Task` class.
In the case of the class approach, each task must implement the `run()` method, which takes some inputs and performs the desired functionality.
These inputs are actually the results from predecessor tasks and are automatically forwarded by FluidML based on registered task dependencies.
If the task has any hyper-parameters, they can be defined as arguments in the constructor.
Additionally, within each task, users have access to special `Task` methods and attributes like
`self.save()` and `self.resource` to save a result and access task resources (more on that later).
```Python
from fluidml import Task


class MyTask(Task):
    def __init__(self, config_param_1, config_param_2):
        ...

    def run(self, predecessor_result_1, predecessor_result_2):
        ...
```
or
```Python
def my_task(predecessor_result_1, predecessor_result_2, config_param_1, config_param_2, task: Task):
    ...
```
In the case of defining the task as a callable, an extra `task` object is passed to the function,
which makes important internal attributes and methods such as `task.save()` and `task.resource` available to the user.
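For illustration, a minimal sketch of such a function task could look as follows; the result name `my_result` and the computed object are hypothetical placeholders:

```Python
from fluidml import Task


def my_task(predecessor_result_1, predecessor_result_2, config_param_1, config_param_2, task: Task):
    # combine the forwarded predecessor results (placeholder logic)
    my_result = {"inputs": [predecessor_result_1, predecessor_result_2], "param_1": config_param_1}
    # persist the result under a unique name so that successor tasks can receive it
    task.save(obj=my_result, name="my_result")
```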
Below, we define standard machine learning tasks such as dataset preparation, pre-processing, featurization and model training using Task classes.
Notice that:
- Each task is implemented individually, and it is clear what its inputs are (see the arguments of the `run()` method)
- Each task saves its results using `self.save(...)` by providing the object to be saved and a unique name for it.
This unique name corresponds to input names in successor task definitions.
```Python
from typing import List


class DatasetFetchTask(Task):
    def run(self):
        ...
        self.save(obj=data_fetch_result, name="data_fetch_result")


class PreProcessTask(Task):
    def __init__(self, pre_processing_steps: List[str]):
        super().__init__()
        self._pre_processing_steps = pre_processing_steps

    def run(self, data_fetch_result):
        ...
        self.save(obj=pre_process_result, name="pre_process_result")


class TFIDFFeaturizeTask(Task):
    def __init__(self, min_df: int, max_features: int):
        super().__init__()
        self._min_df = min_df
        self._max_features = max_features

    def run(self, pre_process_result):
        ...
        self.save(obj=tfidf_featurize_result, name="tfidf_featurize_result")


class GloveFeaturizeTask(Task):
    def run(self, pre_process_result):
        ...
        self.save(obj=glove_featurize_result, name="glove_featurize_result")


class TrainTask(Task):
    def __init__(self, max_iter: int, balanced: bool):
        super().__init__()
        self._max_iter = max_iter
        self._class_weight = "balanced" if balanced else None

    def run(self, tfidf_featurize_result, glove_featurize_result):
        ...
        self.save(obj=train_result, name="train_result")


class EvaluateTask(Task):
    def run(self, train_result):
        ...
        self.save(obj=evaluate_result, name="evaluate_result")
```
#### 2. Task Specifications
Next, we write specifications for the defined tasks.
At this point we only declare their specifications; FluidML later uses them to create the actual task instances.
Note the `config` argument holds the configuration of the task (i.e. hyper-parameters).
```Python
from fluidml import TaskSpec
dataset_fetch_task = TaskSpec(task=DatasetFetchTask)
pre_process_task = TaskSpec(task=PreProcessTask, config={"pre_processing_steps": ["lower_case", "remove_punct"]})
featurize_task_1 = TaskSpec(task=GloveFeaturizeTask)
featurize_task_2 = TaskSpec(task=TFIDFFeaturizeTask, config={"min_df": 5, "max_features": 1000})
train_task = TaskSpec(task=TrainTask, config={"max_iter": 50, "balanced": True})
evaluate_task = TaskSpec(task=EvaluateTask)
```
#### 3. Registering task dependencies
Here we create the task graph by registering dependencies between the tasks.
In particular, for each task specification, you can register its predecessor tasks using the `requires()` method.
```Python
pre_process_task.requires(dataset_fetch_task)
featurize_task_1.requires(pre_process_task)
featurize_task_2.requires(pre_process_task)
train_task.requires(dataset_fetch_task, featurize_task_1, featurize_task_2)
evaluate_task.requires(dataset_fetch_task, featurize_task_1, featurize_task_2, train_task)
```
#### 4. Configure Logging
FluidML internally utilizes Python's `logging` library. However, we refrain from configuring a logger object with handlers
and formatters since each user has different logging needs and preferences. Hence, if you want to use FluidML's logging
capability, you just have to do the configuration yourself. For convenience, we provide a simple utility function which
configures a visually appealing logger (using a specific handler from the [rich](https://github.com/willmcgugan/rich) library).
We highly recommend enabling logging in your FluidML application in order to benefit from console progress logging.
```python
from fluidml import configure_logging
configure_logging()
```
#### 5. Run tasks using Flow
Now that we have all the tasks specified, we can just run the task graph.
For that, we create the task flow by passing all tasks to the `Flow()` class.
Subsequently, we execute the task graph by calling `flow.run()`.
```Python
from fluidml import Flow


tasks = [dataset_fetch_task, pre_process_task, featurize_task_1,
         featurize_task_2, train_task, evaluate_task]

flow = Flow(tasks=tasks)
results = flow.run()
```
---
## Functionality
The following sections highlight the most important features and options when specifying and executing a task pipeline.
For complete documentation of all available options, we refer to the [API documentation](https://fluidml.readthedocs.io/en/latest/).
### Grid Search - Automatic Task Expansion
We can easily enable grid search for our tasks with just one line of code.
We just have to pass the `expand` argument with either the `product` or `zip` expansion option to the `TaskSpec` constructor.
Automatically, all `List` elements in the provided config are recursively unpacked and taken into account for expansion.
If a list itself is an argument and should not be expanded, it has to be wrapped again in a list.
```Python
train_task = TaskSpec(
task=TrainTask,
config={"max_iter": [50, 100], "balanced": [True, False], "layers": [[50, 100, 50]]},
expand="product", # or 'zip'
)
```
That's it! Flow expands this task specification into 4 tasks, one for each cross-product combination of `max_iter` and `balanced`.
Alternatively, using the `zip` expansion method would result in 2 expanded tasks,
with the respective `max_iter` and `balanced` combinations `(50, True)` and `(100, False)`.
Note that `layers` is not considered for the different grid search realizations, since it will be unpacked and the actual list
value will be passed to the task.
Further, any successor tasks (for instance, the evaluate task) in the task graph will also be automatically expanded.
Therefore, in our example, we would have 4 evaluate tasks, each one corresponding to one of the 4 train tasks.
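To make the expansion concrete, the resulting train task configurations would look roughly as follows (an illustration of the expansion logic, not actual FluidML output):

```Python
# expand="product": 4 configs from the cross product of "max_iter" and "balanced";
# the doubly wrapped "layers" list is unpacked once and passed through unchanged
product_configs = [
    {"max_iter": 50, "balanced": True, "layers": [50, 100, 50]},
    {"max_iter": 50, "balanced": False, "layers": [50, 100, 50]},
    {"max_iter": 100, "balanced": True, "layers": [50, 100, 50]},
    {"max_iter": 100, "balanced": False, "layers": [50, 100, 50]},
]

# expand="zip": the list values are paired element-wise, yielding 2 configs
zip_configs = [
    {"max_iter": 50, "balanced": True, "layers": [50, 100, 50]},
    {"max_iter": 100, "balanced": False, "layers": [50, 100, 50]},
]
```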
For more advanced grid search expansion options we refer to the documentation.
### Model Selection
Running a complete machine learning pipeline usually yields trained models for many grid search parameter combinations.
A common goal is then to automatically determine the best hyper-parameter setup and the best performing model.
FluidML enables just that by providing a `reduce=True` argument to the `TaskSpec` class. Hence, to automatically
compare the 4 evaluate tasks and select the best performing model, we implement an additional `ModelSelectionTask`
which gets wrapped by our `TaskSpec` class.
```Python
class ModelSelectionTask(Task):
    def run(self, train_result: List[Sweep]):
        # from all trained models/hyper-parameter combinations, determine the best performing model
        ...


model_selection_task = TaskSpec(task=ModelSelectionTask, reduce=True)

model_selection_task.requires(evaluate_task)
```
The important `reduce=True` argument ensures that a single `ModelSelectionTask` instance receives the training results
from all grid-search-expanded predecessor tasks.
`train_result` is of type `List[Sweep]` and holds the results and configs of all specified grid search parameter combinations. For example:
```Python
train_result = [
Sweep(value=value_1, config={...}), # first unique parameter combination config
Sweep(value=value_2, config={...}), # second unique parameter combination config
...
]
```
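As an illustration, the body of `ModelSelectionTask.run()` could iterate over these sweeps and pick the best one; the metric key `"accuracy"` below is a hypothetical entry in each saved result:

```Python
class ModelSelectionTask(Task):
    def run(self, train_result: List[Sweep]):
        # pick the sweep whose saved result reports the highest (hypothetical) accuracy metric
        best_sweep = max(train_result, key=lambda sweep: sweep.value["accuracy"])
        # persist the winning hyper-parameter configuration for later inspection
        self.save(obj=best_sweep.config, name="best_config")
```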
### Result Store
FluidML provides the `ResultsStore` interface to efficiently save, load and delete task results. Internally, the result store is used
to automatically collect saved predecessor results and pass the collected results as inputs to defined successor tasks.
By default, task results are stored in an `InMemoryStore`, which might be impractical for large datasets/models or long-running tasks since the results are not persistent.
For persistent storage, FluidML provides two fully implemented results stores, namely `LocalFileStore` and `MongoDBStore`.
Additionally, users can provide their own results store to `Flow.run()` by inheriting from the `ResultsStore` interface
and implementing `load()`, `save()`, `delete()`, `delete_run()` and `get_context()` methods.
Note that these methods rely on the task name and its config parameters, which act as the lookup key for results.
In this way, tasks are skipped by FluidML when task results are already available for the given config.
However, users can override this behavior and force-execute tasks by passing the `force` parameter to `Flow.run()`.
For details check the API documentation.
```Python
from typing import Any, Dict, Optional

# ResultsStore and StoreContext are provided by fluidml; see the API documentation for the exact import path.


class MyResultsStore(ResultsStore):
    def load(self, name: str, task_name: str, task_unique_config: Dict, **kwargs) -> Optional[Any]:
        """Query method to load an object based on its name, task_name and task_config if it exists."""
        raise NotImplementedError

    def save(self, obj: Any, name: str, type_: str, task_name: str, task_unique_config: Dict, **kwargs):
        """Method to save/update any artifact."""
        raise NotImplementedError

    def delete(self, name: str, task_name: str, task_unique_config: Dict):
        """Method to delete any artifact."""
        raise NotImplementedError

    def delete_run(self, task_name: str, task_unique_config: Dict):
        """Method to delete all task results from a given run config."""
        raise NotImplementedError

    def get_context(self, task_name: str, task_unique_config: Dict) -> StoreContext:
        """Method to get store-specific storage context, e.g. the current run directory for the local file store."""
        raise NotImplementedError
```
For example, we can instantiate a `LocalFileStore`
```python
results_store = LocalFileStore(base_dir="/some/dir")
```
and use it to enable persistent results storing via `flow.run(results_store=results_store)`.
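The caching behavior described above then follows naturally; a minimal sketch, assuming the task configs stay unchanged between runs:

```Python
# With a persistent store, completed tasks are cached: re-running the pipeline with
# unchanged task configs skips the finished tasks and forwards their stored results
# to successor tasks instead of recomputing them.
results = flow.run(results_store=results_store)
```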
### Multiprocessing
FluidML automatically infers the optimal number of worker processes based on the expanded task graph and the number of available CPUs
in your system. If the resulting number is greater than 1, `Flow` will automatically run the graph in parallel using multiprocessing.
If 1 worker is optimal and no multiprocessing is needed, the task graph will be executed in the main process without multiprocessing.
You can manually control the number of workers by providing the `num_workers` argument to `flow.run()`.
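For instance, to override the inferred number of workers (the value 4 below is an arbitrary example):

```Python
# run the task graph with exactly 4 worker processes
results = flow.run(num_workers=4)
```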
### Logging
Internally, FluidML makes use of Python's `logging` library to visualize and log the progress of the task pipeline execution
in the console. We recommend configuring `logging` in your FluidML application for a better user experience.
For convenience, we provide a simple utility function `configure_logging()` which configures a visually appealing logger
(using a specific handler from the [rich](https://github.com/willmcgugan/rich) library). For different logging options
we refer to the documentation.
In the case of executing the task graph in parallel with multiple workers using multiprocessing, the console output might become
garbled and unreadable. In that scenario you can turn on [tmux](https://github.com/tmux/tmux/wiki) logging by providing the `log_to_tmux` argument:
`flow.run(log_to_tmux=True)`. In addition to the standard console, a `tmux` terminal session with `num_workers` panes is automatically started.
Each worker process logs to a dedicated pane in the tmux session so that the console output is nicely readable.
Note `log_to_tmux=True` requires the [installation](https://github.com/tmux/tmux/wiki/Installing) of tmux.
### Visualization
FluidML provides functions to visualize the original task specification graph as well as the (potentially expanded) task graph, which facilitates debugging.
After instantiating the `Flow` object we have access to the task specification graph `flow.task_spec_graph` and the
expanded task graph `flow.task_graph`.
Both graphs can be visualized in the console via `visualize_graph_in_console` or interactively in a browser or Jupyter notebook via `visualize_graph_interactive`.
```Python
from fluidml.visualization import visualize_graph_in_console, visualize_graph_interactive
flow = Flow(tasks=tasks)
visualize_graph_in_console(graph=flow.task_spec_graph)
visualize_graph_interactive(graph=flow.task_graph, browser="firefox")
```
When using console visualization, the default arguments `use_pager=True` and `use_unicode=False` render the graph in ASCII within a pager to support horizontal scrolling.
If `use_pager=False` the graph is simply printed, and if `use_unicode=True` a visually more appealing unicode character set is used for console rendering.
However, not every console supports unicode characters.
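For instance, to print the graph directly with the unicode character set instead of the paged ASCII output (a sketch using the parameters described above):

```Python
from fluidml.visualization import visualize_graph_in_console

# print the expanded task graph directly, without a pager, using unicode box-drawing characters
visualize_graph_in_console(graph=flow.task_graph, use_pager=False, use_unicode=True)
```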
See below the console visualization of the task specification graph and the expanded task graph from our minimal example:
<div align="center">
<img src="logo/task_spec_graph.png" width="500px">
</div>
<div align="center">
<img src="logo/task_graph.png">
</div>
When using interactive visualization, the default output is rendered in a running Jupyter notebook.
If you want the graph to be rendered in a browser, provide the `browser` argument to `visualize_graph_interactive()`, e.g.
`visualize_graph_interactive(graph=flow.task_graph, browser="chrome")`. You might receive a `webbrowser` error:
`webbrowser.Error: could not locate runnable browser` which means that you have to register the browser manually so that
Python's `webbrowser` library can find it. Registering can be done via
```python
import webbrowser
webbrowser.register(
"chrome", None, webbrowser.BackgroundBrowser("/path/to/chrome/executable")
)
```
---
## Examples
For real machine learning pipelines including grid search implemented with FluidML, check our
Jupyter Notebook tutorials:
- [Transformer based Sequence to Sequence Translation (PyTorch)](https://github.com/fluidml/fluidml/blob/main/examples/pytorch_transformer_seq2seq_translation/transformer_seq2seq_translation.ipynb)
- [Multi-class Text Classification (Sklearn)](https://github.com/fluidml/fluidml/blob/main/examples/sklearn_text_classification/sklearn_text_classification.ipynb)
---
## Citation
```
@article{fluid_ml,
title = {FluidML - a lightweight framework for developing machine learning pipelines},
author = {Hillebrand, Lars and Ramamurthy, Rajkumar},
year = {2020},
publisher = {GitHub},
journal = {GitHub repository},
howpublished = {\url{https://github.com/fluidml/fluidml}},
}
```