axl-workflows 0.1.0 (PyPI)

* Summary: Lightweight framework for building data and ML workflows with class-based Python syntax
* Requires-Python: >=3.10
* License: MIT
* Keywords: argo, dagster, data, kubeflow, kubernetes, ml, workflow
* Uploaded: 2025-08-19 16:14:35

---

# AXL Workflows (`axl`)

[![CI](https://github.com/axl-workflows/axl/actions/workflows/ci.yml/badge.svg)](https://github.com/axl-workflows/axl/actions/workflows/ci.yml)
[![PyPI](https://img.shields.io/pypi/v/axl-workflows.svg)](https://pypi.org/project/axl-workflows/)
[![Python](https://img.shields.io/pypi/pyversions/axl-workflows.svg)](https://pypi.org/project/axl-workflows/)

**AXL Workflows (axl)** is a lightweight framework for building **data and ML workflows** with a **class-based Python syntax**.
It compiles your workflows into:

* **Dagster jobs** β†’ for **local development**, type checking, and fast iteration.
* **Argo Workflows YAML** β†’ to run on **Kubeflow Pipelines** (KFP) in Kubernetes clusters.

**Write once β†’ run anywhere (Dagster locally or Kubeflow in production).**

---

## πŸš€ Quick Start

```bash
# Install
pip install axl-workflows

# Or with uv
uv pip install axl-workflows

# Explore the CLI
axl --help
```
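
A minimal first workflow, using only the decorators documented below (the runner image is a placeholder):

```python
# minimal.py: the smallest workflow this README's DSL allows
from axl import workflow, step

@workflow(name="hello", image="ghcr.io/you/axl-runner:0.1.0")
class Hello:

    @step
    def greet(self) -> str:
        return "hello, axl"

    def graph(self):
        return self.greet()
```

Run it locally with `axl run local -m minimal.py:Hello` (see the CLI section below for the full command set).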

---

## ✨ Key Features

* **Class-based DSL**: Define workflows as Python classes, with steps as methods and a `graph()` to wire them.
* **Simple params**: Treat parameters as a **normal step** that returns a Python object (e.g., a Pydantic model or dict). No special Param/Artifact classes.
* **IO Handlers**: Steps return **plain Python objects**; axl persists/loads them via an `io_handler` (default: **pickle**).

  * Per-step override (`@step(io_handler=...)`)
  * **Input modes**: receive **objects** by default or **file paths** with `input_mode="path"`.
* **Intermediate Representation (IR)**: Backend-agnostic DAG model (nodes, edges, resources, IO metadata).
* **Multiple backends**:

  * **Dagster** β†’ ops/jobs/resources for local/dev runs.
  * **Argo/KFP** β†’ YAML generation for production pipelines.
* **Unified runner image**: One container executes steps in both Dagster and Argo pods (uses **uv** for fast, reproducible envs).
* **Resource & retry hints**: Declare CPU, memory, caching, retries, and conditions at the step level (a hedged sketch follows this list).
* **CLI tools**: Compile, validate, run locally, or render DAGs.
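
Step-level hints could look like the sketch below; the keyword names (`cpu`, `memory`, `retries`, `cache`) are illustrative assumptions, not a confirmed `@step` signature:

```python
# Hypothetical keyword arguments; check the axl docs for the real @step signature.
@step(cpu="500m", memory="1Gi", retries=3, cache=True)
def preprocess(self, p: TrainParams):
    ...
```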

---

## πŸ“¦ Example Workflow (params as a step, with Pydantic)

```python
# examples/churn_workflow.py
from axl import workflow, step
from pydantic import BaseModel

# Parameters are just a normal step output (typed with Pydantic for convenience).
class TrainParams(BaseModel):
    seed: int = 42
    input_path: str = "data/raw.csv"

@workflow(name="churn-train", image="ghcr.io/you/axl-runner:0.1.0")
class ChurnTrain:

    @step
    def params(self) -> TrainParams:
        # Use defaults here; optionally read from YAML/env if you prefer.
        return TrainParams()

    @step  # default io_handler = pickle
    def preprocess(self, p: TrainParams):
        import pandas as pd
        df = pd.read_csv(p.input_path)
        # ... feature engineering ...
        return df  # persisted via pickle (default)

    @step
    def train(self, features, p: TrainParams):
        from sklearn.ensemble import RandomForestClassifier
        import numpy as np
        X = features.select_dtypes(include=[np.number]).fillna(0)
        y = (X.sum(axis=1) > X.sum(axis=1).median()).astype(int)  # synthetic label for the demo
        model = RandomForestClassifier(n_estimators=50, random_state=p.seed).fit(X, y)
        return model  # persisted via pickle

    @step
    def evaluate(self, model) -> float:
        # pretend evaluation
        return 0.9123

    def graph(self):
        p = self.params()
        feats = self.preprocess(p)
        model = self.train(feats, p)
        return self.evaluate(model)
```

**Variations**

* Receive a **file path** instead of an object:

  ```python
  from pathlib import Path

  @step(input_mode={"features": "path"})
  def profile(self, features: Path) -> dict:
      return {"bytes": Path(features).stat().st_size}
  ```

* Override the **io handler** (e.g., Parquet for DataFrames; a handler interface sketch follows these variations):

  ```python
  from axl.io.parquet_io import parquet_io_handler

  @step(io_handler=parquet_io_handler)
  def preprocess(self, p: TrainParams):
      import pandas as pd
      return pd.read_csv(p.input_path)  # saved as .parquet; downstream gets a DataFrame
  ```
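
If you need a custom handler, here is a minimal sketch of the shape an `io_handler` could take, assuming a save/load pair keyed by file path; the actual axl interface may differ:

```python
# Illustrative only: the real axl io_handler interface may differ.
import pickle
from pathlib import Path
from typing import Any

class PickleIOHandler:
    """Persist step outputs as pickle files (the assumed default behavior)."""

    extension = ".pkl"

    def save(self, obj: Any, path: Path) -> None:
        with open(path, "wb") as f:
            pickle.dump(obj, f)

    def load(self, path: Path) -> Any:
        with open(path, "rb") as f:
            return pickle.load(f)
```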

---

## πŸ›  CLI

```bash
# Compile to Argo YAML
axl compile -m examples/churn_workflow.py:ChurnTrain --target argo --out churn.yaml

# Compile to Dagster job (Python module output)
axl compile -m examples/churn_workflow.py:ChurnTrain --target dagster --out dagster_job.py

# Run locally
axl run local -m examples/churn_workflow.py:ChurnTrain

# (Optional) Provide params to your own params() step via file/env if you implement that logic
# axl run local -m examples/churn_workflow.py:ChurnTrain --params params.yaml

# Validate workflow definition
axl validate -m examples/churn_workflow.py:ChurnTrain

# Render DAG graph
axl render -m examples/churn_workflow.py:ChurnTrain --out dag.png
```
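
The commented `--params` line above assumes you implement file/env loading yourself. One way to do that inside your `params()` step, assuming PyYAML and a hypothetical `AXL_PARAMS_FILE` environment variable:

```python
import os
from pathlib import Path

import yaml  # PyYAML; an assumed dependency for this sketch

@step
def params(self) -> TrainParams:
    # Hypothetical convention: point AXL_PARAMS_FILE at a YAML file of overrides.
    params_file = Path(os.environ.get("AXL_PARAMS_FILE", "params.yaml"))
    if params_file.exists():
        return TrainParams(**yaml.safe_load(params_file.read_text()))
    return TrainParams()  # fall back to the defaults defined on the model
```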

---

## πŸ“ Architecture

1. **Authoring Layer**

   * Python DSL: `@workflow`, `@step`
   * **Params are a normal step** (often a Pydantic model)
   * IO handled by **io_handlers** (default: pickle)
   * Wire dependencies via `graph()`

2. **IR (Intermediate Representation)**

   * Abstract DAG: nodes, edges, inputs/outputs, resources, retry policies, IO metadata (modeled in the sketch after this list)

3. **Compilers**

   * **DagsterBackend**: generates ops, jobs, resources for dev
   * **ArgoBackend**: generates Argo Workflow YAML (KFP-compatible)

4. **Runtime**

   * Unified runner image (`axl-runner`) executes steps
   * Handles env (via **uv**), IO handler save/load, logging, retries

5. **CLI**

   * Single interface for compile, run, validate, render
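
As a rough illustration of item 2, an IR node and workflow could be modeled with dataclasses like these; they are an assumption for exposition, not axl's actual `ir/` module:

```python
# Exposition-only sketch; axl's real ir/ module will differ.
from dataclasses import dataclass, field

@dataclass
class IRNode:
    name: str
    upstream: list[str] = field(default_factory=list)  # edges, by node name
    io_handler: str = "pickle"                         # IO metadata
    cpu: str | None = None                             # resource hint
    memory: str | None = None                          # resource hint
    retries: int = 0                                   # retry policy

@dataclass
class IRWorkflow:
    name: str
    image: str
    nodes: dict[str, IRNode] = field(default_factory=dict)
```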

---

## πŸ“‚ Project Structure

```
axl/
  core/          # DSL: decorators, base classes, typing
  io/            # io_handlers (pickle default; parquet/npy/torch optional)
  ir/            # Intermediate Representation (nodes, edges, workflows)
  compiler/      # Backend compilers (Argo, Dagster)
  runtime/       # Runner container + IO + env setup (uv)
  cli.py         # CLI entrypoint
examples/
  churn_workflow.py
tests/
  test_core.py   # Tests for DSL components
  test_ir.py     # Tests for IR components
pyproject.toml
README.md
```

---

## πŸš€ Roadmap

* **v0.1 (MVP)**

  * DSL for workflows & steps
  * IR builder
  * Argo compiler (PVC artifacts, retries, resources)
  * Runner container (PVC support)
  * CLI: compile, run local

* **v0.2**

  * Dagster compiler (ops/jobs)
  * S3/MinIO artifact support
  * Simple caching
  * Graph rendering

* **v0.3**

  * Conditional execution (`when`)
  * Metrics export (Prometheus, Argo/KFP UI)
  * KFP-specific features (experiments, parameters UI)

---

## 🎯 Why AXL Workflows?

* **Dagster is great for dev** but doesn’t natively compile to Argo/KFP.
* **Kubeflow Pipelines is production-grade** but YAML is verbose and hard to maintain.
* **axl bridges the gap**:

  * Simple, class-based DSL
  * **Params as a normal step** (optional Pydantic)
  * IO handlers for painless object ↔ file persistence
  * Backend-agnostic IR
  * Compile once, run anywhere

            
