当前位置: 首页 > news >正文

LangGraph 12节点智能体工作流编排:从状态设计到条件路由的实战拆解

LangGraph 12节点智能体工作流编排:从状态设计到条件路由的实战拆解

写在前面

LangChain/LangGraph生态里最常被问到的问题是:"Agent和工作流到底怎么选?"

我的答案是:大多数生产级NL2SQL场景,不需要自由的Agent Loop,需要的是结构化的状态图工作流。Agent(ReAct模式)适合开放性工具调用场景,但NL2SQL的流程是确定的——关键词提取、召回、过滤、生成、校验,步骤固定,只是每步的"决策"由LLM完成。

这篇文章完整拆解小欧问数项目中12节点LangGraph工作流的设计:状态怎么定义、节点怎么拆分、并行怎么编排、条件路由怎么做。


一、整体拓扑:一眼看懂12节点

START│▼
extract_keywords(关键词提取)│├──▶ recall_column(字段召回)────┐│                                  │├──▶ recall_value(值召回)────────┼──▶ merge_retrieved_info(信息融合)│                                  │└──▶ recall_metric(指标召回)──┘│▼┌──────────────────────┐│   filter_table(表过滤)──┐│   filter_metric(指标过滤)┼──▶ add_extra_context(注入上下文)└──────────────────────┘│▼generate_sql(SQL生成)│▼validate_sql(SQL校验)/      \error=None   error!=None│            │▼            ▼execute_sql   correct_sql(SQL修正)│            ││            ▼│      execute_sql▼END

核心特征

  • 前三路(column/value/metric)并行执行
  • filter_table和filter_metric 并行执行
  • validate_sql后接条件路由:成功直接执行,失败走修正节点

二、状态设计:DataAgentState

LangGraph的核心是状态(State)——每个节点的输入和输出都是State的一部分。设计State的原则是:节点之间只通过State通信,不直接调用

class DataAgentState(TypedDict):query: str              # 原始查询(全程只读)keywords: list[str]     # 关键词(extract_keywords写,三路召回读)retrieved_columns: list[ColumnInfo]  # 召回字段(三路召回写,merge读)retrieved_values: list[ValueInfo]    # 召回值retrieved_metrics: list[MetricInfo]  # 召回指标table_infos: list[TableInfoState]    # 融合后的表结构(merge写,filter读)metric_infos: list[MetricInfoState]  # 融合后的指标(merge写,filter读)date_info: DateInfoState   # 日期上下文(add_extra_context写,generate_sql读)db_info: DBInfoState       # 数据库环境sql: str                   # 生成的SQL(generate_sql写,validate读)error: str                 # 校验错误(validate写,correct读)

设计原则

  1. 字段命名即数据流:从字段名就能看出哪个节点写、哪个节点读
  2. TypedDict而非dataclass:LangGraph要求State是TypedDict,支持部分更新
  3. 节点只返回增量:每个节点只返回它负责更新的字段,LangGraph自动合并

三、Context设计:依赖注入

State是"流转的数据",Context是"固定的依赖"。所有Repository、Client通过Context注入,节点不直接创建连接。

class DataAgentContext(TypedDict):embedding_client: HuggingFaceEndpointEmbeddingscolumn_qdrant_repository: ColumnQdrantRepositoryvalue_es_repository: ValueESRepositorymetric_qdrant_repository: MetricQdrantRepositorymeta_mysql_repository: MetaMySQLRepositorydw_mysql_repository: DWMySQLRepository

这种设计的好处:

  • 测试时可替换为Mock
  • 节点不关心连接管理,生命周期由外层控制
  • 符合LangGraph的context_schema设计范式

四、节点设计原则

4.1 单职责

每个节点只做一件事,通过函数签名就能看出它读什么、写什么:

async def extract_keywords(state: DataAgentState, runtime: Runtime[DataAgentContext]):query = state["query"]          # 读:query# ...分词逻辑return {"keywords": keywords}   # 写:keywordsasync def recall_column(state: DataAgentState, runtime: Runtime[DataAgentContext]):query = state["query"]          # 读:querykeywords = state["keywords"]    # 读:keywords# ...召回逻辑return {"retrieved_columns": columns}  # 写:retrieved_columns

4.2 统一错误处理模式

每个节点内部try-except,错误向上抛出(由LangGraph框架处理),同时通过SSE通知前端:

async def recall_column(state, runtime):writer = runtime.stream_writerwriter({"type": "progress", "step": "召回字段", "status": "running"})try:# ...业务逻辑writer({"type": "progress", "step": "召回字段", "status": "success"})return {"retrieved_columns": retrieved_columns}except Exception as e:writer({"type": "progress", "step": "召回字段", "status": "error"})raise

4.3 SSE实时推送

每个节点通过runtime.stream_writer推送进度,客户端收到的是实时的事件流:

