# Dria Workflows
Dria Workflows enables the creation of workflows for Dria Agents.
## Installation
You can install Dria Workflows using pip:
```bash
pip install dria_workflows
````
## Usage Example
Here's a simple example of how to use Dria Workflows:
```python
import logging
from dria_workflows import WorkflowBuilder, Operator, Write, Edge, validate_workflow_json
def main():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
builder = WorkflowBuilder()
# Add a step to your workflow
builder.generative_step(id="write_poem", prompt="Write a poem as if you are Kahlil Gibran", operator=Operator.GENERATION, outputs=[Write.new("poem")])
# Define the flow of your workflow
flow = [Edge(source="write_poem", target="_end")]
builder.flow(flow)
# Set the return value of your workflow
builder.set_return_value("poem")
# Build your workflow
workflow = builder.build()
# Validate your workflow
validate_workflow_json(workflow.model_dump_json(indent=2, exclude_unset=True, exclude_none=True))
# Save workflow
workflow.save("poem_workflow.json")
if __name__ == "__main__":
main()
```
# Here is a more complex workflow
```python
import logging
from dria_workflows import WorkflowBuilder, ConditionBuilder, Operator, Write, GetAll, Read, Push, Edge, Expression, validate_workflow_json
def main():
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Give a starting memory as input
builder = WorkflowBuilder(memory={"topic_1":"Linear Algebra", "topic_2":"CUDA"})
# Add steps to your workflow
builder.generative_step(id="create_query", prompt="Write down a search query related to following topics: {{topic_1}} and {{topic_2}}. If any, avoid asking questions asked before: {{history}}", operator=Operator.GENERATION, inputs=[GetAll.new("history", False)], outputs=[Write.new("search_query")])
builder.generative_step(id="search", prompt="{{search_query}}", operator=Operator.FUNCTION_CALLING, outputs=[Write.new("result"), Push.new("history")])
builder.generative_step(id="evaluate", prompt="Evaluate if search result is related and high quality to given question by saying Yes or No. Question: {{search_query}} , Search Result: {{result}}. Only output Yes or No and nothing else.", operator=Operator.GENERATION, outputs=[Write.new("is_valid")])
# Define the flow of your workflow
flow = [
Edge(source="create_query", target="search"),
Edge(source="search", target="evaluate"),
Edge(source="evaluate", target="_end", condition=ConditionBuilder.build(expected="Yes", target_if_not="create_query", expression=Expression.CONTAINS, input=Read.new("is_valid", True))),
]
builder.flow(flow)
# Set the return value of your workflow
builder.set_return_value("result")
# Build your workflow
workflow = builder.build()
validate_workflow_json(workflow.model_dump_json(indent=2, exclude_unset=True, exclude_none=True))
workflow.save("search_workflow.json")
if __name__ == "__main__":
main()
```
Detailed docs soon.
[andthattoo](https://x.com/andthatto)
Raw data
{
"_id": null,
"home_page": "https://github.com/firstbatchxyz/dria-workflows",
"name": "dria_workflows",
"maintainer": null,
"docs_url": null,
"requires_python": "<4.0,>=3.10",
"maintainer_email": null,
"keywords": "workflows, dria, agents, llm, ai",
"author": "andthattoo",
"author_email": "omer@firstbatch.xyz",
"download_url": "https://files.pythonhosted.org/packages/a1/bf/2a52e4f615090a218e7a769574050c6e7c1b9700b3cf1d4da43aafdd7270/dria_workflows-0.3.5.tar.gz",
"platform": null,
"description": "# Dria Workflows\n\nDria Workflows enables the creation of workflows for Dria Agents.\n\n## Installation\n\nYou can install Dria Workflows using pip:\n```bash\npip install dria_workflows\n````\n\n## Usage Example\n\nHere's a simple example of how to use Dria Workflows:\n\n```python\nimport logging\nfrom dria_workflows import WorkflowBuilder, Operator, Write, Edge, validate_workflow_json\n\n\ndef main():\n logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n\n builder = WorkflowBuilder()\n\n # Add a step to your workflow\n builder.generative_step(id=\"write_poem\", prompt=\"Write a poem as if you are Kahlil Gibran\", operator=Operator.GENERATION, outputs=[Write.new(\"poem\")])\n \n # Define the flow of your workflow\n flow = [Edge(source=\"write_poem\", target=\"_end\")]\n builder.flow(flow)\n \n # Set the return value of your workflow\n builder.set_return_value(\"poem\")\n \n # Build your workflow\n workflow = builder.build()\n\n # Validate your workflow\n validate_workflow_json(workflow.model_dump_json(indent=2, exclude_unset=True, exclude_none=True))\n\n # Save workflow\n workflow.save(\"poem_workflow.json\")\n\n\nif __name__ == \"__main__\":\n main()\n```\n\n# Here is a more complex workflow\n\n```python\nimport logging\nfrom dria_workflows import WorkflowBuilder, ConditionBuilder, Operator, Write, GetAll, Read, Push, Edge, Expression, validate_workflow_json\n\n\ndef main():\n logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')\n\n # Give a starting memory as input\n builder = WorkflowBuilder(memory={\"topic_1\":\"Linear Algebra\", \"topic_2\":\"CUDA\"})\n\n # Add steps to your workflow\n builder.generative_step(id=\"create_query\", prompt=\"Write down a search query related to following topics: {{topic_1}} and {{topic_2}}. If any, avoid asking questions asked before: {{history}}\", operator=Operator.GENERATION, inputs=[GetAll.new(\"history\", False)], outputs=[Write.new(\"search_query\")])\n builder.generative_step(id=\"search\", prompt=\"{{search_query}}\", operator=Operator.FUNCTION_CALLING, outputs=[Write.new(\"result\"), Push.new(\"history\")])\n builder.generative_step(id=\"evaluate\", prompt=\"Evaluate if search result is related and high quality to given question by saying Yes or No. Question: {{search_query}} , Search Result: {{result}}. Only output Yes or No and nothing else.\", operator=Operator.GENERATION, outputs=[Write.new(\"is_valid\")])\n\n # Define the flow of your workflow\n flow = [\n Edge(source=\"create_query\", target=\"search\"),\n Edge(source=\"search\", target=\"evaluate\"),\n Edge(source=\"evaluate\", target=\"_end\", condition=ConditionBuilder.build(expected=\"Yes\", target_if_not=\"create_query\", expression=Expression.CONTAINS, input=Read.new(\"is_valid\", True))),\n ]\n builder.flow(flow)\n\n # Set the return value of your workflow\n builder.set_return_value(\"result\")\n\n # Build your workflow\n workflow = builder.build()\n validate_workflow_json(workflow.model_dump_json(indent=2, exclude_unset=True, exclude_none=True))\n\n workflow.save(\"search_workflow.json\")\n\n\nif __name__ == \"__main__\":\n main()\n```\n\nDetailed docs soon.\n[andthattoo](https://x.com/andthatto)\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Enables creation of workflows for Dria Agents",
"version": "0.3.5",
"project_urls": {
"Homepage": "https://github.com/firstbatchxyz/dria-workflows",
"Repository": "https://github.com/firstbatchxyz/dria-workflows"
},
"split_keywords": [
"workflows",
" dria",
" agents",
" llm",
" ai"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "8bb1965a0f98d4ee1792e8cd9bc86fd8ffd14b35908f9b37cbbbb0abe4f93c94",
"md5": "38732e30b23de9de82678f16bc343cd5",
"sha256": "ea74810375ef17b13a65366d185202974b452ddc00662bfa65c98075c642f3bd"
},
"downloads": -1,
"filename": "dria_workflows-0.3.5-py3-none-any.whl",
"has_sig": false,
"md5_digest": "38732e30b23de9de82678f16bc343cd5",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": "<4.0,>=3.10",
"size": 28090,
"upload_time": "2024-12-04T18:42:09",
"upload_time_iso_8601": "2024-12-04T18:42:09.961927Z",
"url": "https://files.pythonhosted.org/packages/8b/b1/965a0f98d4ee1792e8cd9bc86fd8ffd14b35908f9b37cbbbb0abe4f93c94/dria_workflows-0.3.5-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "a1bf2a52e4f615090a218e7a769574050c6e7c1b9700b3cf1d4da43aafdd7270",
"md5": "420ef98556cd0876da913c12bb20f201",
"sha256": "d6464c8cd1f511ad78933f1a2695b3a45a208033b7bfd634e5c378e4effe9d6a"
},
"downloads": -1,
"filename": "dria_workflows-0.3.5.tar.gz",
"has_sig": false,
"md5_digest": "420ef98556cd0876da913c12bb20f201",
"packagetype": "sdist",
"python_version": "source",
"requires_python": "<4.0,>=3.10",
"size": 21621,
"upload_time": "2024-12-04T18:42:11",
"upload_time_iso_8601": "2024-12-04T18:42:11.720436Z",
"url": "https://files.pythonhosted.org/packages/a1/bf/2a52e4f615090a218e7a769574050c6e7c1b9700b3cf1d4da43aafdd7270/dria_workflows-0.3.5.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-12-04 18:42:11",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "firstbatchxyz",
"github_project": "dria-workflows",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "dria_workflows"
}