symphonizer


Namesymphonizer JSON
Version 0.1.2 PyPI version JSON
download
home_pagehttps://github.com/robertbetts/Symphonizer
SummaryOrchestrate anything with low overhead, fast, synchronized scheduling.
upload_time2023-09-21 16:35:32
maintainer
docs_urlNone
authorRobert Betts
requires_python>=3.9,<3.12
licenseApache-2.0
keywords python ai llm agents orchestration process-management task-management dag process-flow
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Symphonizer
## Task Orchestration using Directed Acyclic Graphs

[![codecov](https://codecov.io/gh/robertbetts/nuropb/branch/main/graph/badge.svg?token=DVSBZY794D)](https://codecov.io/gh/robertbetts/Symphonizer)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
[![CodeFactor](https://www.codefactor.io/repository/github/robertbetts/symphonizer/badge)](https://www.codefactor.io/repository/github/robertbetts/symphonizer)
[![License: Apache 2.0](https://img.shields.io/pypi/l/giteo)](https://www.apache.org/licenses/LICENSE-2.0.txt)

Orchestrate anything with low overhead, fast, synchronized scheduling. Symphonizer is very well suited to orchestrating 
distributed API requests and dependency ordered tasks. This makes Symphonizer well suited in orchestrating machine learning
for model execution, LLVM agent chaining, and any other processes/tasks that can be represented as a directed acyclic graph. 

**Ideal use cases include for Symphonizer:**
* Idempotent, ordered, processes and flows
* Remote API requests
* Orchestrating calls to hybrid cloud, Lambda and Serverless Functions 
* Machine learning and model execution
* LLVM agent chaining
* Any dependency ordered API requests

**Use cases to avoid:**
* Distributed transactions with ACID guarantees
* Data pipelines with large data payloads
* Symphonizer is NOT a workflow engine
* Symphonizer is NOT a distributed transaction coordinator
* Symphonizer is NOT a transaction database

**Use cases that may require consideration before using Symphonizer:**
* Distributed transactions with eventual consistency
* Very long running processes - Hours, Days

## Symphonizer in the wild
Symphonizer was initially developed to facilitate the orchestration of autonomous applications in an unstable
distributed system and has subsequently found a nice home in orchestrating machine LLM agents and prompt 
engineering.

## Symphonizer Model
**DAGNote**: Hashable -> Is a Graph Node with a JSON serializable payload. 

**NodeRunner**: Callable[..., Any] -> Is a Function or callable class that takes a DAGNote payload as an argument.
- NodeRunners are most effective when their actions are idempotent.
- NodeRunners are intended to serve as execution proxies having low compute overhead. 
- An instance of a node runner can only be executed once and is then immutable, this is to ensure idempotency. 
  In order to retry or execute a NodeRunner again, a new instance is required. the NodeRunner.retry_task() method 
  will clone a new child NodeRunner. 

**Composition**: object -> Is a class instantiated with a Dict[Hashable, Set[Hashable]] that represents 
a directed acyclic graph. Read further in the graphlib standard library documentation.
- Each DAGNote's NodeRunner is executed in topological order until all nodes have been executed.
- All execution is synchronized and asynchronously run in memory. 

**Perform**: object -> Is a class that orchestrates the execution of a Compositions. Perform is instantiated with a
high water and low water mark. When the number of concurrent running Compositions reach the high water mark, new 
composition execution is blocked until the number of running Compositions falls below the low water mark. During
this time all new Compositions are queued until the low water mark is reached. 

## Getting started

```
pip install symphonizer
```

## Example

Here is a basic end to end example of Symphonizer. This example is also available in the examples directory.
```python  
import asyncio
import time
import random

from symphonizer.node_runner import NodeRunner
from symphonizer.composition import Composition, DAGNote
from symphonizer.perform import Perform
```
With this example we will customise the Composition class in and add a sleep time to the processing of each node.  
```python
class Compose(Composition):
    @classmethod
    def configure_node_runner(cls, node: DAGNote):
        """ Configure the node runner as appropriate for the DAGNote, the flexibility of this method allows for
        different node runners to be used for different nodes in the DAG or alternatively the same node runner
        where the execute method is configured differently for different nodes in the DAG.
        """
        async def execute(**params):
            sleep_time = random.uniform(0.001, 0.1)
            # print(f"Processing node {node}, sleep for {sleep_time}")
            await asyncio.sleep(sleep_time)

        return NodeRunner().prepare(node=node).run(execute)
```
Next is the main body of the example which you would ammend to suit your needs. 
```python
async def main():
    sample_graph = {
        DAGNote("D"): {DAGNote("B"), DAGNote("C")},
        DAGNote("C"): {DAGNote("A")},
        DAGNote("B"): {DAGNote("A")}
    }
    dag_count_target = 1000
    dag_count_completed = 0
    dag_tracker = {}
    test_stop_future = asyncio.Future()
    perform = Perform()
    start_time = time.time()

    def scheduler_done_cb(instance, status, error=None, elapsed_time: float = 0):
        nonlocal dag_count_completed
        dag_tracker.pop(instance.instance_id, None)
        dag_count_completed += 1
        if dag_count_completed == dag_count_target:
            test_stop_future.set_result(None)

    async def schedule_dag():
        dag = Compose(
            sample_graph,
            schedule_done_cb=scheduler_done_cb,
        )
        asyncio.create_task(perform.add(dag))
        await asyncio.sleep(0)

    print("Starting to schedule DAGs")

    _ = list([await schedule_dag() for _ in range(dag_count_target)])

    print(f"added {dag_count_target} DAGs to perform")
    await asyncio.sleep(0.001)
    await test_stop_future
    end_time = time.time()
    print(f"All DAGs processed in {end_time-start_time} seconds")
    print(f"Average DAG processing time {round((end_time-start_time)/dag_count_target, 4)} seconds")

if __name__ == "__main__":
    asyncio.run(main())
```

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/robertbetts/Symphonizer",
    "name": "symphonizer",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.9,<3.12",
    "maintainer_email": "",
    "keywords": "python,ai,llm,agents,orchestration,process-management,task-management,dag,process-flow",
    "author": "Robert Betts",
    "author_email": "robert.betts@yahoo.com",
    "download_url": "https://files.pythonhosted.org/packages/13/9c/5219c1190c5a50a5f3000d9c123747b70dc9b9b74bdf19e9643d3188ea78/symphonizer-0.1.2.tar.gz",
    "platform": null,
    "description": "# Symphonizer\n## Task Orchestration using Directed Acyclic Graphs\n\n[![codecov](https://codecov.io/gh/robertbetts/nuropb/branch/main/graph/badge.svg?token=DVSBZY794D)](https://codecov.io/gh/robertbetts/Symphonizer)\n[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)\n[![CodeFactor](https://www.codefactor.io/repository/github/robertbetts/symphonizer/badge)](https://www.codefactor.io/repository/github/robertbetts/symphonizer)\n[![License: Apache 2.0](https://img.shields.io/pypi/l/giteo)](https://www.apache.org/licenses/LICENSE-2.0.txt)\n\nOrchestrate anything with low overhead, fast, synchronized scheduling. Symphonizer is very well suited to orchestrating \ndistributed API requests and dependency ordered tasks. This makes Symphonizer well suited in orchestrating machine learning\nfor model execution, LLVM agent chaining, and any other processes/tasks that can be represented as a directed acyclic graph. \n\n**Ideal use cases include for Symphonizer:**\n* Idempotent, ordered, processes and flows\n* Remote API requests\n* Orchestrating calls to hybrid cloud, Lambda and Serverless Functions \n* Machine learning and model execution\n* LLVM agent chaining\n* Any dependency ordered API requests\n\n**Use cases to avoid:**\n* Distributed transactions with ACID guarantees\n* Data pipelines with large data payloads\n* Symphonizer is NOT a workflow engine\n* Symphonizer is NOT a distributed transaction coordinator\n* Symphonizer is NOT a transaction database\n\n**Use cases that may require consideration before using Symphonizer:**\n* Distributed transactions with eventual consistency\n* Very long running processes - Hours, Days\n\n## Symphonizer in the wild\nSymphonizer was initially developed to facilitate the orchestration of autonomous applications in an unstable\ndistributed system and has subsequently found a nice home in orchestrating machine LLM agents and prompt \nengineering.\n\n## Symphonizer Model\n**DAGNote**: Hashable -> Is a Graph Node with a JSON serializable payload. \n\n**NodeRunner**: Callable[..., Any] -> Is a Function or callable class that takes a DAGNote payload as an argument.\n- NodeRunners are most effective when their actions are idempotent.\n- NodeRunners are intended to serve as execution proxies having low compute overhead. \n- An instance of a node runner can only be executed once and is then immutable, this is to ensure idempotency. \n  In order to retry or execute a NodeRunner again, a new instance is required. the NodeRunner.retry_task() method \n  will clone a new child NodeRunner. \n\n**Composition**: object -> Is a class instantiated with a Dict[Hashable, Set[Hashable]] that represents \na directed acyclic graph. Read further in the graphlib standard library documentation.\n- Each DAGNote's NodeRunner is executed in topological order until all nodes have been executed.\n- All execution is synchronized and asynchronously run in memory. \n\n**Perform**: object -> Is a class that orchestrates the execution of a Compositions. Perform is instantiated with a\nhigh water and low water mark. When the number of concurrent running Compositions reach the high water mark, new \ncomposition execution is blocked until the number of running Compositions falls below the low water mark. During\nthis time all new Compositions are queued until the low water mark is reached. \n\n## Getting started\n\n```\npip install symphonizer\n```\n\n## Example\n\nHere is a basic end to end example of Symphonizer. This example is also available in the examples directory.\n```python  \nimport asyncio\nimport time\nimport random\n\nfrom symphonizer.node_runner import NodeRunner\nfrom symphonizer.composition import Composition, DAGNote\nfrom symphonizer.perform import Perform\n```\nWith this example we will customise the Composition class in and add a sleep time to the processing of each node.  \n```python\nclass Compose(Composition):\n    @classmethod\n    def configure_node_runner(cls, node: DAGNote):\n        \"\"\" Configure the node runner as appropriate for the DAGNote, the flexibility of this method allows for\n        different node runners to be used for different nodes in the DAG or alternatively the same node runner\n        where the execute method is configured differently for different nodes in the DAG.\n        \"\"\"\n        async def execute(**params):\n            sleep_time = random.uniform(0.001, 0.1)\n            # print(f\"Processing node {node}, sleep for {sleep_time}\")\n            await asyncio.sleep(sleep_time)\n\n        return NodeRunner().prepare(node=node).run(execute)\n```\nNext is the main body of the example which you would ammend to suit your needs. \n```python\nasync def main():\n    sample_graph = {\n        DAGNote(\"D\"): {DAGNote(\"B\"), DAGNote(\"C\")},\n        DAGNote(\"C\"): {DAGNote(\"A\")},\n        DAGNote(\"B\"): {DAGNote(\"A\")}\n    }\n    dag_count_target = 1000\n    dag_count_completed = 0\n    dag_tracker = {}\n    test_stop_future = asyncio.Future()\n    perform = Perform()\n    start_time = time.time()\n\n    def scheduler_done_cb(instance, status, error=None, elapsed_time: float = 0):\n        nonlocal dag_count_completed\n        dag_tracker.pop(instance.instance_id, None)\n        dag_count_completed += 1\n        if dag_count_completed == dag_count_target:\n            test_stop_future.set_result(None)\n\n    async def schedule_dag():\n        dag = Compose(\n            sample_graph,\n            schedule_done_cb=scheduler_done_cb,\n        )\n        asyncio.create_task(perform.add(dag))\n        await asyncio.sleep(0)\n\n    print(\"Starting to schedule DAGs\")\n\n    _ = list([await schedule_dag() for _ in range(dag_count_target)])\n\n    print(f\"added {dag_count_target} DAGs to perform\")\n    await asyncio.sleep(0.001)\n    await test_stop_future\n    end_time = time.time()\n    print(f\"All DAGs processed in {end_time-start_time} seconds\")\n    print(f\"Average DAG processing time {round((end_time-start_time)/dag_count_target, 4)} seconds\")\n\nif __name__ == \"__main__\":\n    asyncio.run(main())\n```\n",
    "bugtrack_url": null,
    "license": "Apache-2.0",
    "summary": "Orchestrate anything with low overhead, fast, synchronized scheduling.",
    "version": "0.1.2",
    "project_urls": {
        "Homepage": "https://github.com/robertbetts/Symphonizer",
        "Repository": "https://github.com/robertbetts/Symphonizer"
    },
    "split_keywords": [
        "python",
        "ai",
        "llm",
        "agents",
        "orchestration",
        "process-management",
        "task-management",
        "dag",
        "process-flow"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "bb1bff9d06dac333d372b39c45e2eec5002f883b034a617cc8362cc1268a89ba",
                "md5": "6d0d05ab21d35464013f569b58b0076d",
                "sha256": "f18c7ca83fe5169af5fc5ad600afd129660bc468d2e4f44ca586809f556de565"
            },
            "downloads": -1,
            "filename": "symphonizer-0.1.2-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "6d0d05ab21d35464013f569b58b0076d",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.9,<3.12",
            "size": 19695,
            "upload_time": "2023-09-21T16:35:30",
            "upload_time_iso_8601": "2023-09-21T16:35:30.461024Z",
            "url": "https://files.pythonhosted.org/packages/bb/1b/ff9d06dac333d372b39c45e2eec5002f883b034a617cc8362cc1268a89ba/symphonizer-0.1.2-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "139c5219c1190c5a50a5f3000d9c123747b70dc9b9b74bdf19e9643d3188ea78",
                "md5": "643ad649947aa327ce142956f2fd7e39",
                "sha256": "0c66cd4981acdac3e14c23dc4138ca9e2a7de905b500a1fe79c4994a0c79c34b"
            },
            "downloads": -1,
            "filename": "symphonizer-0.1.2.tar.gz",
            "has_sig": false,
            "md5_digest": "643ad649947aa327ce142956f2fd7e39",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.9,<3.12",
            "size": 18887,
            "upload_time": "2023-09-21T16:35:32",
            "upload_time_iso_8601": "2023-09-21T16:35:32.068617Z",
            "url": "https://files.pythonhosted.org/packages/13/9c/5219c1190c5a50a5f3000d9c123747b70dc9b9b74bdf19e9643d3188ea78/symphonizer-0.1.2.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-09-21 16:35:32",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "robertbetts",
    "github_project": "Symphonizer",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "symphonizer"
}
        
Elapsed time: 0.17215s