当前位置: 首页 > news >正文

人机协作——interrupt 与审批网关 — LangGraph 实战——构建跨平台爆款图文 Agent 第4篇

第4章:人机协作——interrupt 与审批网关

本章目标

读完本章你会:

  • 能解释 Human-in-the-Loop 为什么是生产级 Agent 的必需品而非可选项
  • 能用 interrupt() 在图执行的任意节点暂停,等待人类输入
  • 能用 Command(resume=...) 传入人类决策并恢复执行
  • 能设计一个完整的"AI 生成 → 人类审核 → 批准/驳回 → 发布"工作流

知识讲解

从一个生活例子开始

你是一家新媒体编辑部的编辑。你的工作流是这样的:

选题策划 → 撰稿 → 主编审核 → ┬→ 通过 → 排版发布└→ 驳回 → 修改 → 主编再审

关键在于主编审核这个环节。没有它,文章直接发出去了——标题党、事实错误、观点跑偏,全是你的锅。有它,AI 可以尽情发挥,因为最终把关的是人。

LangGraph 里,这个"主编审核"环节就是 interrupt()。它在图的执行路径上设了一道闸门——Agent 跑到这里就停下,把结果展示给你,等你的指令。你可以:

  • 批准:继续往下走,进入发布环节
  • 驳回:带着修改意见回到生成环节,Agent 重新写
  • 甚至修改 State 里的任意字段再继续——不只是"过/不过"二选一
编辑部的角色 LangGraph 的对应
撰稿编辑 Agent 的生成节点
主编 你(人类操作者)
审稿环节 interrupt()
"通过,发吧" / "重写导语" Command(resume=...)

工作原理

interrupt() 做了什么?

interrupt() 是 LangGraph 内置的函数。当一个节点调用它时:

  1. 图立即暂停执行——后续节点不会被调用
  2. Checkpointer 保存当前 State——包括 interrupt 发生时的完整上下文
  3. 控制权交还给调用方——invoke()stream() 会返回一个特殊结果,包含 interrupt 信息
  4. 等待 Command(resume=...) 恢复——调用方准备好后,传入人类决策,图从暂停处继续
agent 节点执行 → 调用 interrupt("请审核") → ⏸️ 暂停↓人类审核... → Command(resume={"approved": True}) → ▶️ 继续

和 checkpointer 的关系

interrupt() 能工作的前提是图有 checkpointer。没有 checkpointer,中断时的 State 无处存放,恢复也无从谈起。这是第 3 章是本章前置依赖的原因。

实际上,interrupt() 的实现就是:在当前 super-step 的末尾,把 State 写入 checkpoint,然后抛出 GraphInterrupt 异常——这个异常被 LangGraph 运行时捕获,把控制权还给调用方。恢复时,从 checkpoint 读取 State,从 interrupt() 的下一行继续执行。

不只"通过/不通过"——Command 的三种恢复方式

# 方式 1:只恢复继续(最简单)
Command(resume="批准发布")# 方式 2:恢复同时更新 State
Command(resume="驳回", update={"feedback": "标题不够吸引人,重写"})# 方式 3:恢复同时跳转到指定节点(动态改道)
Command(resume="驳回", update={"feedback": "重写"}, goto="generate_content")

方式 3 特别有用——驳回时可以直接跳回生成节点,跳过"发布"节点的前置检查。

思考一下: 如果你在 interrupt() 暂停后不小心关掉了终端,下次重启还能继续吗?提示:想想第 3 章 SqliteSaver 的持久化能力。


代码实战

基础版:理解 interrupt 的暂停与恢复

先从一个最简的二节点图开始,纯粹理解 interrupt() 的机制。新建文件 chapter04_interrupt_basics.py

