<center><h1>FlowerPower</h1></center>
<center><h2>Simple Workflow Framework - Hamilton + APScheduler = FlowerPower</h2></center>

<center><img src="./image.png" alt="Image" width="600" height="400" ></center>

## Table of Contents
1. [Overview](#overview)
2. [Installation](#installation)
3. [Usage](#usage)
    1. [Initialize a new flowerpower project](#initialize-a-new-flowerpower-project)
    2. [Add a new pipeline](#add-a-new-pipeline)
    3. [Setup a pipeline](#setup-a-pipeline)
    4. [Run a pipeline](#run-a-pipeline)
    5. [Schedule a pipeline](#schedule-a-pipeline)
    6. [Start a worker](#start-a-worker)
    7. [Track a pipeline](#track-a-pipeline)
    8. [Optional: Dev Services](#optional-dev-services)


## Overview
FlowerPower is a simple workflow framework based on the fantastic Python libraries [Hamilton](https://github.com/DAGWorks-Inc/hamilton) and [APScheduler (Advanced Python Scheduler)](https://github.com/agronholm/apscheduler).

**[Hamilton](https://github.com/DAGWorks-Inc/hamilton)** is used as the core engine to create Directed Acyclic Graphs (DAGs) from your pipeline functions and execute them in a controlled manner. It is highly recommended to read the [Hamilton documentation](https://hamilton.dagworks.io/en/latest/) and check out their [examples](https://github.com/DAGWorks-Inc/hamilton/examples) to understand the core concepts of FlowerPower.

**[APScheduler](https://github.com/agronholm/apscheduler)** is used to schedule pipeline execution. You can schedule a pipeline to run at a specific time, at a regular interval, or based on a cron expression. Furthermore, APScheduler can be used to run pipelines in a distributed environment. 


#### The main features of the FlowerPower framework are:

1. **Pipeline Workflows**: FlowerPower uses the Hamilton library to create Directed Acyclic Graphs (DAGs) from pipeline functions, allowing you to define and execute complex workflow pipelines.

2. **Scheduling and Execution**: FlowerPower integrates the APScheduler library to allow scheduling of pipeline executions at specific times, intervals, or based on cron expressions. It supports running pipelines in a distributed environment with a data store and event broker.

3. **Parameterization**: FlowerPower allows you to parameterize your pipeline functions, either by setting default values or defining parameters in a configuration file.

4. **Tracking and Monitoring**: FlowerPower can integrate with the Hamilton UI to provide tracking and monitoring of pipeline executions, including visualization of the DAG.

5. **Flexible Configuration**: FlowerPower uses configuration files (`conf/pipelines.yml`, `conf/scheduler.yml`, `conf/tracker.yml`) to set up pipelines, scheduling, and tracking, allowing for easy customization.

6. **Distributed Execution**: FlowerPower supports running pipelines in a distributed environment by using a data store (e.g., PostgreSQL, MongoDB, SQLite) to persist job information and an event broker (e.g., Redis, MQTT) for communication between the scheduler and workers.

7. **Easy Setup and Usage**: FlowerPower provides command-line tools and Python APIs for initializing new projects, adding new pipelines, running and scheduling pipelines, and starting workers.

Overall, FlowerPower aims to provide a simple and flexible workflow framework that combines the power of Hamilton and APScheduler to enable the creation and execution of complex data pipelines, with support for scheduling, distribution, and monitoring.
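
For a quick impression, here is a condensed walk-through using the commands covered in the rest of this README (project and pipeline names are only examples):

```shell
# initialize a project and add a pipeline
flowerpower init new-project
cd new-project
flowerpower add my_flow

# run the pipeline once in the current process
flowerpower run my_flow

# schedule the pipeline and start a worker to execute it
flowerpower schedule my_flow --type interval --interval-params seconds=30
flowerpower start-worker
```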

## Installation

```shell
pip install flowerpower 
# scheduling 
pip install "flowerpower[scheduler]" 
# mqtt event broker
pip install "flowerpower[scheduler,mqtt]" 
# redis event broker
pip install "flowerpower[scheduler,redis]" 
# mongodb data store
pip install "flowerpower[scheduler,mongodb]"
# distributed computing using ray
pip install "flowerpower[scheduler,ray]"
# distributed computing using dask
pip install "flowerpower[scheduler,dask]"
```

## Usage

### Initialize a new flowerpower project
In the command line, run the following command to initialize a new flowerpower project:
```shell
flowerpower init new-project
cd new-project
```
This creates the basic config files `conf/pipelines.yml`, `conf/scheduler.yml` and `conf/tracker.yml`.

or run the following in a python script/REPL
```python
from flowerpower import init
init("new-project")
```
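
After initialization, the project directory contains at least the config files listed above. A typical layout (shown here for orientation; details may vary between versions) looks like:

```
new-project/
├── conf/
│   ├── pipelines.yml
│   ├── scheduler.yml
│   └── tracker.yml
└── pipelines/   # pipeline modules are added here (see the next section)
```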

### Add a new pipeline
In the command line run
```shell
flowerpower add my_flow
# or
flowerpower new my_flow
```
A new file `pipelines/my_flow.py` is created and the relevant entries are added to the config files.

or run the following in a python script/REPL
```python
from flowerpower.pipeline import PipelineManager, new

# using the PipelineManager
pm = PipelineManager()
pm.new("my_flow")

# or using the new function
new("my_flow")
```

### Setup a pipeline
##### Add pipeline functions
Add the pipeline functions to `pipelines/my_flow.py`.

FlowerPower uses [Hamilton](https://github.com/DAGWorks-Inc/hamilton), which converts your pipeline functions into nodes and assembles them into a [Directed Acyclic Graph (DAG)](https://en.wikipedia.org/wiki/Directed_acyclic_graph).

It is therefore mandatory to write your pipeline files according to the Hamilton paradigm. You can read more about this in the Hamilton documentation chapter [Function, Nodes and DataFlow](https://hamilton.dagworks.io/en/latest/concepts/node/).
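
For orientation, a minimal pipeline module written in the Hamilton style could look like the following sketch (the function and column names are made up for this example; the important part is that each function's parameter names refer to other functions or to pipeline inputs):

```python
# pipelines/my_flow.py -- hypothetical sketch of Hamilton-style pipeline functions.
# Each function becomes a node in the DAG; its parameter names refer to other
# nodes (e.g. `raw_df`) or to pipeline inputs (e.g. `data_path`).
import pandas as pd


def raw_df(data_path: str) -> pd.DataFrame:
    """Load the input data; `data_path` is supplied as a pipeline input."""
    return pd.read_csv(data_path)


def add_int_col(raw_df: pd.DataFrame, col_name: str = "foo", values: int = 1) -> pd.DataFrame:
    """Add a constant column to the data."""
    return raw_df.assign(**{col_name: values})


def final_df(add_int_col: pd.DataFrame) -> pd.DataFrame:
    """The final result requested via `final_vars`."""
    return add_int_col
```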

##### Parameterize the pipeline functions

To parameterize the pipeline functions, you can either set reasonable default values in the function definition or define the parameters in the `conf/pipelines.yml` file.

###### 1. Set default values
Set the default values in the function definition
```python
...

def add_int_col(df: pd.DataFrame, col_name: str = "foo", values: str = "bar") -> pd.DataFrame:
    return df.assign(**{col_name: values})

...
```
###### 2. Define the parameters in the `conf/pipelines.yml`
Add the parameters to the `conf/pipelines.yml` file
```yaml
...

params:
  my_flow:
    add_int_col:
      col_name: foo
      values: bar

...
```
and add the `parameterize` decorator to the function:
```python
...

# PARAMS holds the values defined under `params` in conf/pipelines.yml
@parameterize(**PARAMS.add_int_col)
def add_int_col(df: pd.DataFrame, col_name: str, values: int) -> pd.DataFrame:
    return df.assign(**{col_name: values})

...
```

### Run a pipeline
A flowerpower pipeline can be run in the command line or in a python script/REPL. 

When using the `run` command or function, the pipeline is executed in the current process. With the `run-job` command or `run_job` function, the pipeline is executed as an apscheduler job and the job result is returned. The `add-job` command or `add_job` function adds the pipeline as an apscheduler job and returns a `job_id`, which can be used to fetch the job result from the data store. The latter two options are useful for running the pipeline in a distributed environment. Note: to execute them, you need a data store and an event broker set up and a worker running.

In the command line
```shell
# This runs the pipeline in the current process, without using the apscheduler data store
flowerpower run my_flow
# This runs the pipeline as an apscheduler job and returns the job result
flowerpower run-job my_flow
# This adds the pipeline as an apscheduler job; the job result is stored in the apscheduler data store
flowerpower add-job my_flow
```
In a python script/REPL
```python
from flowerpower.pipeline import Pipeline, run, run_job, add_job

# using the Pipeline class
p = Pipeline("my_flow")
res = p.run()

# or using the run function
res = run("my_flow")

# or using the run_job function
res = run_job("my_flow")

# or using the add_job function
job_id = add_job("my_flow") # job_id can be used to fetch the job result from the data store
```

The above runs the pipeline with the default parameters, which are defined in the `conf/pipelines.yml` file.

##### Configure the pipeline run parameters
You can configure the pipeline run parameters in the `conf/pipelines.yml` file
```yaml
...

run:
  my_flow:
    # dev and prod are the run environments. You can define as many environments as you want
    dev:
      # define all the pipeline input parameters here
      inputs:
        data_path: path/to/data.csv
        fs_profile: 
        fs_protocol: local
      # the pipeline functions whose results should be returned (final variables)
      final_vars: 
        [
            add_int_col,
            final_df
        ]
      # whether to use the Hamilton UI tracker. Note: the Hamilton UI needs to be installed and running
      with_tracker: false
    
    ...
```
##### Overwrite the default parameters
You can overwrite the default parameters by using the `--inputs` and `--final-vars` options in the command line and select a run environment with `--environment`:
```shell
# to run the pipeline with the parameters defined in the prod environment
flowerpower run my_flow --inputs data_path=path/to/data.csv,fs_protocol=local --final-vars final_df --environment prod
```
or in a python script/REPL
```python
from flowerpower.pipeline import run
res = run(
    "my_flow", 
    inputs={"data_path": "path/to/data.csv", "fs_protocol": "local"}, 
    final_vars=["final_df"],
    environment="prod"
)
```
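
For the `prod` environment used above to exist, `conf/pipelines.yml` needs a corresponding `prod` block next to `dev`; a sketch with placeholder values:

```yaml
run:
  my_flow:
    dev:
      # ... (see above)
    prod:
      inputs:
        data_path: path/to/prod/data.csv
        fs_protocol: local
      final_vars: [final_df]
      with_tracker: false
```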

### Schedule a pipeline
You can set up a schedule for a flowerpower pipeline to run at a specific time, at a regular interval, or based on a cron expression. Adding a flowerpower pipeline to the scheduler is done in the command line or in a python script/REPL.

```shell
# schedule the pipeline to run every 30 seconds
flowerpower schedule my_flow --type interval --interval-params seconds=30
# schedule the pipeline to run at a specific time
flowerpower schedule my_flow --type date --date-params year=2022,month=1,day=1,hour=0,minute=0,second=0
# schedule the pipeline to run based on cron parameters
flowerpower schedule my_flow --type cron --cron-params second=0,minute=0,hour=0,day=1,month=1,day_of_week=0
# schedule the pipeline to run based on a crontab expression
flowerpower schedule my_flow --type cron --crontab "0 0 1 1 0"
```
or in a python script/REPL
```python
from flowerpower.scheduler import schedule, Pipeline

# using the Pipeline class
p = Pipeline("my_flow")
p.schedule("interval", seconds=30)

# or using the schedule function
schedule("my_flow", "interval", seconds=30)
```

### Start a worker
Scheduled flowerpower pipelines or jobs added to the scheduler are executed by a worker. To start a worker, you can use the command line or a python script/REPL.

```shell
flowerpower start-worker
```
or in a python script/REPL
```python
from flowerpower.scheduler import start_worker
start_worker()
```


##### Configure the scheduler/worker
The worker can run in the current process for testing purposes. In this case, no data store or event broker is required. To run the worker in a distributed environment, you need to set up a data store and an event broker.

The scheduler/worker can be configured in the `conf/scheduler.yml` file. 

```yaml
...

data_store:
  type: postgres # "postgres" can be replaced with "sqlalchemy"
  uri: postgresql+asyncpg://user:password@localhost:5432/flowerpower

# other data store options

# - sqlite
# data_store:
#  type: sqlite # "sqlite" can be replaced with "sqlalchemy"
#  uri: sqlite+aiosqlite:///flowerpower.db

# - mysql
# data_store:
#  type: mysql # "mysql" can be replaced with "sqlalchemy"
#  uri: mysql+aiomysql://user:password@localhost:3306/flowerpower

# - mongodb
# data_store:
#   type: mongodb
#   uri: mongodb://localhost:27017/flowerpower

# - memory
# data_store:
#   type: memory

event_broker:
  type: redis
  uri: redis://localhost:6379
  # or use host and port instead of uri
  # host: localhost
  # port: 6379

# - mqtt
# event_broker:
#   type: mqtt
#   host: localhost
#   port: 1883
#   # optional username and password
#   username: edge
#   password: edge

# - memory
# event_broker:
#   type: memory

...
```

### Track a pipeline
You can track the pipeline execution using the Hamilton UI. To use it, you need to install the Hamilton UI package or have a Hamilton UI server running. See the [Hamilton UI documentation](https://hamilton.dagworks.io/en/latest/hamilton-ui/ui/) for more information.


##### Local Hamilton UI
```shell
pip install "flowerpower[ui]"

# start the Hamilton UI server
flowerpower hamilton-ui
```
This starts the Hamilton UI server on `http://localhost:8241`. 

##### Docker-based Hamilton UI
According to the [Hamilton UI documentation](https://hamilton.dagworks.io/en/latest/hamilton-ui/ui/#install), you can run the Hamilton UI server in a docker container. 
```shell
git clone https://github.com/dagworks-inc/hamilton

cd hamilton/ui

./run.sh
```
This starts the Hamilton UI server on `http://localhost:8242`. 


You need to set your username/email (and optionally an API key) during the first login. Create a new project in the Hamilton UI for each flowerpower pipeline you want to track.

To use the pipeline tracking, you need to configure the tracker in the `conf/tracker.yml` file
```yaml
username: my_email@example.com
api_url: http://localhost:8241
ui_url: http://localhost:8242
api_key: # optional

pipeline:
  my_flow:
    project_id: 1
    tags:
      environment: dev
      version: 1.0
    dag_name: my_flow_123
```

Now you can track the pipeline execution by setting the `with_tracker` parameter in the `conf/pipelines.yml` file to `true` or by using the `--with-tracker` parameter in the command line.
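
For example, to enable tracking for a single run from the command line:

```shell
flowerpower run my_flow --with-tracker
```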




### *Optional: Dev Services*
To test distributed pipeline execution (with separate workers) in a local environment, you need a data store and an event broker. You can use the following docker-compose file to set up the services.

Download the docker-compose file
```shell
curl -O https://raw.githubusercontent.com/legout/flowerpower/main/docker/docker-compose.yml
```
Start the relevant services
```shell
# emqx mqtt broker if you want to use mqtt as the event broker
docker-compose up mqtt -d
# valkey (OSS redis) if you want to use redis as the event broker
docker-compose up redis -d
# mongodb if you want to use mongodb as the data store
docker-compose up mongodb -d
# postgres db if you want to use postgres as the data store and/or event broker
docker-compose up postgres -d
```
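
If you start, for example, the postgres and redis containers, the matching `conf/scheduler.yml` entries are the ones already shown in the scheduler configuration section above (repeated here for convenience; credentials and ports depend on the docker-compose file):

```yaml
data_store:
  type: postgres
  uri: postgresql+asyncpg://user:password@localhost:5432/flowerpower

event_broker:
  type: redis
  uri: redis://localhost:6379
```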


            