{"type": "progress", "step": "抽取关键字", "status": "running"}
{"type": "progress", "step": "抽取关键字", "status": "success"}
{"type": "progress", "step": "召回字段", "status": "running"}
{"type": "progress", "step": "召回字段", "status": "success"}
...
{"type": "result", "data": [...]}

这让前端可以展示"正在召回字段...正在过滤表格...正在生成SQL..."的实时进度条。


五、并行编排:LangGraph的并行边

LangGraph支持扇出(Fan-out)/ 扇入(Fan-in)模式,实现多路并行:

# 关键词提取完成后,三路并行
graph_builder.add_edge("extract_keywords", "recall_column")
graph_builder.add_edge("extract_keywords", "recall_value")
graph_builder.add_edge("extract_keywords", "recall_metric")# 三路汇入同一个merge节点(扇入)
graph_builder.add_edge("recall_column", "merge_retrieved_info")
graph_builder.add_edge("recall_value", "merge_retrieved_info")
graph_builder.add_edge("recall_metric", "merge_retrieved_info")

LangGraph的行为

  • extract_keywords完成后,三个recall节点同时启动
  • merge_retrieved_info会等待三个前驱节点全部完成后才执行
  • 并行节点的State更新是原子合并的,不存在竞态

同样的模式用在filter阶段:

graph_builder.add_edge("merge_retrieved_info", "filter_table")
graph_builder.add_edge("merge_retrieved_info", "filter_metric")
graph_builder.add_edge("filter_table", "add_extra_context")
graph_builder.add_edge("filter_metric", "add_extra_context")

六、条件路由:validate_sql后的分支

这是整个工作流里唯一的"动态决策点":

graph_builder.add_conditional_edges("validate_sql",lambda state: "execute_sql" if state["error"] is None else "correct_sql",{"execute_sql": "execute_sql", "correct_sql": "correct_sql"}
)

路由逻辑

  • validate_sql节点用EXPLAIN {sql}验证SQL语法和表字段存在性
  • 验证通过:error=None,直接走execute_sql
  • 验证失败:error=具体错误信息,走correct_sql让LLM根据错误信息修正
# validate_sql.py
async def validate_sql(state, runtime):try:await dw_mysql_repository.validate_sql(sql)  # EXPLAINreturn {"error": None}except Exception as e:return {"error": str(e)}

为什么不直接重试? 因为correct_sql节点有完整的上下文(表结构、指标定义、原始查询、错误信息),能做有针对性的修正,而不是盲目重试。


七、节点拆解:filter_table的LLM过滤

filter_table是一个典型的"LLM做决策、代码做执行"的节点。

7.1 问题背景

三路召回后可能有10+张表、50+个字段。直接全部塞给generate_sql,不仅Token爆炸,LLM还会被无关字段干扰。

7.2 LLM过滤

# 让LLM判断:回答这个问题,只需要哪些表的哪些字段?
prompt = PromptTemplate(template=load_prompt("filter_table_info"), ...)
result = await chain.ainvoke({"query": query, "table_infos": yaml.dump(table_infos)})# result格式:{"fact_order": ["order_amount", "region_id"], "dim_region": ["region_id", "region_name"]}

LLM输出的是一个精简的表-字段映射,代码再据此裁剪State里的table_infos

for table_info in table_infos[:]:if table_info["name"] not in result:table_infos.remove(table_info)  # 整张表不需要,移除else:selected_columns = result[table_info["name"]]for column_info in table_info["columns"][:]:if column_info["name"] not in selected_columns:table_info["columns"].remove(column_info)  # 字段不需要,移除

效果:过滤前可能15张表60个字段,过滤后只剩2-3张表10个字段,generate_sql的Prompt Token减少60%+。


八、上下文注入:add_extra_context

这个节点容易被忽略,但对SQL质量影响很大。它注入两类信息:

8.1 时间信息

today = datetime.today()
date_info = DateInfoState(date=today.strftime("%Y-%m-%d"),weekday=today.strftime("%A"),quarter=f"Q{(today.month - 1) // 3 + 1}"
)

用户说"去年"、"上个季度"、"本月",LLM需要知道"现在是哪年哪月"才能正确推断时间范围。

8.2 数据库环境信息

db_info = await dw_mysql_repository.get_db_info()
# {"dialect": "mysql", "version": "8.0.32"}

不同数据库的SQL语法有差异(MySQL的LIMIT vs SQL Server的TOP vs Oracle的ROWNUM),告知LLM具体的数据库类型和版本,能避免生成不兼容的SQL。


九、SSE流式输出:从工作流到前端

整个工作流的输出是SSE(Server-Sent Events)流:

# query_service.py
async def query(self, query: str):async for chunk in graph.astream(input=state, context=context, stream_mode="custom"):yield f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"# query_router.py
@query_router.post("/api/query")
async def query(query: QuerySchema, query_service=Depends(get_query_service)):return StreamingResponse(query_service.query(query.query),media_type="text/event-stream")

