marsh-lib

- **Name:** marsh-lib
- **Version:** 0.2.0
- **Home page:** https://github.com/CedricAnover/marsh
- **Summary:** Lightweight, extensible Python library for building, managing, and executing command workflows.
- **Upload time:** 2025-01-14 09:11:24
- **Author:** Cedric Anover
- **Requires Python:** >=3.10, <3.13
- **License:** MIT
- **Keywords:** marsh, marsh-lib
# Marsh

**Marsh** is a lightweight Python library for building, managing, and executing command workflows. It allows chaining
commands, defining custom pre/post processing logic, creating DAG workflows, and structuring flexible CLI workflows.
With support for local, remote, Docker-based, and Python execution, Marsh simplifies automating pipelines and
integrating external processes.

---

## Installation

Install Marsh via pip:
```text
pip install marsh-lib
```

---

## Key Features

- **Command Chains:** Chain multiple commands into reusable workflows.
- **Pre/Post Processors:** Add validation, logging, or error handling without modifying data.
- **Pre/Post Modifiers:** Transform input/output data during command execution.
- **Execution Options:** Local, Remote, Docker, Python, and custom runners.
- **DAG Workflows:** Define and run task dependencies as a directed acyclic graph.

---

## Quick Start

### Workflow with `Conveyor`

```python
from marsh import Conveyor

def cmd_1(stdout, stderr): return stdout.upper(), stderr
def cmd_2(stdout, stderr): return stdout, stderr.lower()

# Chain commands with Conveyor
conveyor = Conveyor().add_cmd_runner(cmd_1).add_cmd_runner(cmd_2)
stdout, stderr = conveyor(b"input", b"ERROR")
print(stdout, stderr)
```

**Output:**  
```text
b'INPUT' b'error'
```
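Conceptually, the `Conveyor` feeds each runner the `(stdout, stderr)` pair produced by the one before it. A minimal plain-Python sketch of that model (illustrative only, not `Conveyor`'s actual implementation):

```python
def cmd_1(stdout, stderr): return stdout.upper(), stderr
def cmd_2(stdout, stderr): return stdout, stderr.lower()

def run_chain(runners, stdout=b"", stderr=b""):
    # Each runner receives the (stdout, stderr) pair from the previous one
    for runner in runners:
        stdout, stderr = runner(stdout, stderr)
    return stdout, stderr

print(run_chain([cmd_1, cmd_2], b"input", b"ERROR"))
# (b'INPUT', b'error')
```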

---

### Pre/Post Processors: Validating or Logging Data

**Processors** perform actions (e.g., logging, validation) without modifying data.

```python
from marsh import CmdRunDecorator

def validate(stdout, stderr): assert not stderr.strip()      # Validate no errors
def log(stdout, stderr): print(f"LOG: {stdout.decode()}")    # Log output

decorator = CmdRunDecorator()\
  .add_processor(validate, before=True)\
  .add_processor(log, before=False)

def cmd_runner(stdout, stderr): return stdout, stderr
decorated_runner = decorator.decorate(cmd_runner)
stdout, stderr = decorated_runner(b"Hello", b"")
```

**Output:**  
```text
LOG: Hello
```

---

### Pre/Post Modifiers: Transforming Data

**Modifiers** transform the data before or after a command runs. Unlike processors, modifiers must return `(stdout, stderr)`.

```python
def to_upper(stdout, stderr): return stdout.upper(), stderr
def add_prefix(stdout, stderr): return b"Prefix: " + stdout, stderr

decorator = CmdRunDecorator()\
  .add_mod_processor(to_upper, before=True)\
  .add_mod_processor(add_prefix, before=False)

def cmd_runner(stdout, stderr): return stdout, stderr
decorated_runner = decorator.decorate(cmd_runner)
stdout, stderr = decorated_runner(b"hello", b"")
print(stdout.decode())
```

**Output:**  
```text
Prefix: HELLO
```

**Key Difference:**  
- **Processors** act on data without altering it.  
- **Modifiers** transform the data and return new values.

⚠️ **IMPORTANT:**  
> **Order of Evaluation for Processors and Modifiers**
> 
> 1. **Pre-Modifiers**  
> 2. **Pre-Processors**  
> 3. **Command Runner**  
> 4. **Post-Modifiers**  
> 5. **Post-Processors**  
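
The order above can be sketched in plain Python (a conceptual model of the documented evaluation order, not marsh's internals):

```python
def decorate(runner, pre_mods=(), pre_procs=(), post_mods=(), post_procs=()):
    # pre-modifiers -> pre-processors -> runner -> post-modifiers -> post-processors
    def wrapped(stdout, stderr):
        for mod in pre_mods:
            stdout, stderr = mod(stdout, stderr)  # modifiers replace the pair
        for proc in pre_procs:
            proc(stdout, stderr)                  # processors only observe
        stdout, stderr = runner(stdout, stderr)
        for mod in post_mods:
            stdout, stderr = mod(stdout, stderr)
        for proc in post_procs:
            proc(stdout, stderr)
        return stdout, stderr
    return wrapped

calls = []
runner = decorate(
    lambda o, e: (calls.append("run") or o, e),
    pre_mods=[lambda o, e: (calls.append("pre-mod") or o, e)],
    pre_procs=[lambda o, e: calls.append("pre-proc")],
    post_mods=[lambda o, e: (calls.append("post-mod") or o, e)],
    post_procs=[lambda o, e: calls.append("post-proc")],
)
runner(b"", b"")
# calls == ["pre-mod", "pre-proc", "run", "post-mod", "post-proc"]
```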

---

### Passing a `CmdRunDecorator` instance as a parameter for reusability

```python
from marsh import Conveyor, CmdRunDecorator

decorator = CmdRunDecorator().add_processor(...).add_mod_processor(...)
conveyor = Conveyor().add_cmd_runner(cmd_runner, cmd_runner_decorator=decorator)
stdout, stderr = conveyor()
```

---

### Using `@add_processors_and_modifiers` for decorating command runners

```python
from marsh import add_processors_and_modifiers


@add_processors_and_modifiers(
    ("mod", True, pre_mod_func, arg_tuple, kwg_dict),      # Pre-Modifier
    ("proc", True, pre_proc_func, arg_tuple, kwg_dict),    # Pre-Processor
    ("mod", False, post_mod_func, arg_tuple, kwg_dict),    # Post-Modifier
    ("proc", False, post_proc_func, arg_tuple, kwg_dict),  # Post-Processor
)
def cmd_runner(x_stdout: bytes, x_stderr: bytes):
    ...
    return b"stdout", b"stderr"
```

---

### Running Local Commands with `BashFactory`

#### Simple Local Command
```python
from marsh.bash import BashFactory

bash = BashFactory()
cmd = bash.create_cmd_runner(r'echo "Hello, $NAME"', env={"NAME": "World"})
stdout, stderr = cmd(b"", b"")
```

#### Examples with `BashFactory`
```python
from pathlib import Path
from marsh.bash import BashFactory

bash = BashFactory()

# Inject Environment Variables
cmd1 = bash.create_cmd_runner(
    r'echo "($ENV_VAR_1, $ENV_VAR_2)"',
    env={
        "ENV_VAR_1": "value1",
        "ENV_VAR_2": "value2"
    }
)

# Change Working Directory
cmd2 = bash.create_cmd_runner(r'echo "CWD: $PWD"', cwd=str(Path.cwd().parent))

# Unix Pipes
cmd3 = bash.create_cmd_runner(r'echo -e "Line1\nLine2\nLine3"')
cmd4 = bash.create_cmd_runner(r'grep 2 | sort', executor_kwargs={"pipe_prev_stdout": True})

# Python Command
cmd5 = bash.create_cmd_runner(r'python -c "print(\"Hello Python\")"')

# Custom Callback
import subprocess
def custom_callback(popen: subprocess.Popen, stdout, stderr):
    return popen.communicate(input=b"Custom Input")
cmd6 = bash.create_cmd_runner(r'xargs echo', callback=custom_callback)

# Combine in Conveyor
from marsh import Conveyor
conveyor = Conveyor()\
    .add_cmd_runner(cmd1)\
    .add_cmd_runner(cmd2)\
    .add_cmd_runner(cmd3)\
    .add_cmd_runner(cmd4)\
    .add_cmd_runner(cmd5)\
    .add_cmd_runner(cmd6)

stdout, stderr = conveyor()
```
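Under the hood, chaining local commands with `pipe_prev_stdout` amounts to feeding the previous command's stdout into the next process's stdin. A plain-`subprocess` sketch of that idea (assuming `bash` is on `PATH`; this is not `BashFactory`'s implementation):

```python
import subprocess

def run_bash(command, stdin_bytes=b""):
    # Run a bash command, piping in the previous command's stdout
    proc = subprocess.Popen(
        ["bash", "-c", command],
        stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
    )
    return proc.communicate(input=stdin_bytes)

out, _ = run_bash('echo -e "Line3\\nLine1\\nLine2"')
out, _ = run_bash('grep -v 3 | sort', out)
# out == b"Line1\nLine2\n"
```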

---

### Running Remote Commands with `SshFactory`

```python
from marsh import Conveyor
from marsh.ssh import SshFactory

ssh = SshFactory(("user@host:port",), {"connect_kwargs": {"password": "the_ssh_password"}})
cmd1 = ssh.create_cmd_runner("echo Hello, Remote World")
cmd2 = ssh.create_chained_cmd_runner(["echo Hi", "echo there"])
conveyor = Conveyor().add_cmd_runner(cmd1).add_cmd_runner(cmd2)
stdout, stderr = conveyor()
```

---

### Running Commands with `DockerCommandExecutor`

```python
from marsh.docker.docker_executor import DockerCommandExecutor

docker_executor = DockerCommandExecutor("bash:latest", ...)

stdout, stderr = docker_executor.run(
    b"x_stdout", b"x_stderr",
    environment=dict(ENV_VAR_1="value1", ENV_VAR_2="value2"),
    workdir="/app"
)
```

---

### Running Commands with `PythonExecutor`

#### `eval` mode for evaluating python expressions

```python
from marsh import PythonExecutor

py_code = """x + y"""     # Python Evaluatable Expression

python_executor = PythonExecutor(
    py_code,
    mode="eval",
    namespace=dict(x=1, y=2),
    use_pickle=False,
)

stdout, stderr = python_executor.run(b"x_stdout", b"x_stderr", ...)

```

#### `exec` mode for executing python statements

```python
from marsh import PythonExecutor

py_code = """
import os
import sys

prev_stdout = x_stdout    #<-- Use `x_stdout` to get the previous STDOUT
prev_stderr = x_stderr    #<-- Use `x_stderr` to get the previous STDERR
exec_result = x + y       #<-- Use `exec_result` for storing results and passing to STDOUT
"""

python_executor = PythonExecutor(
    py_code,
    mode="exec",
    namespace=dict(x=1, y=2),
    use_pickle=False,
)

stdout, stderr = python_executor.run(b"x_stdout", b"x_stderr", ...)

```

**Note:** `eval` mode also has access to `x_stdout` and `x_stderr`, but not to `exec_result`.
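The eval/exec contract described above can be sketched with plain `eval`/`exec` and an injected namespace (`run_python` is a hypothetical helper for illustration, not `PythonExecutor` itself):

```python
def run_python(code, mode, namespace, x_stdout=b"", x_stderr=b""):
    # Extend the caller's namespace with the previous stdout/stderr
    ns = dict(namespace, x_stdout=x_stdout, x_stderr=x_stderr)
    if mode == "eval":
        return eval(code, ns)        # expression: the value is the result
    exec(code, ns)                   # statements: result read from exec_result
    return ns.get("exec_result")

print(run_python("x + y", "eval", dict(x=1, y=2)))                # 3
print(run_python("exec_result = x + y", "exec", dict(x=1, y=2)))  # 3
```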

---

### DAG Workflow

The DAG subpackage extends the core components by allowing non-linear dependencies between tasks.

The DAG subpackage has two main components: `Node` and `Dag`. The `Node` encapsulates a `Conveyor` that represents a
_task_ in the workflow, while the `Dag` represents the whole workflow and task dependencies.

Note that a `Dag` manages `Startable` objects; `Startable` is the abstract base class for both `Node` and `Dag`, so a
`Dag` can contain both `Node` objects and other `Dag` objects.

**Different kinds of `Dag`:**

- `SyncDag`
- `AsyncDag`
- `ThreadDag`
- `ThreadPoolDag`
- `MultiprocessDag`
- `ProcessPoolDag`

#### Defining Nodes

```python
from marsh import Conveyor
from marsh.dag import Node

conveyor = Conveyor().add_cmd_runner(cmd_runner, ...)
node = Node("node_name", conveyor, **run_kwargs)
```

#### Defining and Running a Dag

```python
from marsh.dag import SyncDag

dag = SyncDag("dag_name")

# Register Nodes
dag.do(node_a)
dag.do(node_a).then(node_b, node_c)       # A --> {B, C}
dag.do(node_a).when(node_b, node_c)       # {B, C} --> A
dag.do(other_dag).then(node_a)            # Register other Dag
...

result_dict = dag.start()                 # Run the Dag
result = result_dict["node_or_dag_name"]  # Get result from individual startables
```
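A synchronous DAG run boils down to executing startables in a topological order of their dependencies. A minimal sketch of that ordering using the standard library's `graphlib` (illustrative only, not marsh's implementation):

```python
from graphlib import TopologicalSorter

# "A --> {B, C}" means node_b and node_c depend on node_a
deps = {"node_b": {"node_a"}, "node_c": {"node_a"}}
order = list(TopologicalSorter(deps).static_order())
print(order)  # 'node_a' first, then 'node_b' and 'node_c' in some order
```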

⚠️ **IMPORTANT:**

- `MultiprocessDag` and `ProcessPoolDag` require `start()` to be called inside an `if __name__ == "__main__"` block.
  ```python
  from marsh.dag import MultiprocessDag, ProcessPoolDag
  
  ...
  
  if __name__ == "__main__":
      ...
      dag.start()
      ...
  ```
- As of the latest version, marsh DAG does not support **_result passing_** between task dependencies.

---

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

            
