# fsdag

* Version: 1.0.1
* Uploaded: 2024-12-10 04:36:26
* Requires Python: >=3.8
* License: MIT License

[![CI](https://github.com/mristin/fsdag/actions/workflows/ci.yml/badge.svg)](https://github.com/mristin/fsdag/actions/workflows/ci.yml)
[![Coverage Status](https://coveralls.io/repos/github/mristin/fsdag/badge.svg?branch=main)](https://coveralls.io/github/mristin/fsdag)
[![PyPI - Version](https://badge.fury.io/py/fsdag.svg)](https://badge.fury.io/py/fsdag)
![PyPI - Python Version](https://img.shields.io/pypi/pyversions/fsdag.svg)

This library allows you to simply define DAG-workflows in Python where artefacts are stored on a filesystem.

Fsdag targets simple personal or small-group projects, where simplicity and the absence of dependencies are paramount.
It is implemented in fewer than 100 lines of code.

For more complex workflow libraries, see:
* [pydags],
* [hamilton],
* [luigi],
* ... and many others on: https://github.com/pditommaso/awesome-pipeline

[pydags]: https://pypi.org/project/pydags/
[hamilton]: https://pypi.org/project/sf-hamilton/
[luigi]: https://pypi.org/project/luigi/

## Approach

You simply define nodes of your workflow, and execute them lazily.
Each node corresponds to an artefact.
If the artefact already exists on the filesystem, it will be loaded; otherwise, it will be computed.
Once loaded or computed, the artefacts are kept in memory for further access.
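
The resolution order described above (memory first, then filesystem, then computation) can be sketched roughly as follows. This is only an illustration under assumptions, not fsdag's actual source: `SketchNode`, `Numbers` and `demo` are hypothetical names, and the stand-in class merely mirrors the four methods listed in the Usage section.

```python
import abc
import json
import pathlib
import tempfile
from typing import Generic, List, Optional, TypeVar

T = TypeVar("T")


class SketchNode(abc.ABC, Generic[T]):
    """Hypothetical stand-in for ``fsdag.Node``; not the library's code."""

    def __init__(self) -> None:
        # A separate flag lets ``None`` artefacts be cached as well.
        self._resolved = False
        self._artefact: Optional[T] = None

    @abc.abstractmethod
    def _path(self) -> pathlib.Path: ...

    @abc.abstractmethod
    def _save(self, artefact: T) -> None: ...

    @abc.abstractmethod
    def _load(self) -> T: ...

    @abc.abstractmethod
    def _compute(self) -> T: ...

    def resolve(self) -> T:
        # 1. Already resolved in this process: return the cached artefact.
        if self._resolved:
            return self._artefact  # type: ignore
        # 2. Artefact exists on disk: load it.
        if self._path().exists():
            artefact = self._load()
        else:
            # 3. Otherwise compute it and store it for later runs.
            artefact = self._compute()
            self._save(artefact)
        self._artefact = artefact
        self._resolved = True
        return artefact


class Numbers(SketchNode[List[int]]):
    compute_calls = 0  # Counts how often ``_compute`` actually runs.

    def __init__(self, directory: pathlib.Path) -> None:
        super().__init__()
        self._directory = directory

    def _path(self) -> pathlib.Path:
        return self._directory / "numbers.json"

    def _save(self, artefact: List[int]) -> None:
        self._path().write_text(json.dumps(artefact))

    def _load(self) -> List[int]:
        return json.loads(self._path().read_text())  # type: ignore

    def _compute(self) -> List[int]:
        Numbers.compute_calls += 1
        return [1, 2, 3]


def demo() -> int:
    """Resolve three times and report how many computations happened."""
    Numbers.compute_calls = 0
    with tempfile.TemporaryDirectory() as tmp:
        directory = pathlib.Path(tmp)
        first = Numbers(directory)
        first.resolve()  # Computes and saves.
        first.resolve()  # Served from memory.
        second = Numbers(directory)
        second.resolve()  # Loaded from the filesystem.
    return Numbers.compute_calls
```

In this sketch, three calls to `resolve()` trigger a single computation; the other two are answered from memory and from disk, respectively.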

## Installation

To install fsdag, simply run the following command in your virtual environment:
```
pip3 install fsdag
```

## Usage

The workflow node is implemented as an abstract class `fsdag.Node`.
For your concrete nodes, you have to implement the following methods:
* `_path`: where the artefact should be stored on disk,
* `_save`: how to store the artefact to `_path()`,
* `_load`: how to load the artefact from `_path()`, and
* `_compute`: how to compute the artefact.

To resolve the node, call `resolve()`.

## Examples

### Basic Example

Here is an example of a node whose artefact is serialized to and deserialized from JSON.

```python
import json
import pathlib
from typing import List

import fsdag

class Something(fsdag.Node[List[int]]):
    def _path(self) -> pathlib.Path:
        return pathlib.Path("/some/path/something.json")

    def _save(self, artefact: List[int]) -> None:
        self._path().write_text(json.dumps(artefact))

    def _load(self) -> List[int]:
        return json.loads(
            self._path().read_text()
        )  # type: ignore

    def _compute(self) -> List[int]:
        return [1, 2, 3]

something = Something()
print(something.resolve())
# Outputs: [1, 2, 3]
# The artefact is now saved to the filesystem. It is also kept
# in memory for faster access if you ever resolve it again.

# For example, calling ``resolve`` here again retrieves
# the artefact from the memory cache:
print(something.resolve())
# Outputs: [1, 2, 3]

another_something = Something()
# This call to the ``resolve`` method will not perform
# the computation, but load the artefact from the filesystem.
print(another_something.resolve())
# Outputs: [1, 2, 3]
```

### `None` Artefact

Some tasks produce no artefact, *i.e.*, they are mere procedures which should be executed, but which return nothing.
To model such procedures, use `None` as the generic parameter and a marker file:

```python
import pathlib

import fsdag

class Something(fsdag.Node[None]):
    def _path(self) -> pathlib.Path:
        return pathlib.Path("/path/to/somewhere/done")

    def _save(self, artefact: None) -> None:
        self._path().write_text("done")

    def _load(self) -> None:
        return

    def _compute(self) -> None:
        # Perform some complex procedure.
        ...
        return

something = Something()
# The procedure is executed here once.
something.resolve()

another_something = Something()
# This resolution does nothing as the procedure 
# has been already executed.
another_something.resolve()
```

### Workflow Graph

Here is a full example of a simple workflow graph.

```python
import json
import pathlib

import fsdag

class Something(fsdag.Node[int]):
    def _path(self) -> pathlib.Path:
        return pathlib.Path("/some/path/something.json")

    def _save(self, artefact: int) -> None:
        self._path().write_text(json.dumps(artefact))

    def _load(self) -> int:
        return json.loads(
            self._path().read_text()
        )  # type: ignore

    def _compute(self) -> int:
        return 1


class Another(fsdag.Node[int]):
    def _path(self) -> pathlib.Path:
        return pathlib.Path("/some/path/another.json")

    def _save(self, artefact: int) -> None:
        self._path().write_text(json.dumps(artefact))

    def _load(self) -> int:
        return json.loads(
            self._path().read_text()
        )  # type: ignore

    def _compute(self) -> int:
        return 2

class Sum(fsdag.Node[int]):
    def __init__(
            self, 
            something: Something, 
            another: Another
    ) -> None:
        super().__init__()
        self.something = something
        self.another = another
    
    def _path(self) -> pathlib.Path:
        return pathlib.Path("/some/path/sum.json")

    def _save(self, artefact: int) -> None:
        self._path().write_text(json.dumps(artefact))

    def _load(self) -> int:
        return json.loads(
            self._path().read_text()
        )  # type: ignore

    def _compute(self) -> int:
        # Note the calls to ``resolve`` methods here.
        return (
            self.something.resolve() 
            + self.another.resolve()
        )

something = Something()
another = Another()

result = Sum(something=something, another=another)

# The call to ``result.resolve`` will recursively and 
# lazily resolve the ``something`` and ``another``.
print(result.resolve())
# Outputs: 3
```
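
The three nodes above repeat the same JSON `_save` and `_load` bodies. One possible way to factor that out is a small mixin, sketched below. `JsonArtefactMixin`, `Demo` and `round_trip` are hypothetical names and not part of fsdag; the sketch also creates missing parent directories before writing, which the examples above do not do.

```python
import json
import pathlib
import tempfile
from typing import Any


class JsonArtefactMixin:
    """Hypothetical helper (not part of fsdag): JSON ``_save``/``_load``.

    The class it is mixed into is expected to provide ``_path()``.
    """

    def _save(self, artefact: Any) -> None:
        path = self._path()  # type: ignore[attr-defined]
        # Create missing parent directories so that writing does not fail.
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(artefact))

    def _load(self) -> Any:
        path = self._path()  # type: ignore[attr-defined]
        return json.loads(path.read_text())


class Demo(JsonArtefactMixin):
    """Stand-in used only to exercise the mixin without fsdag."""

    def __init__(self, directory: pathlib.Path) -> None:
        self._directory = directory

    def _path(self) -> pathlib.Path:
        return self._directory / "nested" / "demo.json"


def round_trip(artefact: Any) -> Any:
    """Save the artefact to a temporary directory and load it back."""
    with tempfile.TemporaryDirectory() as tmp:
        node = Demo(pathlib.Path(tmp))
        node._save(artefact)
        return node._load()
```

With fsdag, a node could then presumably be declared as `class Sum(JsonArtefactMixin, fsdag.Node[int])`, implementing only `_path` and `_compute`. Listing the mixin first lets its concrete methods take precedence over the abstract ones in the method resolution order. This combination is an assumption and has not been checked against the library.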

            

        