stream_mode="custom"意味着只有节点主动调用stream_writer的数据才会被推送,State的内部变更不会自动推送——这保证了推送内容是可控的、面向前端的


十、为什么不用ReAct Agent?

这是架构选型时最常见的疑问。对比一下:

维度 ReAct Agent 状态图工作流(本方案)
流程可控性 低,LLM自主决策下一步 高,节点拓扑固定
延迟可预测性 差,循环次数不确定 好,固定深度(最多12步)
并行能力 弱,通常是串行thought-action 强,扇出扇入原生支持
生产稳定性 LLM可能陷入死循环 不存在循环(correct_sql→execute_sql是终点)
适用场景 开放域工具调用 流程确定的数据处理

核心结论:当流程是确定的、只是节点内部的"决策"需要LLM时,用状态图工作流比Agent更稳定、更可控。


十一、总结

这套12节点工作流的核心设计思想:

  1. 状态即数据流:State字段命名体现数据流向,节点只返回增量更新
  2. 并行降低延迟:三路召回并行、双路过滤并行,总延迟取决于最慢的一路
  3. 条件路由做异常处理:validate→correct不是"重试",而是"有上下文的修正"
  4. LLM做决策、代码做执行:LLM负责"哪些表需要"、"SQL怎么写"等判断,代码负责"裁剪State"、"执行SQL"等确定性操作
  5. SSE实时透明:每个节点的进度实时推送,用户看到的不是"加载中..."而是具体到哪一步了

LangGraph最大的价值不是"让LLM调用工具",而是让复杂的多步骤AI流程变得可工程化、可测试、可监控。这才是生产级AI系统需要的。

http://www.zskr.cn/news/1482486.html

相关文章:

  • 人工智能的定义与基础(理论篇)
  • 广州修漏水别乱选!2026 实地筛选正规防水商家,居家堵漏少踩冤枉坑 - 宅安选房屋修缮
  • 2026年 真空热压炉/热压烧结炉/材料热压炉厂家推荐榜:高精度温控与核心烧结技术实力深度解析 - 品牌发掘
  • 从实验室到生产环境:手把手教你规划VMware vSphere 7.0的授权策略(附各版本功能对比)
  • GraphRAG vs 无向量RAG vs 向量RAG(2026年高级上下文工程指南)
  • 市面上有哪些是真正性价比高的AI智能降重工具(告别论文AI标记风险)
  • 敦煌母婴除甲醛CMA甲醛检测治理公司深度测评:绿呼吸环保稳居榜首 - 一修哥咨询
  • 宠乐圈互助平台
  • 如何彻底移除Windows Defender?5步终极指南释放系统性能
  • 东港母婴除甲醛CMA甲醛检测治理公司深度测评:绿呼吸环保稳居榜首 - 一修哥咨询
  • Windows 10下CausalML安装避坑全记录:从Visual C++到XGBoost版本冲突
  • Redis 有序集合(sorted set)
  • 3个实战技巧:高效使用Python工具完成网页截图与HTML转图片
  • 告别网络卡顿:手把手教你为RoCEv2配置DC-QCN拥塞控制(附Mellanox网卡实战)
  • 炸裂!OpenClaw+Hermes+RAG+Agent 中国标准落地,千行百业迎来 “数字员工” 革命
  • 四川建筑钢材经销商公司|带肋钢筋|螺纹钢|盘螺|盘圆|抗震钢筋 - 四川盛世钢联营销中心
  • 如何用Python实现高效抢票:告别演唱会门票秒光烦恼
  • IEEE会议投稿避坑指南:从LaTeX模板到PDF eXpress校验的完整流程(以CAC为例)
  • 都匀母婴除甲醛CMA甲醛检测治理公司深度测评:绿呼吸环保稳居榜首 - 一修哥咨询
  • 如何高效批量下载抖音无水印视频:从内容收藏到素材管理的完整解决方案
  • 深圳修漏水别盲目挑选!2026 实地甄选合规防水门店,家装堵漏避开各类消费圈套 - 宅安选房屋修缮
  • 元组-tuple基本操作
  • 2026年立车厂家/数控立车/高速立车/车铣复合立车/电机壳加工及汽车零部件加工立车最新推荐品牌榜单 - 品牌发掘
  • 从振动信号到健康预警:工业AIoT场景下的智能诊断实战指南
  • 三自由度Delta并联机器人的设计与仿真(设计源文件+万字报告+讲解)(支持资料、图片参考_降重降ai)
  • 一个会展人的真实记录:天津国家会展中心包车,我为什么连续3年只找这一家 - 米米Ada
  • 一马当先-beta冲刺
  • 3分钟搞定Windows激活:智能脚本的终极解决方案
  • 大庆母婴除甲醛CMA甲醛检测治理公司深度测评:绿呼吸环保稳居榜首 - 一修哥咨询
  • 四川钢管代理商公司|无缝钢管|焊管|镀锌管|螺旋焊管|方矩管 - 四川盛世钢联营销中心