Plan-and-Execute:先规划再执行
🦞 一只用 AI Agent 搭副业产线的程序员
ReAct 的优点是灵活——走一步看一步。但缺点也很明显:没有全局视角。它在第 3 步做的事,可能和第 8 步做的事重复了。它发现了新信息后,可能要走回头路。
Plan-and-Execute 是另一种思路:先生成完整计划,再一次性执行。
如果你把 ReAct 比作「边走边问路」,Plan-and-Execute 就是「先看地图确定路线,再出发」。
两种模式对比
举个例子:任务是「把 Go 1.21 项目升级到 1.23,确保所有依赖兼容」。
ReAct 的做法(边走边想):
Step 1: 读取 go.mod → 看到 1.21 和一堆依赖 Step 2: 改 go.mod 版本号 → 1.21 → 1.23 Step 3: go mod tidy → 报错,某个依赖不兼容 Step 4: 查这个依赖的最新版本 → 查到了 Step 5: 更新依赖 → 又报错,另一个间接依赖冲突 Step 6: ...(在迷宫里面撞墙)Plan-and-Execute 的做法(先规划再执行):
规划阶段: 1. 读取 go.mod,列出所有直接依赖 2. 逐一检查每个依赖是否兼容 Go 1.23 3. 对不兼容的依赖,找到兼容版本 4. 更新 go.mod 中的版本号 5. 更新不兼容的依赖版本 6. 执行 go mod tidy 7. 编译验证 8. 运行测试 执行阶段:按顺序执行 1→2→3→...→8Plan-and-Execute 把任务变成了一个有向无环图(DAG)——每个步骤有明确的依赖关系,执行引擎按拓扑顺序跑。
实现:一个轻量级的 DAG 执行器
packagemainimport("context""fmt""sync""time")// Task 单个任务节点typeTaskstruct{IDstring`json:"id"`Namestring`json:"name"`Descriptionstring`json:"description"`DependsOn[]string`json:"depends_on"`// 依赖的任务 ID 列表Actionstring`json:"action"`// 要执行的操作指令}// Plan LLM 生成的执行计划typePlanstruct{Goalstring`json:"goal"`Tasks[]Task`json:"tasks"`}// TaskResult 任务执行结果typeTaskResultstruct{TaskIDstringOutputstringErrorerror}// DAGExecutor DAG 执行器typeDAGExecutorstruct{executorfunc(ctx context.Context,task Task)(string,error)}funcNewDAGExecutor(executorfunc(ctx context.Context,task Task)(string,error))*DAGExecutor{return&DAGExecutor{executor:executor}}// Execute 按拓扑顺序执行计划func(e*DAGExecutor)Execute(ctx context.Context,plan Plan)(map[string]TaskResult,error){// 构建依赖图inDegree:=make(map[string]int)dependents:=make(map[string][]string)// 谁依赖我taskMap:=make(map[string]Task)for_,t:=rangeplan.Tasks{taskMap[t.ID]=t inDegree[t.ID]=len(t.DependsOn)for_,dep:=ranget.DependsOn{dependents[dep]=append(dependents[dep],t.ID)}}// 验证:检查所有依赖是否存在for_,t:=rangeplan.Tasks{for_,dep:=ranget.DependsOn{if_,ok:=taskMap[dep];!ok{returnnil,fmt.Errorf("任务 %s 依赖了不存在的任务 %s",t.ID,dep)}}}// 检测循环依赖ifcycle:=detectCycle(taskMap);cycle!=nil{returnnil,fmt.Errorf("检测到循环依赖: %v",cycle)}results:=make(map[string]TaskResult)varmu sync.Mutex// 找到所有入度为 0 的节点(没有依赖,可以直接执行)ready:=make(chanTask,len(plan.Tasks))for_,t:=rangeplan.Tasks{ifinDegree[t.ID]==0{ready<-t}}varwg sync.WaitGroup errCh:=make(chanerror,len(plan.Tasks))// 并发执行for{select{casetask:=<-ready:wg.Add(1)gofunc(t Task){deferwg.Done()fmt.Printf("▶ 执行: [%s] %s\n",t.ID,t.Name)output,err:=e.executor(ctx,t)mu.Lock()results[t.ID]=TaskResult{TaskID:t.ID,Output:output,Error:err}mu.Unlock()iferr!=nil{fmt.Printf("❌ [%s] 失败: %v\n",t.ID,err)errCh<-fmt.Errorf("任务 %s 失败: %w",t.ID,err)return}fmt.Printf("✅ [%s] 完成: %s\n",t.ID,truncate(output,80))// 通知依赖此任务的其他任务for_,dependent:=rangedependents[t.ID]{mu.Lock()inDegree[dependent]--ifinDegree[dependent]==0{ready<-taskMap[dependent]}mu.Unlock()}}(task)default:// 没有就绪的任务了,等待完成wg.Wait()// 检查是否有错误select{caseerr:=<-errCh:returnresults,errdefault:// 检查是否有未完成的任务(死锁)mu.Lock()remaining:=0for_,d:=rangeinDegree{ifd>0{remaining++}}mu.Unlock()ifremaining>0{returnresults,fmt.Errorf("执行完成但 %d 个任务未执行(可能存在依赖问题)",remaining)}returnresults,nil}}}}// detectCycle 检测 DAG 中是否有环funcdetectCycle(tasksmap[string]Task)[]string{visited:=make(map[string]bool)recStack:=make(map[string]bool)varcycle[]stringvardfsfunc(idstring)booldfs=func(idstring)bool{visited[id]=truerecStack[id]=truefor_,dep:=rangetasks[id].DependsOn{if!visited[dep]{ifdfs(dep){cycle=append(cycle,id)returntrue}}elseifrecStack[dep]{cycle=append(cycle,dep,id)returntrue}}recStack[id]=falsereturnfalse}forid:=rangetasks{if!visited[id]{ifdfs(id){returncycle}}}returnnil}functruncate(sstring,nint)string{iflen(s)>n{returns[:n]+"..."}returns}// ──────────── 两阶段实现:规划 + 执行 ────────────typePlanAndExecuteAgentstruct{plannerfunc(goalstring)(Plan,error)// 调用 LLM 生成计划executor*DAGExecutor}func(a*PlanAndExecuteAgent)Run(ctx context.Context,goalstring)(map[string]TaskResult,error){// 阶段 1:规划fmt.Println("📋 规划阶段:生成执行计划...")plan,err:=a.planner(goal)iferr!=nil{returnnil,fmt.Errorf("规划阶段失败: %w",err)}fmt.Printf("生成了 %d 个任务:\n",len(plan.Tasks))for_,t:=rangeplan.Tasks{fmt.Printf(" [%s] %s (依赖: %v)\n",t.ID,t.Name,t.DependsOn)}// 阶段 2:执行fmt.Println("\n⚡ 执行阶段:按拓扑顺序运行...")returna.executor.Execute(ctx,plan)}// ──────────── 示例:Go 项目升级计划 ────────────funcmain(){// 模拟 LLM 规划器mockPlanner:=func(goalstring)(Plan,error){returnPlan{Goal:goal,Tasks:[]Task{{ID:"1",Name:"读取 go.mod",Description:"获取当前版本和依赖",DependsOn:[]string{},Action:"read go.mod"},{ID:"2",Name:"列出直接依赖",Description:"解析所有直接依赖",DependsOn:[]string{"1"},Action:"list direct deps from go.mod"},{ID:"3",Name:"逐检查依赖兼容性",Description:"对每依赖检查 Go 1.23 兼容",DependsOn:[]string{"2"},Action:"check each dep for Go 1.23"},{ID:"4",Name:"找到不兼容依赖的新版本",Description:"对不兼容的依赖查询可用版本",DependsOn:[]string{"3"},Action:"find compatible versions"},{ID:"5",Name:"更新 go.mod 版本号",Description:"将 go 1.21 改为 go 1.23",DependsOn:[]string{"1"},Action:"update go version in go.mod"},{ID:"6",Name:"更新依赖版本",Description:"更新不兼容的依赖到兼容版本",DependsOn:[]string{"4","5"},Action:"update dep versions"},{ID:"7",Name:"执行 go mod tidy",Description:"清理和下载依赖",DependsOn:[]string{"6"},Action:"go mod tidy"},{ID:"8",Name:"编译验证",Description:"确保项目能编译通过",DependsOn:[]string{"7"},Action:"go build ./..."},{ID:"9",Name:"运行测试",Description:"确保所有测试通过",DependsOn:[]string{"8"},Action:"go test ./..."},},},nil}// 模拟执行器 - 实际项目这里接真正的 LLM 调用和工具执行mockExecutor:=func(ctx context.Context,t Task)(string,error){time.Sleep(100*time.Millisecond)// 模拟执行时间returnfmt.Sprintf("[%s] 成功: 已完成 %s",t.ID,t.Action),nil}dagExec:=NewDAGExecutor(mockExecutor)agent:=&PlanAndExecuteAgent{planner:mockPlanner,executor:dagExec,}results,err:=agent.Run(context.Background(),"将 Go 项目从 1.21 升级到 1.23,确保所有依赖兼容")iferr!=nil{fmt.Printf("❌ 执行失败: %v\n",err)return}fmt.Printf("\n✅ 全部完成 (%d 个任务)\n",len(results))forid,r:=rangeresults{fmt.Printf(" [%s] %s\n",id,r.Output)}}DAG 执行器的核心逻辑
// 1. 拓扑排序:入度为 0 的任务先执行for_,t:=rangetasks{iflen(t.DependsOn)==0{ready<-t// 没有依赖,立即执行}}// 2. 当任务完成时,通知依赖它的人for_,dependent:=rangedependents[task.ID]{inDegree[dependent]--ifinDegree[dependent]==0{ready<-dependent// 所有依赖都完成了,可以执行}}这就是大学数据结构的拓扑排序——当年学的时候觉得没用,现在成了 Agent 执行引擎的核心。
Plan-and-Execute vs ReAct:选哪个
| 维度 | ReAct | Plan-and-Execute |
|---|---|---|
| 规划 | 逐步推理,走一步看一步 | 一次性生成完整计划 |
| 灵活性 | 高:随时根据观察调整 | 低:计划生成后较难更改 |
| 效率 | 可能有重复步骤 | 去重后执行,效率更高 |
| 并发 | 不能并发(步骤串行) | 无依赖的任务可并发执行 |
| Token 消耗 | 每步都要推理 | 规划阶段大,执行阶段小 |
| 失败恢复 | 自然重试 | 需要重新规划或跳过失败节点 |
| 适用场景 | 探索性任务、Bug 排查 | 步骤明确的任务、批处理 |
实战判断标准:
如果任务步骤可以事前预测 80% 以上 → Plan-and-Execute
如果任务路径高度依赖中间结果 → ReAct
你也可以组合使用:用 Plan-and-Execute 做整体框架,每个 Task 内部用 ReAct 做微小决策。
一个真实的坑:计划太细
我用 Plan-and-Execute 跑过一次代码迁移任务,LLM 生成的计划有 47 个步骤。其中 13 个步骤是多余的——比如「检查 go.sum 文件存在性」「确认 go 命令可用」这种应该在环境检查阶段统一做的事。
教训:在给 LLM 的规划 Prompt 里加上「选择合适的粒度,不要列出环境检查或者可以通过自动化工具一步完成的步骤」。计划不是越细越好。
总结
ReAct 和 Plan-and-Execute 是 Agent 设计的两大范式。实战中的 Agent 往往两者都用——Plan-and-Execute 给骨架,ReAct 填血肉。
下一篇我们跳出单个 Agent,看一个更高维度的问题:什么时候该把任务拆给多个 Agent 并行做?
关注我,别错过。
🦞 一只用 AI Agent 搭副业产线的程序员
全平台同名:虾哥不加班
需要定制 AI 工具?来聊聊 → lob_ai源码:GitHub - lobster-bujiaban
