# axl-workflows 0.2.0

- **Summary**: Lightweight framework for building data and ML workflows with class-based Python syntax
- **Homepage**: https://github.com/pedrospinosa/axl-workflows
- **Requires Python**: >=3.10
- **License**: MIT
- **Keywords**: argo, dagster, data, kubeflow, kubernetes, ml, workflow
- **Uploaded**: 2025-09-02 03:55:55
            <div align="center">
  <img src="docs/assets/axl-slogan.png" alt="AXL Workflows Logo"/>
</div>

[![CI](https://github.com/pedrospinosa/axl-workflows/actions/workflows/ci.yml/badge.svg)](https://github.com/pedrospinosa/axl-workflows/actions/workflows/ci.yml)
[![PyPI](https://img.shields.io/pypi/v/axl-workflows.svg?labelColor=ffffff&color=116aea&logo=pypi&logoColor=595959)](https://pypi.org/project/axl-workflows/)
[![Python](https://img.shields.io/pypi/pyversions/axl-workflows.svg?label=python&labelColor=ffffff&color=116aea&logo=python&logoColor=595959)](https://pypi.org/project/axl-workflows/)

**AXL Workflows (axl)** is a lightweight framework for building **data and ML workflows** with a **class-based Python syntax**.
Build a workflow once, then run it locally or on Argo/Kubeflow:

* **Local runtime** → fast iteration on your machine.
* **Argo Workflows YAML** → run on Kubernetes; compatible with Kubeflow Pipelines (KFP) environments.

**Write once → run anywhere (locally or Argo/Kubeflow in production).**

---

## 🚀 Quick Start

```bash
# Install
pip install axl-workflows

# Or with uv
uv pip install axl-workflows

# Explore the CLI to create your first workflow
axl --help
```

---

## ✨ Key Features

* **Class-based DSL**: Define workflows as Python classes, with steps as methods and a `graph()` to wire them.
* **Simple params**: Treat parameters as a **normal step** that returns a Python object (e.g., a Pydantic model or dict). No special Param/Artifact classes.
* **IO Handlers**: Steps return **plain Python objects**; axl persists/loads them via an `io_handler` (default: **pickle**).

  * Per-step override (`@step(io_handler=...)`)
  * **Input modes**: receive **objects** by default or **file paths** with `input_mode="path"`.
* **Intermediate Representation (IR)**: Backend-agnostic DAG model (nodes, edges, resources, IO metadata).
* **Multiple backends**:

  * **Local runtime** → develop and iterate quickly.
  * **Argo/KFP** → YAML generation for production pipelines.
* **Unified runner image**: One container executes steps locally and in Argo pods.
* **Resource & retry hints**: Declare CPU, memory, caching, retries, and conditions at the step level.
* **CLI tools**: Compile, validate, run locally, or render DAGs.

---

## 📦 Example Workflow (params as a step, with Pydantic)

```python
# examples/churn_workflow.py
from axl import Workflow, step
from pydantic import BaseModel

# Parameters are just a normal step output (typed with Pydantic for convenience).
class TrainParams(BaseModel):
    seed: int = 42
    input_path: str = "data/raw.csv"

class ChurnTrain(Workflow):
    # Workflow configuration via class attributes
    name = "churn-train"
    image = "ghcr.io/you/axl-runner:0.1.0"
    io_handler = "pickle"

    @step
    def params(self) -> TrainParams:
        # Use defaults here; optionally read from YAML/env if you prefer.
        return TrainParams()

    @step  # default io_handler = pickle
    def preprocess(self, p: TrainParams):
        import pandas as pd
        df = pd.read_csv(p.input_path)
        # ... feature engineering ...
        return df  # persisted via pickle (default)

    @step
    def train(self, features, p: TrainParams):
        from sklearn.ensemble import RandomForestClassifier
        import numpy as np
        # Select numeric features first, then derive a toy label from them
        # (summing a mixed-dtype DataFrame row-wise can raise in recent pandas).
        X = features.select_dtypes(include=[np.number]).fillna(0)
        y = (X.sum(axis=1) > X.sum(axis=1).median()).astype(int)
        model = RandomForestClassifier(n_estimators=50, random_state=p.seed).fit(X, y)
        return model  # persisted via pickle

    @step
    def evaluate(self, model) -> float:
        # pretend evaluation
        return 0.9123

    def graph(self):
        p = self.params()
        feats = self.preprocess(p)
        model = self.train(feats, p)
        return self.evaluate(model)
```
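
Note how `graph()` expresses the DAG shape directly: `params` fans out to both `preprocess` and `train`, `preprocess` feeds `train`, and the returned `evaluate(model)` marks the workflow's final output.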

**Variations**

* Receive a **file path** instead of an object:

  ```python
  from pathlib import Path

  @step(input_mode={"features": "path"})
  def profile(self, features: Path) -> dict:
      return {"bytes": Path(features).stat().st_size}
  ```

* Override the **io handler** (e.g., Parquet for DataFrames):

  ```python
  from axl.io.parquet_io import parquet_io_handler

  @step(io_handler=parquet_io_handler)
  def preprocess(self, p: TrainParams):
      import pandas as pd
      return pd.read_csv(p.input_path)  # saved as .parquet; downstream gets a DataFrame
  ```
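
* Populate the **params step** from the environment (a sketch; `SEED` and `INPUT_PATH` are example variable names, and `TrainParams` is the model from the workflow above):

  ```python
  import os

  @step
  def params(self) -> TrainParams:
      # Field defaults still apply when the variables are unset.
      return TrainParams(
          seed=int(os.environ.get("SEED", "42")),
          input_path=os.environ.get("INPUT_PATH", "data/raw.csv"),
      )
  ```

* Declare **resource and retry hints** at the step level. The keyword arguments below are illustrative assumptions; check the `@step` signature in your installed version for the exact names:

  ```python
  @step(retries=3, resources={"cpu": "2", "memory": "4Gi"})  # hypothetical argument names
  def train(self, features, p: TrainParams):
      ...
  ```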

---

## 🛠 CLI

```bash
# Compile to Argo YAML
axl compile -m examples/churn_workflow.py:ChurnTrain --target argo --out churn.yaml

# Compile to Dagster job (Python module output)
axl compile -m examples/churn_workflow.py:ChurnTrain --target dagster --out dagster_job.py

# Run locally
axl run local -m examples/churn_workflow.py:ChurnTrain

# Validate workflow definition
axl validate -m examples/churn_workflow.py:ChurnTrain

# Render DAG graph
axl render -m examples/churn_workflow.py:ChurnTrain --out dag.png
```
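
Once compiled, the Argo YAML can be submitted with the standard Argo CLI (assuming `argo` is installed and configured for your cluster; this is the Argo tool, not part of axl):

```bash
# Submit the compiled workflow and stream its progress
argo submit churn.yaml --watch
```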

---

## 📐 Architecture

1. **Authoring Layer**

   * Python DSL: `@step` decorator, `Workflow` base class
   * **Params are a normal step** (often a Pydantic model)
   * **Configuration via class attributes** (name, image, io_handler)
   * IO handled by **io_handlers** (default: pickle)
   * Wire dependencies via `graph()`

2. **IR (Intermediate Representation)**

   * Abstract DAG: nodes, edges, inputs/outputs, resources, retry policies, IO metadata (see the illustrative sketch after this list)

3. **Compilers**

   * **Argo**: generates Argo Workflow YAML to run on Argo Workflows
   * **Kubeflow**: compiles to pipeline YAML to run on Kubeflow Pipelines

4. **Runtime**

   * Unified runner image (`axl-runner`) executes steps
   * Handles env (via **uv**), IO handler save/load, logging, retries

5. **CLI**

   * Single interface for compile, run, validate, render
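
To make the IR concrete, here is a minimal sketch of the kind of information a backend-agnostic DAG model typically carries. The class and field names below are illustrative assumptions for intuition only, not axl's actual classes:

```python
# Illustrative sketch -- hypothetical names, not axl's real IR.
from dataclasses import dataclass, field

@dataclass
class IRNode:
    name: str                      # step name, e.g. "preprocess"
    io_handler: str = "pickle"     # how this step's output is persisted
    resources: dict[str, str] = field(default_factory=dict)  # e.g. {"cpu": "2"}
    retries: int = 0               # retry hint for the backend

@dataclass
class IREdge:
    source: str                    # upstream step name
    target: str                    # downstream step name
    arg: str                      # parameter the upstream output binds to

@dataclass
class IRWorkflow:
    name: str                      # e.g. "churn-train"
    image: str                     # runner image shared by all steps
    nodes: list[IRNode] = field(default_factory=list)
    edges: list[IREdge] = field(default_factory=list)
```

A compiler then walks `nodes` and `edges` to emit backend-specific output, such as Argo DAG tasks.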

---

## 📂 Project Structure

```
axl/
  core/          # DSL: decorators, base classes, typing
  io/            # io_handlers (pickle default; parquet/npy/torch optional)
  ir/            # Intermediate Representation (nodes, edges, workflows)
  compiler/      # Backend compilers (Argo, Kubeflow)
  runtime/       # Runner container + IO + env setup (uv)
  cli.py         # CLI entrypoint
examples/
  churn_workflow.py
tests/
  test_core.py   # Tests for DSL components
  test_ir.py     # Tests for IR components
pyproject.toml
README.md
```

---

## 🎯 Why AXL Workflows?

* **Local development** is fast and simple.
* **Kubeflow Pipelines/Argo are production-grade**, but their YAML is verbose and harder to get started with.
* **axl bridges the gap**:

  * Simple, class-based DSL
  * **Params as a normal step**
  * IO handlers for painless object ↔ file persistence
  * Backend-agnostic IR
  * Compile once, run anywhere

---

## 📄 License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

            
