# LLM Flow Engine
[π¨π³ δΈζηζ¬](https://github.com/liguobao/llm-flow-engine/blob/main/docs/README_zh.md) | πΊπΈ English
A DSL-based LLM workflow engine that supports multi-model collaboration, dependency management, and result aggregation. Define complex AI workflows through YAML configuration files and enable collaborative work between multiple LLM models.
## β¨ Key Features
- **π§ DSL Workflow Definition** - Define complex LLM workflows using YAML format
- **π DAG Dependency Management** - Support directed acyclic graph node dependencies and parallel execution
- **π Placeholder Resolution** - Use `${node.output}` syntax for inter-node data passing
- **π€ Multi-Model Support** - Support calling different LLM models and result aggregation
- **βοΈ Flexible Configuration** - Custom model configuration and parameter management
- **β‘ Async Execution** - Efficient asynchronous task processing and error retry
- **π Result Aggregation** - Built-in various result merging and analysis functions
- **π§ Extensible Architecture** - Support custom functions and model adapters
## π Quick Start
### Prerequisites
- Python 3.8+
- aiohttp >= 3.8.0
- pyyaml >= 6.0
- loguru >= 0.7.0
### Installation
```bash
pip install llm-flow-engine
```
### Basic Usage
```python
import asyncio
from llm_flow_engine import FlowEngine, ModelConfigProvider
async def main():
# 1. Configure models (auto-discovery)
provider = await ModelConfigProvider.from_host_async(
api_host="http://127.0.0.1:11434",
platform="ollama"
)
# 2. Create engine
engine = FlowEngine(provider)
# 3. Execute workflow
dsl_content = """
metadata:
version: "1.0"
description: "Simple Q&A workflow"
input:
type: "start"
name: "workflow_input"
data:
question: ""
executors:
- name: answer_step
type: task
func: llm_simple_call
custom_vars:
user_input: "${workflow_input.question}"
model: "llama2"
output:
type: "end"
name: "workflow_output"
data:
answer: "${answer_step.output}"
"""
result = await engine.execute_dsl(
dsl_content,
inputs={"workflow_input": {"question": "What is AI?"}}
)
print(f"Result: {result}")
if __name__ == "__main__":
asyncio.run(main())
```
## π Project Structure
```text
llm_flow_engine/
βββ __init__.py # Main package initialization
βββ flow_engine.py # Main engine entry point
βββ dsl_loader.py # DSL parser
βββ workflow.py # Unified workflow management
βββ executor.py # Task executor
βββ executor_result.py # Execution result wrapper
βββ builtin_functions.py # Built-in function library
βββ model_config.py # Model configuration management
βββ utils.py # Utility functions
examples/
βββ demo_example.py # Complete example demo
βββ demo_qa.yaml # Workflow DSL example
βββ model_config_demo.py # Model configuration demo
```
## π§ Model Configuration
### Method 1: Auto-Discovery (Recommended)
```python
# Auto-discover Ollama models
provider = await ModelConfigProvider.from_host_async(
api_host="http://127.0.0.1:11434",
platform="ollama"
)
```
### Method 2: Manual Configuration
```python
# Create provider and add models manually
provider = ModelConfigProvider()
# Add OpenAI model
provider.add_single_model(
model_name="gpt-4",
platform="openai",
api_url="https://api.openai.com/v1/chat/completions",
api_key="your-api-key",
max_tokens=4096
)
# Add custom model
provider.add_single_model(
model_name="custom-llm",
platform="openai_compatible",
api_url="https://your-api.com/v1/chat/completions",
api_key="your-api-key",
max_tokens=2048
)
```
## π DSL Workflow Format
### Basic Structure
```yaml
metadata:
version: "1.0"
description: "Workflow description"
input:
type: "start"
name: "workflow_input"
data:
key: "value"
executors:
- name: task1
type: task
func: function_name
custom_vars:
param1: "${input.key}"
param2: "static_value"
depends_on: [] # Dependencies
timeout: 30 # Timeout in seconds
retry: 2 # Retry count
output:
type: "end"
name: "workflow_output"
data:
result: "${task1.output}"
```
### Multi-Model Workflow Example
```yaml
metadata:
version: "1.0"
description: "Multi-model Q&A with analysis"
input:
type: "start"
name: "workflow_input"
data:
question: ""
executors:
# Parallel model calls
- name: model1_answer
type: task
func: llm_simple_call
custom_vars:
user_input: "${workflow_input.question}"
model: "llama2"
timeout: 30
- name: model2_answer
type: task
func: llm_simple_call
custom_vars:
user_input: "${workflow_input.question}"
model: "mistral"
timeout: 30
# Analysis step (depends on both models)
- name: analysis
type: task
func: llm_simple_call
custom_vars:
user_input: "Compare these answers: 1) ${model1_answer.output} 2) ${model2_answer.output}"
model: "llama2"
depends_on: ["model1_answer", "model2_answer"]
output:
type: "end"
name: "workflow_output"
data:
original_question: "${workflow_input.question}"
model1_response: "${model1_answer.output}"
model2_response: "${model2_answer.output}"
analysis: "${analysis.output}"
```
## π Built-in Functions
- **`llm_simple_call`** - Basic LLM model call
- **`text_process`** - Text preprocessing and formatting
- **`result_summary`** - Multi-result summarization
- **`data_transform`** - Data format transformation
## π§ͺ Running Examples
```bash
# Basic usage demo
python examples/demo_example.py
# Model configuration demo
python examples/model_config_demo.py
# Package usage demo
python examples/package_demo.py
```
## π Supported Platforms
- **Ollama** - Local LLM models
- **OpenAI** - GPT series models
- **OpenAI Compatible** - Any OpenAI-compatible API
- **Anthropic** - Claude series models
- **Custom** - Custom API endpoints
## π οΈ Development
### Setup Development Environment
```bash
git clone https://github.com/liguobao/llm-flow-engine.git
cd llm-flow-engine
# Install development dependencies
pip install -e ".[dev]"
# Run tests
pytest
# Format code
black .
```
### Project Validation
```bash
# Validate project structure and configuration
python validate_project.py
```
## π License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## π€ Contributing
Contributions are welcome! Please feel free to submit a Pull Request.
## π Support
- π Issues: [GitHub Issues](https://github.com/liguobao/llm-flow-engine/issues)
- π Documentation: [GitHub Wiki](https://github.com/liguobao/llm-flow-engine/wiki)
## π Star History
If you find this project helpful, please consider giving it a star! β
---
Made with β€οΈ by the LLM Flow Engine Team
Raw data
{
"_id": null,
"home_page": null,
"name": "llm-flow-engine",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": "liguobao <codelover@qq.com>",
"keywords": "llm, workflow, dsl, ai, automation, multi-model, async, dag, chatgpt, ollama",
"author": null,
"author_email": "liguobao <codelover@qq.com>",
"download_url": "https://files.pythonhosted.org/packages/af/ff/6fdc15099e4151ccf9ff3b76348e9146918360866735c1f77d8de8d78609/llm_flow_engine-0.7.3.tar.gz",
"platform": null,
"description": "# LLM Flow Engine\n\n[\ud83c\udde8\ud83c\uddf3 \u4e2d\u6587\u7248\u672c](https://github.com/liguobao/llm-flow-engine/blob/main/docs/README_zh.md) | \ud83c\uddfa\ud83c\uddf8 English\n\nA DSL-based LLM workflow engine that supports multi-model collaboration, dependency management, and result aggregation. Define complex AI workflows through YAML configuration files and enable collaborative work between multiple LLM models.\n\n## \u2728 Key Features\n\n- **\ud83d\udd27 DSL Workflow Definition** - Define complex LLM workflows using YAML format\n- **\ud83d\udcca DAG Dependency Management** - Support directed acyclic graph node dependencies and parallel execution\n- **\ud83d\udd17 Placeholder Resolution** - Use `${node.output}` syntax for inter-node data passing\n- **\ud83e\udd16 Multi-Model Support** - Support calling different LLM models and result aggregation\n- **\u2699\ufe0f Flexible Configuration** - Custom model configuration and parameter management\n- **\u26a1 Async Execution** - Efficient asynchronous task processing and error retry\n- **\ud83d\udcc8 Result Aggregation** - Built-in various result merging and analysis functions\n- **\ud83d\udd27 Extensible Architecture** - Support custom functions and model adapters\n\n## \ud83d\ude80 Quick Start\n\n### Prerequisites\n\n- Python 3.8+\n- aiohttp >= 3.8.0\n- pyyaml >= 6.0\n- loguru >= 0.7.0\n\n### Installation\n\n```bash\npip install llm-flow-engine\n```\n\n### Basic Usage\n\n```python\nimport asyncio\nfrom llm_flow_engine import FlowEngine, ModelConfigProvider\n\nasync def main():\n # 1. Configure models (auto-discovery)\n provider = await ModelConfigProvider.from_host_async(\n api_host=\"http://127.0.0.1:11434\", \n platform=\"ollama\"\n )\n \n # 2. Create engine\n engine = FlowEngine(provider)\n \n # 3. Execute workflow\n dsl_content = \"\"\"\n metadata:\n version: \"1.0\"\n description: \"Simple Q&A workflow\"\n \n input:\n type: \"start\"\n name: \"workflow_input\"\n data:\n question: \"\"\n \n executors:\n - name: answer_step\n type: task\n func: llm_simple_call\n custom_vars:\n user_input: \"${workflow_input.question}\"\n model: \"llama2\"\n \n output:\n type: \"end\"\n name: \"workflow_output\"\n data:\n answer: \"${answer_step.output}\"\n \"\"\"\n \n result = await engine.execute_dsl(\n dsl_content, \n inputs={\"workflow_input\": {\"question\": \"What is AI?\"}}\n )\n \n print(f\"Result: {result}\")\n\nif __name__ == \"__main__\":\n asyncio.run(main())\n```\n\n## \ud83d\udccb Project Structure\n\n```text\nllm_flow_engine/\n\u251c\u2500\u2500 __init__.py # Main package initialization\n\u251c\u2500\u2500 flow_engine.py # Main engine entry point\n\u251c\u2500\u2500 dsl_loader.py # DSL parser\n\u251c\u2500\u2500 workflow.py # Unified workflow management\n\u251c\u2500\u2500 executor.py # Task executor\n\u251c\u2500\u2500 executor_result.py # Execution result wrapper\n\u251c\u2500\u2500 builtin_functions.py # Built-in function library\n\u251c\u2500\u2500 model_config.py # Model configuration management\n\u2514\u2500\u2500 utils.py # Utility functions\n\nexamples/\n\u251c\u2500\u2500 demo_example.py # Complete example demo\n\u251c\u2500\u2500 demo_qa.yaml # Workflow DSL example\n\u2514\u2500\u2500 model_config_demo.py # Model configuration demo\n```\n\n## \ud83d\udd27 Model Configuration\n\n### Method 1: Auto-Discovery (Recommended)\n\n```python\n# Auto-discover Ollama models\nprovider = await ModelConfigProvider.from_host_async(\n api_host=\"http://127.0.0.1:11434\",\n platform=\"ollama\"\n)\n```\n\n### Method 2: Manual Configuration\n\n```python\n# Create provider and add models manually\nprovider = ModelConfigProvider()\n\n# Add OpenAI model\nprovider.add_single_model(\n model_name=\"gpt-4\",\n platform=\"openai\",\n api_url=\"https://api.openai.com/v1/chat/completions\",\n api_key=\"your-api-key\",\n max_tokens=4096\n)\n\n# Add custom model\nprovider.add_single_model(\n model_name=\"custom-llm\",\n platform=\"openai_compatible\",\n api_url=\"https://your-api.com/v1/chat/completions\",\n api_key=\"your-api-key\",\n max_tokens=2048\n)\n```\n\n## \ud83d\udcdd DSL Workflow Format\n\n### Basic Structure\n\n```yaml\nmetadata:\n version: \"1.0\"\n description: \"Workflow description\"\n\ninput:\n type: \"start\"\n name: \"workflow_input\"\n data:\n key: \"value\"\n\nexecutors:\n - name: task1\n type: task\n func: function_name\n custom_vars:\n param1: \"${input.key}\"\n param2: \"static_value\"\n depends_on: [] # Dependencies\n timeout: 30 # Timeout in seconds\n retry: 2 # Retry count\n\noutput:\n type: \"end\"\n name: \"workflow_output\"\n data:\n result: \"${task1.output}\"\n```\n\n### Multi-Model Workflow Example\n\n```yaml\nmetadata:\n version: \"1.0\"\n description: \"Multi-model Q&A with analysis\"\n\ninput:\n type: \"start\"\n name: \"workflow_input\"\n data:\n question: \"\"\n\nexecutors:\n # Parallel model calls\n - name: model1_answer\n type: task\n func: llm_simple_call\n custom_vars:\n user_input: \"${workflow_input.question}\"\n model: \"llama2\"\n timeout: 30\n\n - name: model2_answer\n type: task\n func: llm_simple_call\n custom_vars:\n user_input: \"${workflow_input.question}\"\n model: \"mistral\"\n timeout: 30\n\n # Analysis step (depends on both models)\n - name: analysis\n type: task\n func: llm_simple_call\n custom_vars:\n user_input: \"Compare these answers: 1) ${model1_answer.output} 2) ${model2_answer.output}\"\n model: \"llama2\"\n depends_on: [\"model1_answer\", \"model2_answer\"]\n\noutput:\n type: \"end\"\n name: \"workflow_output\"\n data:\n original_question: \"${workflow_input.question}\"\n model1_response: \"${model1_answer.output}\"\n model2_response: \"${model2_answer.output}\"\n analysis: \"${analysis.output}\"\n```\n\n## \ud83d\udd0c Built-in Functions\n\n- **`llm_simple_call`** - Basic LLM model call\n- **`text_process`** - Text preprocessing and formatting\n- **`result_summary`** - Multi-result summarization\n- **`data_transform`** - Data format transformation\n\n## \ud83e\uddea Running Examples\n\n```bash\n# Basic usage demo\npython examples/demo_example.py\n\n# Model configuration demo \npython examples/model_config_demo.py\n\n# Package usage demo\npython examples/package_demo.py\n```\n\n## \ud83d\udcca Supported Platforms\n\n- **Ollama** - Local LLM models\n- **OpenAI** - GPT series models\n- **OpenAI Compatible** - Any OpenAI-compatible API\n- **Anthropic** - Claude series models\n- **Custom** - Custom API endpoints\n\n## \ud83d\udee0\ufe0f Development\n\n### Setup Development Environment\n\n```bash\ngit clone https://github.com/liguobao/llm-flow-engine.git\ncd llm-flow-engine\n\n# Install development dependencies\npip install -e \".[dev]\"\n\n# Run tests\npytest\n\n# Format code\nblack .\n```\n\n### Project Validation\n\n```bash\n# Validate project structure and configuration\npython validate_project.py\n```\n\n## \ud83d\udcc4 License\n\nThis project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.\n\n## \ud83e\udd1d Contributing\n\nContributions are welcome! Please feel free to submit a Pull Request.\n\n## \ud83d\udcde Support\n\n- \ud83d\udc1b Issues: [GitHub Issues](https://github.com/liguobao/llm-flow-engine/issues)\n- \ud83d\udcd6 Documentation: [GitHub Wiki](https://github.com/liguobao/llm-flow-engine/wiki)\n\n## \ud83c\udf1f Star History\n\nIf you find this project helpful, please consider giving it a star! \u2b50\n\n---\n\nMade with \u2764\ufe0f by the LLM Flow Engine Team\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "A DSL-based LLM workflow engine that supports multi-model collaboration, dependency management, and result aggregation",
"version": "0.7.3",
"project_urls": {
"Changelog": "https://github.com/liguobao/llm-flow-engine/releases",
"Documentation": "https://github.com/liguobao/llm-flow-engine/wiki",
"Homepage": "https://github.com/liguobao/llm-flow-engine",
"Issues": "https://github.com/liguobao/llm-flow-engine/issues",
"Repository": "https://github.com/liguobao/llm-flow-engine.git"
},
"split_keywords": [
"llm",
" workflow",
" dsl",
" ai",
" automation",
" multi-model",
" async",
" dag",
" chatgpt",
" ollama"
],
"urls": [
{
"comment_text": null,
"digests": {
"blake2b_256": "9108337e48ca993084984d46159574ecc22af67a7bac934763d6a48da5ed2e36",
"md5": "9b9b3e6e37627276f0f73adb034e2647",
"sha256": "4eabec6d3bad59d2ac90a653990299630253012acca3ef05bad9b61152a2cb64"
},
"downloads": -1,
"filename": "llm_flow_engine-0.7.3-py3-none-any.whl",
"has_sig": false,
"md5_digest": "9b9b3e6e37627276f0f73adb034e2647",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 42759,
"upload_time": "2025-08-06T16:50:37",
"upload_time_iso_8601": "2025-08-06T16:50:37.587263Z",
"url": "https://files.pythonhosted.org/packages/91/08/337e48ca993084984d46159574ecc22af67a7bac934763d6a48da5ed2e36/llm_flow_engine-0.7.3-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": null,
"digests": {
"blake2b_256": "afff6fdc15099e4151ccf9ff3b76348e9146918360866735c1f77d8de8d78609",
"md5": "b3d8bb7bfaf96699de882850d459c0c7",
"sha256": "a65835350b8419e4e7d8afa3bd443153956c98c9e689236d12812cc2c8d3fcc5"
},
"downloads": -1,
"filename": "llm_flow_engine-0.7.3.tar.gz",
"has_sig": false,
"md5_digest": "b3d8bb7bfaf96699de882850d459c0c7",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 58643,
"upload_time": "2025-08-06T16:50:39",
"upload_time_iso_8601": "2025-08-06T16:50:39.216824Z",
"url": "https://files.pythonhosted.org/packages/af/ff/6fdc15099e4151ccf9ff3b76348e9146918360866735c1f77d8de8d78609/llm_flow_engine-0.7.3.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2025-08-06 16:50:39",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "liguobao",
"github_project": "llm-flow-engine",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"requirements": [
{
"name": "aiohttp",
"specs": [
[
">=",
"3.8.0"
]
]
},
{
"name": "pyyaml",
"specs": [
[
">=",
"6.0"
]
]
},
{
"name": "loguru",
"specs": [
[
">=",
"0.7.0"
]
]
},
{
"name": "requests",
"specs": []
},
{
"name": "pytest",
"specs": [
[
">=",
"7.0"
]
]
},
{
"name": "pytest-asyncio",
"specs": [
[
">=",
"0.21.0"
]
]
},
{
"name": "black",
"specs": [
[
">=",
"22.0"
]
]
},
{
"name": "flake8",
"specs": [
[
">=",
"4.0"
]
]
}
],
"lcname": "llm-flow-engine"
}