https://github.com/hjlarry/dsl-test.git
🚀 A high-performance, scalable distributed workflow engine
cd /Users/hejl/projects/dsl/workflow-engine
cargo build
# Run the example workflow
cargo run -- -f examples/example.yaml
# Run with log output
RUST_LOG=info cargo run -- -f examples/example.yaml
# Run a custom workflow
cargo run -- -f /path/to/your/workflow.yaml
# Pass input parameters (overriding global variables)
cargo run -- -f examples/example.yaml -i message="Hello from CLI" -i count=42
cargo build --release
./target/release/workflow-engine -f examples/example.yaml
# Start the server (default port 3000)
cargo run -- serve --port 3000
Trigger a workflow:
curl -X POST http://localhost:3000/execute \
  -H "Content-Type: application/json" \
  -d '{
    "file": "examples/example.yaml",
    "inputs": {
      "message": "Hello from Webhook",
      "count": 100
    }
  }'
Response:
{
  "status": "success",
  "outputs": {
    "node_id": { ... }
  },
  "error": null
}
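The same request can be prepared from a script. The following is a minimal Python sketch that builds the POST request for the `/execute` endpoint shown above. The helper name `build_execute_request` is hypothetical, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_execute_request(base_url, file, inputs):
    """Build (but do not send) a POST request for the /execute endpoint."""
    payload = {"file": file, "inputs": inputs}
    return urllib.request.Request(
        url=f"{base_url}/execute",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_execute_request(
    "http://localhost:3000",
    "examples/example.yaml",
    {"message": "Hello from Webhook", "count": 100},
)
# With the server running, urllib.request.urlopen(req) would send the request.
```

Keeping construction separate from sending makes the payload easy to inspect before a workflow is actually triggered.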
Architecture:
./target/release/workflow-engine coordinator -p 8080
# Worker 1
./target/release/workflow-engine worker -i worker-1 -p 3001 -c http://localhost:8080
# Worker 2
./target/release/workflow-engine worker -i worker-2 -p 3002 -c http://localhost:8080
./target/release/workflow-engine submit -f benchmarks/distributed_test.yaml -c http://localhost:8080
Performance advantages:
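name: "Workflow name"
version: "1.0"
# Global variables (accessible to all nodes)
global:
  key1: "value1"
  key2: 123
nodes:
  - id: "unique ID"
    type: "node type"
    name: "display name"
    needs: ["ID of a node this one depends on"]  # optional; empty means an initial node
    params:
      # node-specific parameters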
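The `needs` field implies an execution order: a node can start only once every node it depends on has finished, and nodes with no unmet dependencies could run side by side. The sketch below groups node IDs into such levels with Python's standard `graphlib`. It illustrates the idea under that assumption and is not the engine's actual scheduler; the function name `execution_levels` and the node IDs are hypothetical.

```python
from graphlib import TopologicalSorter


def execution_levels(nodes):
    """Group node ids into levels; every node in a level has all its needs met."""
    sorter = TopologicalSorter({n["id"]: set(n.get("needs", [])) for n in nodes})
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    levels = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # nodes whose dependencies are done
        levels.append(ready)
        sorter.done(*ready)
    return levels


# Hypothetical workflow: b and c both depend on a, d depends on b and c.
nodes = [
    {"id": "a"},
    {"id": "b", "needs": ["a"]},
    {"id": "c", "needs": ["a"]},
    {"id": "d", "needs": ["b", "c"]},
]
levels = execution_levels(nodes)
```

Here `a` forms the first level, `b` and `c` the second, and `d` the third.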
12 node types are currently supported:
- id: "my_shell"
  type: "shell"
  name: "Run script"
  params:
    command: "echo 'Hello' && ls -la"
- id: "api_call"
  type: "http"
  name: "Call API"
  params:
    method: "GET"  # or POST
    url: "https://api.example.com/data"
    body:  # used with POST
      key: "value"
- id: "wait"
  type: "delay"
  name: "Wait"
  params:
    milliseconds: 1000  # wait 1 second
- id: "condition"
  type: "switch"
  name: "Conditional check"
  params:
    condition: "{{ nodes.fetch.output.status }} > 100"
    true_value: "High-score handling"
    false_value: "Regular handling"
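To make the `condition` string concrete, here is a minimal Python sketch of how such an expression could be evaluated once its `{{ }}` placeholders have been replaced, so that the input is a plain string like `200 > 100`. It covers only the comparison operators `==`, `!=`, `>`, `<`, `>=`, `<=` and the literals `true` and `false`. The function names are hypothetical and the engine's real evaluator may behave differently.

```python
import operator

# Two-character operators come first so ">=" is not mistaken for ">".
OPERATORS = [
    (">=", operator.ge), ("<=", operator.le),
    ("==", operator.eq), ("!=", operator.ne),
    (">", operator.gt), ("<", operator.lt),
]


def parse_operand(text):
    """Turn one side of a comparison into a bool, a number, or a string."""
    text = text.strip()
    if text in ("true", "false"):
        return text == "true"
    try:
        return float(text)
    except ValueError:
        return text.strip("'\"")


def evaluate_condition(expr):
    """Evaluate a single comparison, or a bare literal when no operator is present."""
    expr = expr.strip()
    for symbol, func in OPERATORS:
        if symbol in expr:
            left, right = expr.split(symbol, 1)
            return func(parse_operand(left), parse_operand(right))
    return bool(parse_operand(expr))


high_score = evaluate_condition("200 > 100")
```

With this result, a `switch` node would pick `true_value` when the condition holds and `false_value` otherwise.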
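Supported condition operators:
- `==` (equal), `!=` (not equal)
- `>` (greater than), `<` (less than)
- `>=` (greater than or equal), `<=` (less than or equal)
- `true`, `false` (boolean literals)

- id: "data_process"
  type: "script"
  name: "Data processing"
  params:
    language: "python"  # or "javascript", "js", "node"
    script: |
      import json
      data = {"result": 42}
      print(json.dumps(data))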
Supported languages:
- `python` / `python3` (requires Python 3)
- `javascript` / `js` / `node` (requires Node.js)

- id: "ai_analyze"
  type: "llm"
  name: "AI analysis"
  params:
    model: "gpt-4"  # or "gpt-3.5-turbo"
    system: "You are a data analysis expert"
    prompt: "Analyze this data: {{ nodes.fetch.output }}"
    temperature: 0.7  # optional, defaults to 0.7
    max_tokens: 500  # optional
Configuration:
- `OPENAI_API_KEY`
- `.env` file (see `.env.example`)
- `base_url` parameter for using a compatible service

{
  "content": "AI-generated text",
  "model": "gpt-4",
  "usage": {
    "prompt_tokens": 100,
    "completion_tokens": 50,
    "total_tokens": 150
  }
}
- id: "extract"
  type: "transform"
  name: "Extract data"
  params:
    input: "{{ nodes.api.output.body }}"
    # Single-field extraction
    path: "$.data.users[*].name"
    # Or multi-field extraction
    extract:
      names: "$.users[*].name"
      emails: "$.users[*].email"
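To show what paths like the ones in the `transform` node select, the sketch below evaluates a small JSONPath subset in Python: field access (`.name`), a single index (`[0]`), the wildcard (`[*]`) and slices (`[0:3]`). It is an illustration only. The function name `json_path` is hypothetical, it always returns a list of matches, and the engine's own JSONPath support may be broader or shaped differently.

```python
import re

# One path step: ".name", "[*]", "[3]" or "[0:3]".
TOKEN = re.compile(r"\.(\w+)|\[(\*)\]|\[(-?\d+)\]|\[(\d*):(\d*)\]")


def json_path(data, path):
    """Evaluate a small JSONPath subset and return the list of matches."""
    if not path.startswith("$"):
        raise ValueError("path must start with '$'")
    current, pos = [data], 1
    while pos < len(path):
        match = TOKEN.match(path, pos)
        if match is None:
            raise ValueError(f"unsupported syntax at position {pos}: {path!r}")
        key, star, index, start, stop = match.groups()
        selected = []
        for value in current:
            if key is not None:  # .name
                if isinstance(value, dict) and key in value:
                    selected.append(value[key])
            elif not isinstance(value, list):
                continue  # the remaining steps only apply to arrays
            elif star is not None:  # [*]
                selected.extend(value)
            elif index is not None:  # [n]
                if -len(value) <= int(index) < len(value):
                    selected.append(value[int(index)])
            else:  # [start:stop]
                lo = int(start) if start else None
                hi = int(stop) if stop else None
                selected.extend(value[lo:hi])
        current, pos = selected, match.end()
    return current


doc = {"users": [{"name": "Alice", "email": "alice@example.com"},
                 {"name": "Bob", "email": "bob@example.com"}]}
names = json_path(doc, "$.users[*].name")
```

A multi-field `extract` block could then be handled by calling the same function once per key.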
JSONPath syntax examples:
- `$.data` - get the `data` field
- `$.users[0]` - the first user
- `$.users[*].name` - the `name` of every user
- `$[0:3]` - the first 3 elements

# Write a file
- id: "save"
  type: "file"
  params:
    operation: "write"  # or "read", "append"
    path: "/tmp/result.json"
    content: "{{ nodes.process.output }}"
# Read a file
- id: "load"
  type: "file"
  params:
    operation: "read"
    path: "./config.json"
- id: "batch_process"
  type: "loop"
  name: "Batch processing"
  params:
    items: "{{ nodes.fetch.output.urls }}"  # array
    steps:
      - id: "process_item"
        type: "shell"
        params:
          command: "echo 'Processing {{ loop.item }} at index {{ loop.index }}'"
      - id: "save_item"
        type: "file"
        needs: ["process_item"]
        params:
          operation: "write"
          path: "/tmp/item_{{ loop.index }}.txt"
          content: "{{ loop.item }}"
Loop context variables:
- `{{ loop.item }}` - the element of the current iteration
- `{{ loop.index }}` - the current index (starting at 0)
- `{{ loop.total }}` - the total number of elements

{
  "iterations": [
    {"process_item": {...}, "save_item": {...}},
    {"process_item": {...}, "save_item": {...}}
  ],
  "count": 2
}
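The loop variables and the output shape above can be sketched in a few lines of Python. This is a simplified model, assuming each step is a plain callable that receives the loop context. The names `loop_contexts` and `run_loop` are hypothetical and do not come from the engine.

```python
def loop_contexts(items):
    """Yield the loop context (item, index, total) for each iteration."""
    total = len(items)
    for index, item in enumerate(items):
        yield {"loop": {"item": item, "index": index, "total": total}}


def run_loop(items, steps):
    """Run every step once per item and collect outputs per iteration."""
    iterations = []
    for context in loop_contexts(items):
        iterations.append({name: step(context) for name, step in steps.items()})
    return {"iterations": iterations, "count": len(iterations)}


# Hypothetical step standing in for the "process_item" shell command.
steps = {
    "process_item": lambda ctx: (
        f"Processing {ctx['loop']['item']} at index {ctx['loop']['index']}"
    ),
}
result = run_loop(["a.txt", "b.txt"], steps)
```

The returned dictionary mirrors the `iterations` and `count` fields of the loop node's output.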
- id: "ask_name"
  type: "input"
  name: "Ask for the user's name"
  params:
    prompt: "Please enter your name:"
    default: "Guest"  # optional; pressing Enter uses the default
Output: the string entered by the user
"Alice"
- id: "set_var"
  type: "assign"
  params:
    assignments:
      - key: "count"
        value: 1
        mode: "set"
- id: "call_tool"
  type: "mcp"
  params:
    server:
      command: "npx"
      args: ["-y", "@modelcontextprotocol/server-filesystem", "/Users/me/files"]
    tool: "read_file"
    arguments:
      path: "/Users/me/files/test.txt"
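**Supported modes** (`assign` node):
- `set`: sets the variable's value (overwriting it)
- `append`: appends the value to an array (the operation is ignored if the variable is not an array)
**Output**: returns all updated variable values
{
  "counter": 42,
  "results": [...]
}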
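The two modes can be modeled in Python as follows. This is a sketch of the semantics described above, not the engine's code: the function name `apply_assignments` is hypothetical, and treating a missing `mode` as `set` is an assumption.

```python
def apply_assignments(variables, assignments):
    """Apply assign-node entries to `variables` and return the updated values."""
    updated = {}
    for entry in assignments:
        key, value = entry["key"], entry["value"]
        mode = entry.get("mode", "set")  # assumption: "set" when mode is omitted
        if mode == "set":
            variables[key] = value  # overwrite any existing value
        elif mode == "append":
            if not isinstance(variables.get(key), list):
                continue  # ignored: the target variable is not an array
            variables[key].append(value)
        else:
            raise ValueError(f"unknown mode: {mode!r}")
        updated[key] = variables[key]
    return updated


variables = {"counter": 41, "results": ["first"]}
output = apply_assignments(variables, [
    {"key": "counter", "value": 42, "mode": "set"},
    {"key": "results", "value": "second", "mode": "append"},
    {"key": "counter", "value": 1, "mode": "append"},  # ignored: not an array
])
```

The third assignment leaves `counter` untouched because it targets a variable that is not an array.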
### Variable Substitution
Use the `{{ }}` syntax in `params` to reference variables:
- `{{ global.api_key }}` - references a global variable
- `{{ nodes.fetch_data.output }}` - references another node's output
- `{{ nodes.http_call.output.body }}` - nested field access
- `{{ loop.item }}` - the current element of a loop (only inside a Loop node)
- `{{ loop.index }}` - the loop index (only inside a Loop node)

global:
  api_url: "https://api.example.com"
nodes:
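Placeholder resolution of this kind can be sketched with a regular expression and a dotted-path lookup. The Python below is an illustrative approximation under two assumptions: the context is a nested dictionary with `global` and `nodes` keys, and non-string values are rendered as JSON. The names `lookup` and `render` are hypothetical.

```python
import json
import re

# Matches {{ some.dotted.path }} with optional spaces inside the braces.
PLACEHOLDER = re.compile(r"\{\{\s*([\w.]+)\s*\}\}")


def lookup(context, dotted_path):
    """Follow a dotted path such as "nodes.fetch.output.status"."""
    value = context
    for part in dotted_path.split("."):
        value = value[part]  # raises KeyError when a segment is missing
    return value


def render(template, context):
    """Replace every placeholder in `template` with its value from `context`."""
    def replace(match):
        value = lookup(context, match.group(1))
        return value if isinstance(value, str) else json.dumps(value)
    return PLACEHOLDER.sub(replace, template)


context = {
    "global": {"api_url": "https://api.example.com"},
    "nodes": {"fetch": {"output": {"status": 200}}},
}
url = render("{{ global.api_url }}/users", context)
```

In this sketch an unknown path raises `KeyError`; how the engine reports a missing variable is not specified here.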
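### Memory System
- **Global Memory**: `{{ global.key }}`
  - Defined in the `global` section of the YAML
  - Readable and writable by all nodes
  - Thread-safe
- **Node Memory**: `{{ nodes.node_id.output }}`
  - Each node's output is stored after it finishes executing
  - Only the outputs of completed dependency nodes can be accessed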
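The two memory scopes can be pictured with a small lock-protected store. The Python class below is a conceptual sketch only: the engine is a Rust program and its internals are not shown in this README, so `WorkflowMemory` and its methods are hypothetical. It also assumes that access is limited to a node's directly declared `needs`.

```python
import threading


class WorkflowMemory:
    """Global variables plus per-node outputs, guarded by a single lock."""

    def __init__(self, global_vars=None):
        self._lock = threading.Lock()
        self._global = dict(global_vars or {})
        self._outputs = {}

    def set_global(self, key, value):
        with self._lock:
            self._global[key] = value

    def get_global(self, key):
        with self._lock:
            return self._global[key]

    def store_output(self, node_id, output):
        """Record a node's output once it has finished executing."""
        with self._lock:
            self._outputs[node_id] = output

    def get_output(self, node_id, needs):
        """Return an output only if node_id is a declared, completed dependency."""
        if node_id not in needs:
            raise KeyError(f"{node_id!r} is not a declared dependency")
        with self._lock:
            return self._outputs[node_id]  # KeyError if not finished yet


memory = WorkflowMemory({"key1": "value1"})
memory.set_global("key2", 123)
memory.store_output("fetch", {"status": 200})
status = memory.get_output("fetch", needs=["fetch"])["status"]
```

A single lock keeps the sketch short; a real implementation might prefer finer-grained synchronization.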
### Parallel execution example
nodes:
  # Node A runs first