"""
第 4 章 基础演示 A:interrupt 的暂停与恢复机制
理解图如何在中途等待人类输入
"""
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from langgraph.types import interrupt, Command    #  interrupt 和 Command# ============================================================
# State
# ============================================================
class ReviewState(TypedDict):draft: str          # AI 生成的草稿feedback: str       # 人类的审核意见final: str          # 最终版本# ============================================================
# 节点——interrupt 放在"等待审核"节点中
# ============================================================
def generate_draft(state: ReviewState) -> dict:"""模拟 AI 生成内容"""print("[generate_draft] AI 正在撰写草稿...")draft = "LangGraph 是 2026 年最值得学习的 AI Agent 框架——它用图的思维编排复杂工作流。"return {"draft": draft}def human_review(state: ReviewState) -> dict:""" 人类审核节点——调用 interrupt 暂停"""draft = state["draft"]print(f"\n{'='*50}")print(f" 待审核内容:\n{draft}")print(f"{'='*50}")#  interrupt()——图在这里暂停# 参数是你想展示给人类的信息human_decision = interrupt("请审核以上内容。输入 'approve' 批准,或输入修改建议。")# ⚠ 下面这行代码在人类调用 Command(resume=...) 之后才会执行print(f"[human_review] 收到审核结果: {human_decision}")# 根据人类输入做不同处理if human_decision == "approve":return {"feedback": "approved"}else:return {"feedback": human_decision}def finalize(state: ReviewState) -> dict:"""根据审核结果决定最终版本"""if state["feedback"] == "approved":final = f"✅ 已发布:{state['draft']}"else:final = f" 需修改(意见:{state['feedback']})——原稿:{state['draft']}"print(f"[finalize] {final}")return {"final": final}# ============================================================
# 构建图
# ============================================================
def build_review_graph():builder = StateGraph(ReviewState)builder.add_node("generate_draft", generate_draft)builder.add_node("human_review", human_review)builder.add_node("finalize", finalize)builder.add_edge(START, "generate_draft")builder.add_edge("generate_draft", "human_review")builder.add_edge("human_review", "finalize")builder.add_edge("finalize", END)# ⚠ interrupt 必须配合 checkpointerreturn builder.compile(checkpointer=MemorySaver())# ============================================================
# 运行——分两步:先 invoke 触发中断,再 invoke 传入人类决策
# ============================================================
if __name__ == "__main__":graph = build_review_graph()config = {"configurable": {"thread_id": "review-demo-1"}}# --- 第一步:invoke——图跑到 human_review 节点中的 interrupt 就会暂停 ---print(" 启动图...")result = graph.invoke({"draft": "", "feedback": "", "final": ""},config)# 图暂停后会返回当前 State,包含 interrupt 信息print(f"\n⏸️ 图已暂停")print(f"   draft: {result['draft'][:60]}...")print(f"   feedback: {result['feedback']}")print(f"   final: {result['final']}")# --- 第二步:人类做决定后,用 Command 恢复 ---print("\n" + "=" * 50)print(" 人类审核:批准发布!")print("=" * 50)#  Command(resume=...) 传入人类的决定,图从 interrupt 的下一行继续result = graph.invoke(Command(resume="approve"),    # resume 的值就是 interrupt() 的返回值config                        #  必须用同一个 config(同一个 thread_id))print(f"\n✅ 最终结果: {result['final']}")

运行这段代码,观察控制台输出:

  1. 第一次 invoke 打印了待审核内容后没有继续——图在 interrupt() 处暂停了
  2. 第二次 invoke 传入 Command(resume="approve") 后,human_review 节点从 interrupt() 的下一行继续执行,最终走到 finalize

逐行解析

interrupt("请审核以上内容...")

这是本章最核心的 API。参数是一个字符串(人类可读的描述),返回值是 Command(resume=...) 中传入的值。关键理解:interrupt() 被调用时,它下面的代码不会立即执行——控制权交还给 Python 调用方。只有当你再次 invoke 并传入 Command(resume=...) 后,interrupt() 才会返回那个值,继续往下走。

Command(resume="approve")

Command 是恢复执行的核心原语。resume 参数的值会作为 interrupt() 的返回值。没有它,图永远停在中断点。

