# Marsh
**Marsh** is a lightweight Python library for building, managing, and executing command workflows. It allows chaining
commands, defining custom pre/post processing logic, creating DAG workflows, and structuring flexible CLI workflows.
With support for local, remote, Docker-based, and Python execution, Marsh simplifies automating pipelines and
integrating external processes.
---
## Installation
Install Marsh via pip:
```text
pip install marsh-lib
```
---
## Key Features
- **Command Chains:** Chain multiple commands into reusable workflows.
- **Pre/Post Processors:** Add validation, logging, or error handling without modifying data.
- **Pre/Post Modifiers:** Transform input/output data during command execution.
- **Execution Options:** Local, Remote, Docker, Python, and custom runners.
- **DAG Workflows:** Define and run task dependencies as a DAG.
---
## Quick Start
### Workflow with `Conveyor`
```python
from marsh import Conveyor
def cmd_1(stdout, stderr): return stdout.upper(), stderr
def cmd_2(stdout, stderr): return stdout, stderr.lower()
# Chain commands with Conveyor
conveyor = Conveyor().add_cmd_runner(cmd_1).add_cmd_runner(cmd_2)
stdout, stderr = conveyor(b"input", b"ERROR")
print(stdout, stderr)
```
**Output:**
```text
INPUT error
```
---
### Pre/Post Processors: Validating or Logging Data
**Processors** perform actions (e.g., logging, validation) without modifying data.
```python
from marsh import CmdRunDecorator
def validate(stdout, stderr): assert not stderr.strip() # Validate no errors
def log(stdout, stderr): print(f"LOG: {stdout.decode()}") # Log output
decorator = CmdRunDecorator()\
.add_processor(validate, before=True)\
.add_processor(log, before=False)
def cmd_runner(stdout, stderr): return stdout, stderr
decorated_runner = decorator.decorate(cmd_runner)
stdout, stderr = decorated_runner(b"Hello", b"")
```
**Output:**
```text
LOG: Hello
```
---
### Pre/Post Modifiers: Transforming Data
**Modifiers** transform the data before or after a command runs. Unlike processors, modifiers must return `(stdout, stderr)`.
```python
def to_upper(stdout, stderr): return stdout.upper(), stderr
def add_prefix(stdout, stderr): return b"Prefix: " + stdout, stderr
decorator = CmdRunDecorator()\
.add_mod_processor(to_upper, before=True)\
.add_mod_processor(add_prefix, before=False)
def cmd_runner(stdout, stderr): return stdout, stderr
decorated_runner = decorator.decorate(cmd_runner)
stdout, stderr = decorated_runner(b"hello", b"")
print(stdout.decode())
```
**Output:**
```text
Prefix: HELLO
```
**Key Difference:**
- **Processors** act on data without altering it.
- **Modifiers** transform the data and return new values.
⚠️ **IMPORTANT:**
> **Order of Evaluation for Processors and Modifiers**
>
> 1. **Pre-Modifiers**
> 2. **Pre-Processors**
> 3. **Command Runner**
> 4. **Post-Modifiers**
> 5. **Post-Processors**
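The documented order can be illustrated with a plain-Python sketch that mimics the decorator semantics; no marsh import is used, and the helper names here are illustrative, not marsh APIs.

```python
# Plain-Python sketch of the documented evaluation order (not marsh's code).
calls = []

def pre_mod(stdout, stderr):    # 1. Pre-Modifier: transforms data
    calls.append("pre_mod"); return stdout.upper(), stderr

def pre_proc(stdout, stderr):   # 2. Pre-Processor: observes only
    calls.append("pre_proc")

def runner(stdout, stderr):     # 3. Command Runner
    calls.append("runner"); return stdout + b"!", stderr

def post_mod(stdout, stderr):   # 4. Post-Modifier
    calls.append("post_mod"); return b"[" + stdout + b"]", stderr

def post_proc(stdout, stderr):  # 5. Post-Processor
    calls.append("post_proc")

def decorated(stdout, stderr):
    stdout, stderr = pre_mod(stdout, stderr)   # modifiers return new data
    pre_proc(stdout, stderr)                   # processors return nothing
    stdout, stderr = runner(stdout, stderr)
    stdout, stderr = post_mod(stdout, stderr)
    post_proc(stdout, stderr)
    return stdout, stderr

out, err = decorated(b"hi", b"")
print(calls)  # ['pre_mod', 'pre_proc', 'runner', 'post_mod', 'post_proc']
print(out)    # b'[HI!]'
```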
---
### Passing a `CmdRunDecorator` instance as a parameter for reusability
```python
from marsh import Conveyor, CmdRunDecorator
decorator = CmdRunDecorator().add_processor(...).add_mod_processor(...)
conveyor = Conveyor().add_cmd_runner(cmd_runner, cmd_runner_decorator=decorator)
stdout, stderr = conveyor()
```
---
### Using `@add_processors_and_modifiers` for decorating command runners
```python
from marsh import add_processors_and_modifiers
@add_processors_and_modifiers(
("mod", True, pre_mod_func, arg_tuple, kwg_dict), # Pre-Modifier
("proc", True, pre_proc_func, arg_tuple, kwg_dict), # Pre-Processor
("mod", False, post_mod_func, arg_tuple, kwg_dict), # Post-Modifier
("proc", False, post_proc_func, arg_tuple, kwg_dict), # Post-Processor
)
def cmd_runner(x_stdout: bytes, x_stderr: bytes):
...
return b"stdout", b"stderr"
```
---
### Running Local Commands with `BashFactory`
#### Simple Local Command
```python
from marsh.bash import BashFactory
bash = BashFactory()
cmd = bash.create_cmd_runner(r'echo "Hello, $NAME"', env={"NAME": "World"})
stdout, stderr = cmd(b"", b"")
```
#### Examples with `BashFactory`
```python
from pathlib import Path
from marsh.bash import BashFactory
bash = BashFactory()
# Inject Environment Variables
cmd1 = bash.create_cmd_runner(
r'echo "($ENV_VAR_1, $ENV_VAR_2)"',
env={
"ENV_VAR_1": "value1",
"ENV_VAR_2": "value2"
}
)
# Change Working Directory
cmd2 = bash.create_cmd_runner(r'echo "CWD: $PWD"', cwd=str(Path.cwd().parent))
# Unix Pipes
cmd3 = bash.create_cmd_runner(r'echo -e "Line1\nLine2\nLine3"')
cmd4 = bash.create_cmd_runner(r'grep 2 | sort', executor_kwargs={"pipe_prev_stdout": True})
# Python Command
cmd5 = bash.create_cmd_runner(r'python -c "print(\"Hello Python\")"')
# Custom Callback
import subprocess
def custom_callback(popen: subprocess.Popen, stdout, stderr):
return popen.communicate(input=b"Custom Input")
cmd6 = bash.create_cmd_runner(r'xargs echo', callback=custom_callback)
# Combine in Conveyor
from marsh import Conveyor
conveyor = Conveyor()\
.add_cmd_runner(cmd1)\
.add_cmd_runner(cmd2)\
.add_cmd_runner(cmd3)\
.add_cmd_runner(cmd4)\
.add_cmd_runner(cmd5)\
.add_cmd_runner(cmd6)
stdout, stderr = conveyor()
```
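For comparison, the piping behavior that `executor_kwargs={"pipe_prev_stdout": True}` suggests — feeding the previous command's stdout into the next command's stdin — can be sketched with the standard library alone. This is a conceptual stand-in, not marsh's implementation, and the `run` helper below is hypothetical.

```python
# Stdlib sketch of piping one command's stdout into the next command's stdin.
import subprocess

def run(cmd: str, stdin: bytes = b"") -> tuple[bytes, bytes]:
    # Run a shell command, optionally feeding bytes to its stdin.
    proc = subprocess.Popen(
        ["sh", "-c", cmd],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return proc.communicate(input=stdin)

stdout, stderr = run('printf "Line1\\nLine2\\nLine3\\n"')
stdout, stderr = run("grep 2 | sort", stdin=stdout)  # pipe previous stdout
print(stdout.decode())  # Line2
```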
---
### Running Remote Commands with `SshFactory`
```python
from marsh import Conveyor
from marsh.ssh import SshFactory
ssh = SshFactory(("user@host:port",), {"connect_kwargs": {"password": "the_ssh_password"}})
cmd1 = ssh.create_cmd_runner("echo Hello, Remote World")
cmd2 = ssh.create_chained_cmd_runner(["echo Hi", "echo there"])
conveyor = Conveyor().add_cmd_runner(cmd1).add_cmd_runner(cmd2)
stdout, stderr = conveyor()
```
---
### Running Commands with `DockerCommandExecutor`
```python
from marsh.docker.docker_executor import DockerCommandExecutor
docker_executor = DockerCommandExecutor("bash:latest", ...)
stdout, stderr = docker_executor.run(
b"x_stdout", b"x_stderr",
environment=dict(ENV_VAR_1="value1", ENV_VAR_2="value2"),
workdir="/app"
)
```
---
### Running Commands with `PythonExecutor`
#### `eval` mode for evaluating python expressions
```python
from marsh import PythonExecutor
py_code = """x + y""" # Python Evaluatable Expression
python_executor = PythonExecutor(
py_code,
mode="eval",
namespace=dict(x=1, y=2),
use_pickle=False,
)
stdout, stderr = python_executor.run(b"x_stdout", b"x_stderr", ...)
```
#### `exec` mode for executing python statements
```python
from marsh import PythonExecutor
py_code = """
import os
import sys
prev_stdout = x_stdout #<-- Use `x_stdout` to get the previous STDOUT
prev_stderr = x_stderr #<-- Use `x_stderr` to get the previous STDERR
exec_result = x + y #<-- Use `exec_result` for storing results and passing to STDOUT
"""
python_executor = PythonExecutor(
py_code,
mode="exec",
namespace=dict(x=1, y=2),
use_pickle=False,
)
stdout, stderr = python_executor.run(b"x_stdout", b"x_stderr", ...)
```
**Note:** `eval` mode also has access to `x_stdout` and `x_stderr`, but not to `exec_result`.
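The two modes correspond to Python's built-in `eval()` and `exec()`. A minimal stdlib sketch of the namespace mechanics described above (illustrative only, not marsh's actual implementation):

```python
# Stdlib sketch of the namespace mechanics behind the two modes.
ns = {"x": 1, "y": 2, "x_stdout": b"prev out", "x_stderr": b""}

# "eval" mode: the code is a single expression; its value is the result.
result = eval("x + y", ns)
print(result)  # 3

# "exec" mode: the code is a statement block; by marsh's convention it
# stores its result in a variable named `exec_result`.
exec("exec_result = x + y + len(x_stdout)", ns)
print(ns["exec_result"])  # 11
```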
---
### DAG Workflow
The DAG extends the capabilities of the core components by allowing non-linear dependencies between tasks.
The DAG subpackage has two main components: `Node` and `Dag`. The `Node` encapsulates a `Conveyor` that represents a
_task_ in the workflow, while the `Dag` represents the whole workflow and task dependencies.
Note that a `Dag` manages `Startable` objects; `Startable` is the abstract base class of both `Node` and `Dag`. This means
that a `Dag` can contain both `Node` objects and other `Dag` objects.
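Conceptually, a synchronous DAG run resolves these dependencies so that each startable runs only after everything it depends on has finished. A minimal dependency-resolution sketch using the standard library's `graphlib` (plain Python, not marsh's code):

```python
# Dependency-resolution sketch: run each named task only after all of its
# dependencies have run. This mirrors "{B, C} --> A" style edges.
from graphlib import TopologicalSorter

# {task: set_of_dependencies} -- "d" depends on "b" and "c", etc.
deps = {"a": set(), "b": {"a"}, "c": {"a"}, "d": {"b", "c"}}
order = list(TopologicalSorter(deps).static_order())
print(order)  # "a" always first, "d" always last
```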
**Different kinds of `Dag`:**
- `SyncDag`
- `AsyncDag`
- `ThreadDag`
- `ThreadPoolDag`
- `MultiprocessDag`
- `ProcessPoolDag`
#### Defining Nodes
```python
from marsh import Conveyor
from marsh.dag import Node
conveyor = Conveyor().add_cmd_runner(cmd_runner, ...)
node = Node("node_name", conveyor, **run_kwargs)
```
#### Defining and Running a Dag
```python
from marsh.dag import SyncDag
dag = SyncDag("dag_name")
# Register Nodes
dag.do(node_a)
dag.do(node_a).then(node_b, node_c) # A --> {B, C}
dag.do(node_a).when(node_b, node_c) # {B, C} --> A
dag.do(other_dag).then(node_a) # Register other Dag
...
result_dict = dag.start() # Run the Dag
result = result_dict["node_or_dag_name"] # Get result from individual startables
```
⚠️ **IMPORTANT:**
- `MultiprocessDag` and `ProcessPoolDag` require `start()` to be called inside an `if __name__ == "__main__"` guard.
```python
from marsh.dag import MultiprocessDag, ProcessPoolDag
...
if __name__ == "__main__":
...
dag.start()
...
```
- As of the latest version, marsh DAGs do not support **_result passing_** between task dependencies.
---
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.