znflow


Nameznflow JSON
Version 0.1.14 PyPI version JSON
download
home_page
SummaryA general purpose framework for building and running computational graphs.
upload_time2023-09-21 12:55:49
maintainer
docs_urlNone
authorzincwarecode
requires_python>=3.8,<4.0
licenseApache-2.0
keywords
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            [![zincware](https://img.shields.io/badge/Powered%20by-zincware-darkcyan)](https://github.com/zincware)
[![Coverage Status](https://coveralls.io/repos/github/zincware/ZnFlow/badge.svg?branch=main)](https://coveralls.io/github/zincware/ZnFlow?branch=main)
[![PyPI version](https://badge.fury.io/py/znflow.svg)](https://badge.fury.io/py/znflow)
[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/zincware/ZnFlow/HEAD)

# ZnFlow

The `ZnFlow` package provides a basic structure for building computational
graphs based on functions or classes. It is designed as a lightweight
abstraction layer to

- learn graph computing.
- build your own packages on top of it.

## Installation

```shell
pip install znflow
```

## Usage

### Connecting Functions

With ZnFlow you can connect functions to each other by using the `@nodify`
decorator. Inside the `znflow.DiGraph` the decorator will return a
`FunctionFuture` object that can be used to connect the function to other nodes.
The `FunctionFuture` object will also be used to retrieve the result of the
function. Outside the `znflow.DiGraph` the function behaves as a normal
function.

```python
import znflow

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

print(compute_mean(2, 8))
# >>> 5

with znflow.DiGraph() as graph:
    mean = compute_mean(2, 8)

graph.run()
print(mean.result)
# >>> 5

with znflow.DiGraph() as graph:
    n1 = compute_mean(2, 8)
    n2 = compute_mean(13, 7)
    n3 = compute_mean(n1, n2)

graph.run()
print(n3.result)
# >>> 7.5
```

### Connecting Classes

It is also possible to connect classes. They can be connected either directly or
via class attributes. This is possible by returning `znflow.Connections` inside
the `znflow.DiGraph` context manager. Outside the `znflow.DiGraph` the class
behaves as a normal class.

In the following example we use a dataclass, but it works with all Python
classes that inherit from `znflow.Node`.

```python
import znflow
import dataclasses

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

@dataclasses.dataclass
class ComputeMean(znflow.Node):
    x: float
    y: float

    results: float = None

    def run(self):
        self.results = (self.x + self.y) / 2

with znflow.DiGraph() as graph:
    n1 = ComputeMean(2, 8)
    n2 = compute_mean(13, 7)
    # connecting classes and functions to a Node
    n3 = ComputeMean(n1.results, n2)

graph.run()
print(n3.results)
# >>> 7.5
```

### Dask Support

ZnFlow comes with support for [Dask](https://www.dask.org/) to run your graph:

- in parallel.
- through e.g. SLURM (see https://jobqueue.dask.org/en/latest/api.html).
- with a nice GUI to track progress.

All you need to do is install ZnFlow with Dask `pip install znflow[dask]`. We
can then extend the example from above. This will run `n1` and `n2` in parallel.
You can investigate the graph on the Dask dashboard (typically
http://127.0.0.1:8787/graph or via the client object in Jupyter.)

```python
import znflow
import dataclasses
from dask.distributed import Client

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

@dataclasses.dataclass
class ComputeMean(znflow.Node):
    x: float
    y: float

    results: float = None

    def run(self):
        self.results = (self.x + self.y) / 2

with znflow.DiGraph() as graph:
    n1 = ComputeMean(2, 8)
    n2 = compute_mean(13, 7)
    # connecting classes and functions to a Node
    n3 = ComputeMean(n1.results, n2)

client = Client()
deployment = znflow.deployment.Deployment(graph=graph, client=client)
deployment.submit_graph()

n3 = deployment.get_results(n3)
print(n3)
# >>> ComputeMean(x=5.0, y=10.0, results=7.5)
```

We need to get the updated instance from the Dask worker via
`Deployment.get_results`. Due to the way Dask works, an inplace update is not
possible. To retrieve the full graph, you can use
`Deployment.get_results(graph.nodes)` instead.

### Working with lists

ZnFlow supports some special features for working with lists. In the following
example we want to `combine` two lists.

```python
import znflow

@znflow.nodify
def arange(size: int) -> list:
    return list(range(size))

print(arange(2) + arange(3))
>>> [0, 1, 0, 1, 2]

with znflow.DiGraph() as graph:
    lst = arange(2) + arange(3)

graph.run()
print(lst.result)
>>> [0, 1, 0, 1, 2]
```

This functionality is restricted to lists. There are some further features that
allow combining `data: list[list]` by either using
`data: list = znflow.combine(data)` which has an optional `attribute=None`
argument to be used in the case of classes or you can simply use
`data: list = sum(data, [])`.

### Attributes Access

Inside the `with znflow.DiGraph()` context manager, accessing class attributes
yields `znflow.Connector` objects. Sometimes, it may be required to obtain the
actual attribute value instead of a `znflow.Connector` object. It is not
recommended to run class methods inside the `with znflow.DiGraph()` context
manager since it should be exclusively used for building the graph and not for
actual computation.

In the case of properties or other descriptor-based attributes, it might be
necessary to access the actual attribute value. This can be achieved using the
`znflow.get_attribute` method, which supports all features from `getattr` and
can be imported as such:

```python
from znflow import get_attribute as getattr
```

Here's an example of how to use `znflow.get_attribute`:

```python
import znflow

class POW2(znflow.Node):
    """Compute the square of x."""
    x_factor: float = 0.5
    results: float = None
    _x: float = None

    @property
    def x(self):
        return self._x

    @x.setter
    def x(self, value):
        # using "self._x = value * self.x_factor" inside "znflow.DiGraph()" would run
        # "value * Connector(self, "x_factor")" which is not possible (TypeError)
        # therefore we use znflow.get_attribute.
        self._x = value * znflow.get_attribute(self, "x_factor")

    def run(self):
        self.results = self.x**2

with znflow.DiGraph() as graph:
    n1 = POW2()
    n1.x = 4.0

graph.run()
assert n1.results == 4.0

```

Instead, you can also use the `znflow.disable_graph` decorator / context manager
to disable the graph for a specific block of code or the `znflow.Property` as a
drop-in replacement for `property`.

### Groups

It is possible to create groups of `znflow.nodify` or `znflow.Nodes` independent
from the graph structure. To create a group you can use
`with graph.group(<name>)`. To access the group members, use
`graph.get_group(<name>) -> list`.

```python
import znflow

@znflow.nodify
def compute_mean(x, y):
    return (x + y) / 2

graph = znflow.DiGraph()

with graph.group("grp1"):
    n1 = compute_mean(2, 4)

assert n1.uuid in graph.get_group("grp1")
```

## Supported Frameworks

ZnFlow includes tests to ensure compatibility with:

- "Plain classes"
- `dataclasses`
- `ZnInit`
- `attrs`

It is currently **not** compatible with pydantic. I don't know what pydantic
does internally and wasn't able to find a workaround.

            

Raw data

            {
    "_id": null,
    "home_page": "",
    "name": "znflow",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.8,<4.0",
    "maintainer_email": "",
    "keywords": "",
    "author": "zincwarecode",
    "author_email": "zincwarecode@gmail.com",
    "download_url": "https://files.pythonhosted.org/packages/f7/d7/5cc3e4046f029baefed575320d846e86a2b00dd9075adf7c07be6ebb5e4b/znflow-0.1.14.tar.gz",
    "platform": null,
    "description": "[![zincware](https://img.shields.io/badge/Powered%20by-zincware-darkcyan)](https://github.com/zincware)\n[![Coverage Status](https://coveralls.io/repos/github/zincware/ZnFlow/badge.svg?branch=main)](https://coveralls.io/github/zincware/ZnFlow?branch=main)\n[![PyPI version](https://badge.fury.io/py/znflow.svg)](https://badge.fury.io/py/znflow)\n[![Binder](https://mybinder.org/badge_logo.svg)](https://mybinder.org/v2/gh/zincware/ZnFlow/HEAD)\n\n# ZnFlow\n\nThe `ZnFlow` package provides a basic structure for building computational\ngraphs based on functions or classes. It is designed as a lightweight\nabstraction layer to\n\n- learn graph computing.\n- build your own packages on top of it.\n\n## Installation\n\n```shell\npip install znflow\n```\n\n## Usage\n\n### Connecting Functions\n\nWith ZnFlow you can connect functions to each other by using the `@nodify`\ndecorator. Inside the `znflow.DiGraph` the decorator will return a\n`FunctionFuture` object that can be used to connect the function to other nodes.\nThe `FunctionFuture` object will also be used to retrieve the result of the\nfunction. Outside the `znflow.DiGraph` the function behaves as a normal\nfunction.\n\n```python\nimport znflow\n\n@znflow.nodify\ndef compute_mean(x, y):\n    return (x + y) / 2\n\nprint(compute_mean(2, 8))\n# >>> 5\n\nwith znflow.DiGraph() as graph:\n    mean = compute_mean(2, 8)\n\ngraph.run()\nprint(mean.result)\n# >>> 5\n\nwith znflow.DiGraph() as graph:\n    n1 = compute_mean(2, 8)\n    n2 = compute_mean(13, 7)\n    n3 = compute_mean(n1, n2)\n\ngraph.run()\nprint(n3.result)\n# >>> 7.5\n```\n\n### Connecting Classes\n\nIt is also possible to connect classes. They can be connected either directly or\nvia class attributes. This is possible by returning `znflow.Connections` inside\nthe `znflow.DiGraph` context manager. Outside the `znflow.DiGraph` the class\nbehaves as a normal class.\n\nIn the following example we use a dataclass, but it works with all Python\nclasses that inherit from `znflow.Node`.\n\n```python\nimport znflow\nimport dataclasses\n\n@znflow.nodify\ndef compute_mean(x, y):\n    return (x + y) / 2\n\n@dataclasses.dataclass\nclass ComputeMean(znflow.Node):\n    x: float\n    y: float\n\n    results: float = None\n\n    def run(self):\n        self.results = (self.x + self.y) / 2\n\nwith znflow.DiGraph() as graph:\n    n1 = ComputeMean(2, 8)\n    n2 = compute_mean(13, 7)\n    # connecting classes and functions to a Node\n    n3 = ComputeMean(n1.results, n2)\n\ngraph.run()\nprint(n3.results)\n# >>> 7.5\n```\n\n### Dask Support\n\nZnFlow comes with support for [Dask](https://www.dask.org/) to run your graph:\n\n- in parallel.\n- through e.g. SLURM (see https://jobqueue.dask.org/en/latest/api.html).\n- with a nice GUI to track progress.\n\nAll you need to do is install ZnFlow with Dask `pip install znflow[dask]`. We\ncan then extend the example from above. This will run `n1` and `n2` in parallel.\nYou can investigate the graph on the Dask dashboard (typically\nhttp://127.0.0.1:8787/graph or via the client object in Jupyter.)\n\n```python\nimport znflow\nimport dataclasses\nfrom dask.distributed import Client\n\n@znflow.nodify\ndef compute_mean(x, y):\n    return (x + y) / 2\n\n@dataclasses.dataclass\nclass ComputeMean(znflow.Node):\n    x: float\n    y: float\n\n    results: float = None\n\n    def run(self):\n        self.results = (self.x + self.y) / 2\n\nwith znflow.DiGraph() as graph:\n    n1 = ComputeMean(2, 8)\n    n2 = compute_mean(13, 7)\n    # connecting classes and functions to a Node\n    n3 = ComputeMean(n1.results, n2)\n\nclient = Client()\ndeployment = znflow.deployment.Deployment(graph=graph, client=client)\ndeployment.submit_graph()\n\nn3 = deployment.get_results(n3)\nprint(n3)\n# >>> ComputeMean(x=5.0, y=10.0, results=7.5)\n```\n\nWe need to get the updated instance from the Dask worker via\n`Deployment.get_results`. Due to the way Dask works, an inplace update is not\npossible. To retrieve the full graph, you can use\n`Deployment.get_results(graph.nodes)` instead.\n\n### Working with lists\n\nZnFlow supports some special features for working with lists. In the following\nexample we want to `combine` two lists.\n\n```python\nimport znflow\n\n@znflow.nodify\ndef arange(size: int) -> list:\n    return list(range(size))\n\nprint(arange(2) + arange(3))\n>>> [0, 1, 0, 1, 2]\n\nwith znflow.DiGraph() as graph:\n    lst = arange(2) + arange(3)\n\ngraph.run()\nprint(lst.result)\n>>> [0, 1, 0, 1, 2]\n```\n\nThis functionality is restricted to lists. There are some further features that\nallow combining `data: list[list]` by either using\n`data: list = znflow.combine(data)` which has an optional `attribute=None`\nargument to be used in the case of classes or you can simply use\n`data: list = sum(data, [])`.\n\n### Attributes Access\n\nInside the `with znflow.DiGraph()` context manager, accessing class attributes\nyields `znflow.Connector` objects. Sometimes, it may be required to obtain the\nactual attribute value instead of a `znflow.Connector` object. It is not\nrecommended to run class methods inside the `with znflow.DiGraph()` context\nmanager since it should be exclusively used for building the graph and not for\nactual computation.\n\nIn the case of properties or other descriptor-based attributes, it might be\nnecessary to access the actual attribute value. This can be achieved using the\n`znflow.get_attribute` method, which supports all features from `getattr` and\ncan be imported as such:\n\n```python\nfrom znflow import get_attribute as getattr\n```\n\nHere's an example of how to use `znflow.get_attribute`:\n\n```python\nimport znflow\n\nclass POW2(znflow.Node):\n    \"\"\"Compute the square of x.\"\"\"\n    x_factor: float = 0.5\n    results: float = None\n    _x: float = None\n\n    @property\n    def x(self):\n        return self._x\n\n    @x.setter\n    def x(self, value):\n        # using \"self._x = value * self.x_factor\" inside \"znflow.DiGraph()\" would run\n        # \"value * Connector(self, \"x_factor\")\" which is not possible (TypeError)\n        # therefore we use znflow.get_attribute.\n        self._x = value * znflow.get_attribute(self, \"x_factor\")\n\n    def run(self):\n        self.results = self.x**2\n\nwith znflow.DiGraph() as graph:\n    n1 = POW2()\n    n1.x = 4.0\n\ngraph.run()\nassert n1.results == 4.0\n\n```\n\nInstead, you can also use the `znflow.disable_graph` decorator / context manager\nto disable the graph for a specific block of code or the `znflow.Property` as a\ndrop-in replacement for `property`.\n\n### Groups\n\nIt is possible to create groups of `znflow.nodify` or `znflow.Nodes` independent\nfrom the graph structure. To create a group you can use\n`with graph.group(<name>)`. To access the group members, use\n`graph.get_group(<name>) -> list`.\n\n```python\nimport znflow\n\n@znflow.nodify\ndef compute_mean(x, y):\n    return (x + y) / 2\n\ngraph = znflow.DiGraph()\n\nwith graph.group(\"grp1\"):\n    n1 = compute_mean(2, 4)\n\nassert n1.uuid in graph.get_group(\"grp1\")\n```\n\n## Supported Frameworks\n\nZnFlow includes tests to ensure compatibility with:\n\n- \"Plain classes\"\n- `dataclasses`\n- `ZnInit`\n- `attrs`\n\nIt is currently **not** compatible with pydantic. I don't know what pydantic\ndoes internally and wasn't able to find a workaround.\n",
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": "A general purpose framework for building and running computational graphs.",
    "version": "0.1.14",
    "project_urls": null,
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "132345235450b43b74f771f385c127aeab64141ff3ab4220276aca180cdc6f64",
                "md5": "6e5753fb3f79834cd7b9a956727b0e93",
                "sha256": "f94f21cdaece949754e6dd5beaedfb078b9331ca49e32d9a2dfaa4ac1d8f8324"
            },
            "downloads": -1,
            "filename": "znflow-0.1.14-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "6e5753fb3f79834cd7b9a956727b0e93",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8,<4.0",
            "size": 20360,
            "upload_time": "2023-09-21T12:55:47",
            "upload_time_iso_8601": "2023-09-21T12:55:47.333472Z",
            "url": "https://files.pythonhosted.org/packages/13/23/45235450b43b74f771f385c127aeab64141ff3ab4220276aca180cdc6f64/znflow-0.1.14-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "f7d75cc3e4046f029baefed575320d846e86a2b00dd9075adf7c07be6ebb5e4b",
                "md5": "863c5eef5ce767f46d56a460f88db186",
                "sha256": "bf85dbb4c816a3c1ae98ed62f75feb10a22032e8bccf8d016ee6d406873c9c03"
            },
            "downloads": -1,
            "filename": "znflow-0.1.14.tar.gz",
            "has_sig": false,
            "md5_digest": "863c5eef5ce767f46d56a460f88db186",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8,<4.0",
            "size": 19232,
            "upload_time": "2023-09-21T12:55:49",
            "upload_time_iso_8601": "2023-09-21T12:55:49.015739Z",
            "url": "https://files.pythonhosted.org/packages/f7/d7/5cc3e4046f029baefed575320d846e86a2b00dd9075adf7c07be6ebb5e4b/znflow-0.1.14.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-09-21 12:55:49",
    "github": false,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "lcname": "znflow"
}
        
Elapsed time: 0.20332s