两次 invoke 必须用相同的 config(相同的 thread_id

因为中断时的 State 存储在 checkpoint 中,恢复时需要通过同一个 thread_id 找到它。如果换了 thread_id,LangGraph 会把它当成全新的会话——找不到中断点,从 START 开始执行。

扩展版:LocalTrend 内容审核工作流

现在把 interrupt 接入 LocalTrend 项目——Agent 搜索趋势 → 分析风格 → 生成内容 → 暂停等人类审核 → 批准后生成发布稿。

"""
第 4 章 扩展演示:LocalTrend 内容审核工作流
AI 生成 → 人类审核 → 批准/驳回 → 发布
"""
import os
from typing import TypedDict, Annotated, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.types import interrupt, Commandfrom langchain_core.messages import HumanMessage, AIMessage, ToolMessage
from langchain_core.tools import tool
from langchain_openai import ChatOpenAI
import sqlite3# ============================================================
# 配置
# ============================================================
llm = ChatOpenAI(model="deepseek-chat",api_key=os.getenv("DEEPSEEK_API_KEY", "your-api-key-here"),base_url="https://api.deepseek.com",temperature=0.7,
)# ============================================================
# State——新增审核相关字段
# ============================================================
class LocalTrendState(TypedDict):messages: Annotated[list, add_messages]iteration_count: int#  审核相关字段generated_content: str      # AI 生成的待审核内容platform: str               # 目标平台(公众号/小红书/微博...)review_decision: str        # 审核结果:approved / rejectedreview_feedback: str        # 审核意见published: bool             # 是否已发布# ============================================================
# 工具
# ============================================================
@tool
def search_trending(query: str) -> str:"""搜索当前热门话题趋势"""print(f"   [工具] 搜索: {query}")knowledge = {"AI": "AI 爆款:'Agent 开发实战'、'RAG 从零到一'、'LLM 应用架构'是近期高流量话题。","职场": "职场爆款:'AI 时代的核心竞争力'、'副业月入过万复盘'情绪共鸣强。",}for key, val in knowledge.items():if key in query:return valreturn f"关于'{query}'的趋势分析。"@tool
def analyze_style(content: str) -> str:"""分析内容的写作风格和爆款特征"""print(f"   [工具] 分析风格")return "风格:标题含数字+情感词,开头用痛点切入,正文分点论述+案例,结尾设互动问题。"tools = [search_trending, analyze_style]
llm_with_tools = llm.bind_tools(tools)# ============================================================
# 节点
# ============================================================
def agent_node(state: LocalTrendState) -> dict:"""ReAct 思考节点"""response = llm_with_tools.invoke(state["messages"])return {"messages": [response],"iteration_count": state.get("iteration_count", 0) + 1,}def tool_node(state: LocalTrendState) -> dict:"""工具执行节点"""last_msg = state["messages"][-1]tool_map = {t.name: t for t in tools}results = []for tc in last_msg.tool_calls:result = tool_map[tc["name"]].invoke(tc["args"])results.append(ToolMessage(content=result, tool_call_id=tc["id"]))return {"messages": results}def should_continue(state: LocalTrendState) -> Literal["tools", "generate", "__end__"]:"""增强路由:ReAct 结束后进入内容生成"""last_msg = state["messages"][-1]if state.get("iteration_count", 0) >= 8:return "generate"if hasattr(last_msg, "tool_calls") and last_msg.tool_calls:return "tools"#  不再直接 END,而是进入内容生成阶段return "generate"# ============================================================
#  内容生成 + 审核节点
# ============================================================
def generate_content(state: LocalTrendState) -> dict:"""基于分析结果生成内容"""print("\n[generate_content] 正在基于趋势分析生成内容...")# 从消息历史中提取关键信息context = ""for msg in state["messages"]:if hasattr(msg, "content") and msg.content:if isinstance(msg, ToolMessage):context += str(msg.content) + "\n"platform = state.get("platform", "微信公众号")prompt = HumanMessage(content=(f"基于以下趋势分析,为{platform}写一篇爆款短文(200字以内):\n{context}\n\n"f"要求:标题有吸引力,开头有痛点,结尾有互动引导。"))response = llm.invoke([prompt])print(f"  生成完成,内容长度: {len(response.content)} 字符")return {"generated_content": response.content}def review_gate(state: LocalTrendState) -> dict:""" 审核网关——Human-in-the-Loop 的核心"""content = state["generated_content"]platform = state.get("platform", "微信公众号")print(f"\n{'='*60}")print(f" 待审核内容({platform}):")print(f"{'='*60}")print(content)print(f"{'='*60}")#  中断,等待人类决策decision = interrupt(f"请审核以上 {platform} 内容。\n"f"回复 'approve' 批准发布,或输入修改意见驳回。")print(f"[review_gate] 审核结果: {decision}")if decision.lower() == "approve" or decision.lower() == "批准":return {"review_decision": "approved", "review_feedback": ""}else:return {"review_decision": "rejected", "review_feedback": decision}def handle_approved(state: LocalTrendState) -> dict:""" 批准后的发布处理"""print("\n[handle_approved] ✅ 审核通过,准备发布...")final_content = f"【已发布】\n平台:{state['platform']}\n\n{state['generated_content']}"return {"published": True, "generated_content": final_content}def handle_rejected(state: LocalTrendState) -> dict:""" 驳回后的处理——把修改意见返回给 Agent 重新生成"""print(f"\n[handle_rejected] ❌ 审核驳回")print(f"   修改意见: {state['review_feedback']}")# 构造带修改意见的消息,让 Agent 重新生成feedback_msg = HumanMessage(content=f"上一篇内容被驳回。修改意见:{state['review_feedback']}。"f"请根据意见重新生成一篇内容(200字以内)。")return {"messages": [feedback_msg],"published": False,}def review_router(state: LocalTrendState) -> Literal["approved", "rejected"]:"""根据审核结果路由"""if state["review_decision"] == "approved":return "approved"return "rejected"# ============================================================
# 构建完整的 LocalTrend 审核工作流
# ============================================================
def build_localtrend_graph(db_path: str = "localtrend_review.db"):builder = StateGraph(LocalTrendState)# 原有节点builder.add_node("agent", agent_node)builder.add_node("tools", tool_node)#  新节点builder.add_node("generate_content", generate_content)builder.add_node("review_gate", review_gate)builder.add_node("handle_approved", handle_approved)builder.add_node("handle_rejected", handle_rejected)# 图的拓扑结构builder.add_edge(START, "agent")# ReAct 循环builder.add_conditional_edges("agent", should_continue,{"tools": "tools", "generate": "generate_content"})builder.add_edge("tools", "agent")#  内容生成 → 审核网关builder.add_edge("generate_content", "review_gate")#  审核分支builder.add_conditional_edges("review_gate", review_router,{"approved": "handle_approved", "rejected": "handle_rejected"})builder.add_edge("handle_approved", END)#  驳回后回到 agent_node——利用已有的 ReAct 能力重新生成builder.add_edge("handle_rejected", "agent")conn = sqlite3.connect(db_path, check_same_thread=False)return builder.compile(checkpointer=SqliteSaver(conn))# ============================================================
# 运行演练
# ============================================================
if __name__ == "__main__":graph = build_localtrend_graph()config = {"configurable": {"thread_id": "localtrend-review-1"}}# --- 第一步:启动探索 + 生成 ---print("=" * 60)print(" LocalTrend 启动:搜索 AI 趋势 → 分析 → 生成公众号内容")print("=" * 60)result = graph.invoke({"messages": [HumanMessage(content="搜索 AI 领域的爆款趋势,然后为微信公众号写一篇爆款文章。")],"iteration_count": 0,"generated_content": "","platform": "微信公众号","review_decision": "","review_feedback": "","published": False,},config)# 图在 review_gate 中断了print(f"\n⏸️ 图已在审核网关暂停")print(f"   等待人类审核...")# --- 第二步(模拟终端交互)---print("\n" + "=" * 60)print(" 进入人工审核环节")print("=" * 60)print("在真实应用中,这里会展示给用户在 UI 上审核。")print("本演示中,我们模拟两种场景:\n")# 场景 A:批准print("--- 场景 A:批准发布 ---")result = graph.invoke(Command(resume="approve"), config)print(f"发布状态: {'已发布 ✅' if result['published'] else '未发布'}")print(f"发布内容预览: {result['generated_content'][:120]}...")# 场景 B:用新 thread 模拟驳回流程print("\n\n--- 场景 B:驳回重写 ---")config_b = {"configurable": {"thread_id": "localtrend-review-2"}}# 第一步:启动graph.invoke({"messages": [HumanMessage(content="搜索 AI 爆款趋势,写一篇小红书风格的内容。")],"iteration_count": 0,"generated_content": "","platform": "小红书","review_decision": "","review_feedback": "","published": False,},config_b)print("⏸️ 暂停,等待审核...")# 第二步:驳回(模拟主编不满意)print("\n 主编驳回:'标题不够吸引人,小红书风格需要更多 emoji 和短句'")result_b = graph.invoke(Command(resume="标题不够吸引人,小红书风格需要更多 emoji 和短句"),config_b)# 驳回后回到 agent,agent 会调用 LLM 重新搜索和生成# 再次跑到 review_gate 中断print(f"\n⏸️ 再次暂停,等待二次审核...")print(f"   新内容: {result_b['generated_content'][:120]}...")# 第三步:批准修改后的版本result_b = graph.invoke(Command(resume="approve"), config_b)print(f"\n✅ 最终发布状态: {'已发布' if result_b['published'] else '未发布'}")

运行后观察

注意场景 B 的执行路径:

  1. Agent 搜索趋势 → 分析 → 生成内容 → review_gate 中断
  2. 你传入驳回意见 → handle_rejected 将意见包装为 HumanMessage → 回到 agent_node
  3. Agent 看到驳回意见,重新调用 LLM → 这次不会再自动调用工具(因为消息历史里已经有分析结果),直接在上下文中重写
  4. 新内容再次到达 review_gate再次中断等你的最终决定

这就是一个完整的人机协作循环:AI 干活 → 人类把关 → 不行就重做 → 通过就发布。

⚠️ 常见坑:驳回后回到 agent_node 时,iteration_count 可能已经很高。如果你的 should_continue 中设置了 >= 8 的上限,驳回导致的额外轮次可能会触及这个上限。在实际项目中,应该在驳回时重置迭代计数器——练习中会让你做这个改进。


本章小结

  1. Human-in-the-Loop 是生产级 Agent 的必需品——AI 生成的内容必须有人把关,特别是涉及对外发布、资金操作等高风险场景。
  2. interrupt() 在图执行路径上设暂停点——节点函数调用它时,图保存 State 并交还控制权。
  3. Command(resume=...) 恢复暂停的图——resume 的值成为 interrupt() 的返回值,节点从下一行继续执行。
  4. Checkpointer 是 interrupt 的前置依赖——没有 checkpointer,中断的 State 无处存储。
  5. 同一个 thread_id 跨越暂停和恢复——换了 thread_id 等于找不到"案发现场"。
  6. 驳回不是失败,是工作流的一部分——通过条件路由,驳回后可以回到生成节点重做,形成 AI-人类-AI 协作循环。

关键术语

术语 释义
Human-in-the-Loop (HITL) 人机协作模式,AI 执行关键步骤前暂停等待人类决策
interrupt() LangGraph 内置函数,在节点中调用后暂停图执行,等待 Command 恢复
Command 恢复图执行的原语,resume 传值给 interrupt,update 更新 State,goto 可动态跳转
审核网关 interrupt 的典型应用——在"生成"和"发布"之间设置审核节点
GraphInterrupt interrupt 底层抛出的异常,被 LangGraph 运行时静默捕获并转为暂停
resume Command 的核心参数,其值作为 interrupt() 的返回值传递给暂停的节点

http://www.zskr.cn/news/1537848.html

相关文章:

  • 期末python作业
  • 徐州漏水检测维修权威推荐:卫生间-厨房-阳台-屋顶天花板漏水维修:靠谱防水补漏公司团队TOP5推荐(2026最新深度调研实测榜单) - 即刻修防水
  • 微信群投票怎么发起,西瓜评选+云帆投票+腾讯投票,投票平台深度对比测评 - 投票小程序
  • 2026 沈阳 大连 RFID 公司推荐榜:双城高新企业,专业团队自研芯片、快速上门服务 - 资讯快报
  • 知识图谱增强RAG:构建可推理、可解释的结构化问答系统
  • 2026年广州留学中介推荐,GET OFFER就是性价比天花板 - 资讯快报
  • 6/16
  • 探秘正宗江西芝麻灰产地货源,解锁高品质石材背后的行业秘密与实用采购指南 - 资讯快报
  • 综合能力实训6.11
  • Anthropic 经典指南:如何构建有效的 AI Agent——从简单模式到自主系统
  • 当11个AI都押比利时赢,只有一个说了“不”——阶跃星辰凭什么敢反共识?
  • COCO转YOLO格式:坐标归一化与类别映射实战指南
  • 一体化污水处理设备谁家口碑好?你想知道的都在这 - 资讯快报
  • 2026年6月 口碑好的 烟台正规出国留学机构、烟台小语种培训机构排行 实测资质服务资源对比 - 起跑123
  • 2026宁波黄金回收门店TOP5:大盘价回收渠道盘点 - 宁波早知道
  • 靠谱焊工培训怎么选?信誉过硬机构实测避坑指南 - 湖南阳光技术
  • 图形工作站替代方案解析:云飞云云桌面承载三维建模的数据安全体系
  • 2026广州窗户隔热膜服务商综合实力排名及选购指南 - 资讯纵览
  • 邯郸夜间宠物医院如何选择? - 资讯纵览
  • 2026 优质工业油雾 / 油烟净化器供应商推荐榜单|食品行业油烟治理源头厂家甄选 - 资讯快报
  • 询盘翻10倍:广州企业短视频获客案例解析 - 资讯快报
  • 6月11号
  • DPA Classifier表管理实战:从哈希表到预填充表的设计与API应用
  • Django REST Framework实战:从零构建企业级API服务
  • 2026 沈阳塑钢窗选购:5 家靠谱品牌实测指南 - 资讯纵览
  • 完整部署指南:在OpenMind框架下运行Hebrew-GPT2-345M-Stage
  • 2026 浙江台州三门橡塑交通制品工厂 TOP5 推荐 源头大厂实力盘点 - 资讯快报
  • 嘉兴漏水检测维修权威推荐:卫生间-厨房-阳台-屋顶天花板漏水维修:靠谱防水补漏公司团队TOP5推荐(2026最新深度调研实测榜单) - 即刻修防水
  • Kronos金融时序预测模型:如何用AI破解市场语言,实现精准量化交易
  • 2026年口碑好的 最新的 烟台职教高考、春季高考培训学校排行:合规性与升学实力实测对比 - 起跑123