LangGraph实战:构建有状态AI工作流引擎

LangGraph实战:构建有状态AI工作流引擎

一、LangGraph vs 其他AI框架

LangGraph是LangChain团队推出的新一代AI应用开发框架,专门用于构建有状态、多步骤的AI工作流。与LangChain的DAG模型不同,LangGraph采用Cyclic Graph(循环图)结构,支持无限循环和条件分支,可以实现真正的工作流编排。

回到顶部

二、核心概念

StateGraph:状态图,工作流的核心 Node:节点,代表一个步骤或任务 Edge:边,定义节点间的流转关系 State:状态,整个工作流的共享状态 Checkpoint:检查点,支持断点续跑和回溯 ConditionalEdge:条件边,支持动态路由

回到顶部

三、环境搭建

pip install langgraph langchain langchain-openai import os os.environ["OPENAI_API_KEY"] = "your-api-key"

回到顶部

四、最简单的工作流

from langgraph.graph import StateGraph, END from typing import TypedDict # 定义状态 class GraphState(TypedDict): messages: list result: str # 定义节点 def node1(state): return {"result": "Step 1 完成 -> " + state.get("result", "")} def node2(state): return {"result": state.get("result", "") + "Step 2 完成"} # 创建图 workflow = StateGraph(GraphState) workflow.add_node("step1", node1) workflow.add_node("step2", node2) # 定义边 workflow.set_entry_point("step1") workflow.add_edge("step1", "step2") workflow.add_edge("step2", END) # 编译 app = workflow.compile() # 执行 result = app.invoke({"messages": [], "result": ""}) print(result)

回到顶部

五、有状态工作流:客服机器人

from langchain_openai import ChatOpenAI llm = ChatOpenAI(model="gpt-4o") class客服State(TypedDict): query: str intent: str response: str needs_human: bool # 意图识别节点 def识别意图(state): query = state["query"] prompt = f"判断用户意图:{query},只返回:咨询/投诉/查询/办理" intent = llm.invoke(prompt).content return {"intent": intent} # 分类处理 def处理咨询(state): response = llm.invoke(f"专业回答用户问题:{state['query']}").content return {"response": response} def处理投诉(state): response = "非常抱歉给您带来不便,我们会尽快处理您的问题。" return {"response": response, "needs_human": True} # 创建工作流 workflow = StateGraph(客服State) workflow.add_node("intent_classifier", 识别意图) workflow.add_node("handle_consult", 处理咨询) workflow.add_node("handle_complaint", 处理投诉) workflow.set_entry_point("intent_classifier") # 条件边:根据意图路由 def路由(state): intent = state.get("intent", "") if "投诉" in intent: return "handle_complaint" return "handle_consult" workflow.add_conditional_edges( "intent_classifier", 路由, {"handle_consult": "handle_consult", "handle_complaint": "handle_complaint"} ) workflow.add_edge("handle_consult", END) workflow.add_edge("handle_complaint", END) app = workflow.compile() result = app.invoke({"query": "我要投诉快递太慢", "intent": "", "response": "", "needs_human": False}) print(result)

回到顶部

六、循环工作流:智能迭代

# 循环工作流:代码审查 + 自动修复 class CodeState(TypedDict): code: str issues: list iterations: int def代码审查(state): issues = [] if "TODO" in state["code"]: issues.append("发现TODO待办") if len(state["code"]) > 500: issues.append("代码过长") return {"issues": issues} def代码修复(state): code = state["code"] for issue in state.get("issues", []): if "TODO" in issue: code = code.replace("TODO", "DONE") return {"code": code, "iterations": state.get("iterations", 0) + 1} def检查是否继续(state): if state.get("issues") and state.get("iterations", 0) < 3: return "fix" return "end" workflow = StateGraph(CodeState) workflow.add_node("review", 代码审查) workflow.add_node("fix", 代码修复) workflow.set_entry_point("review") workflow.add_conditional_edges( "review", 检查是否继续, {"fix": "fix", "end": END} ) workflow.add_edge("fix", "review") app = workflow.compile() result = app.invoke({"code": "def test(): TODO...", "issues": [], "iterations": 0}) print(result)

回到顶部

七、检查点与断点续跑

# 检查点配置 from langgraph.checkpoint.memory import MemorySaver checkpointer = MemorySaver() workflow = StateGraph(GraphState) workflow.add_node("step1", node1) workflow.add_node("step2", node2) workflow.set_entry_point("step1") workflow.add_edge("step1", "step2") workflow.add_edge("step2", END) # 编译时启用检查点 app = workflow.compile(checkpointer=checkpointer) # 首次执行(会保存检查点) config = {"configurable": {"thread_id": "user-123"}} result = app.invoke({"messages": [], "result": "Step 1 -> "}, config) print("首次执行:", result) # 断点续跑(从检查点恢复) result2 = app.invoke({"messages": [], "result": ""}, config) print("续跑结果:", result2)

回到顶部

八、Human-in-the-Loop

# 人类审核节点 from langgraph.prebuilt import ToolNode def审核节点(state): # 返回特殊信号,等待人类审核 return {"__interrupt__": True} def人类决策(state, decision): # 接收人类决策后继续执行 return {"human_decision": decision} workflow = StateGraph(GraphState) workflow.add_node("ai_process", lambda s: {"result": "AI处理完成"}) workflow.add_node("human_review", 审核节点) workflow.add_node("human_decide", lambda s: {"result": "人类批准: " + s.get("human_decision", "")}) workflow.set_entry_point("ai_process") workflow.add_edge("ai_process", "human_review") workflow.add_edge("human_review", "human_decide") workflow.add_edge("human_decide", END) app = workflow.compile(interrupt_before=["human_decide"]) # 执行到人类审核节点后暂停 result = app.invoke({"messages": [], "result": ""}) print("暂停等待审核:", result) # 人类审核后继续 app.invoke({"human_decision": "批准"}, config)

回到顶部

九、与Spring Boot集成

@Service public class LangGraphService { @Value("${langgraph.service.url}") private String serviceUrl; public String runWorkflow(String workflowId, Map<String, Object> input) { HttpHeaders headers = new HttpHeaders(); headers.setContentType(MediaType.APPLICATION_JSON); Map<String, Object> body = Map.of( "workflow_id", workflowId, "input", input, "thread_id", UUID.randomUUID().toString() ); HttpEntity<Map<String, Object>> entity = new HttpEntity<>(body, headers); try { ResponseEntity response = restTemplate.exchange( serviceUrl + "/execute", HttpMethod.POST, entity, Map.class ); return (String) response.getBody().get("result"); } catch (Exception e) { return "Error: " + e.getMessage(); } }