文章目录
- HumanInTheLoopMiddleware — 人工审批
- 代码示例
HumanInTheLoopMiddleware — 人工审批
作用:在 Agent 执行特定工具前暂停等待人工审批,支持批准、编辑或拒绝。
前提条件:需要配置checkpointer(检查点器)来维护中断状态。
构造参数:
| 参数 | 类型 | 说明 |
|---|---|---|
interrupt_on | dict | 工具名到审批配置的映射。True表示需要审批,False表示不需要,字典可指定allowed_decisions |
代码示例:
fromlangchain.agentsimportcreate_agentfromlangchain.agents.middlewareimportHumanInTheLoopMiddlewarefromlanggraph.checkpoint.memoryimportInMemorySaverfromsrc.utils.configimportConfigfromrichimportprintasrprint model=Config.get_default_model()defyour_read_email_tool(email_id:str)->str:"""Mock function to read an email by its ID."""returnf"Email content for ID:{email_id}"defyour_send_email_tool(recipient:str,subject:str,body:str)->str:"""Mock function to send an email."""returnf"Email sent to{recipient}with subject '{subject}'"agent=create_agent(model=model,tools=[your_read_email_tool,your_send_email_tool],checkpointer=InMemorySaver(),middleware=[HumanInTheLoopMiddleware(interrupt_on={"your_send_email_tool":{"allowed_decisions":["approve","edit","reject"],},"your_read_email_tool":True,}),],)config={"configurable":{"thread_id":"demo-thread-1"}}forchunkinagent.stream({"messages":[{"role":"user","content":"请读取邮件ID为123的邮件"}]},config=config,):rprint(chunk)# 运行时,当 Agent 尝试调用 send_email 时会中断# 人工可以通过 Command 对象审批/修改/拒绝代码示例
importjsonimportsysfromlangchain.agentsimportcreate_agentfromlangchain.agents.middlewareimportHumanInTheLoopMiddlewarefromlanggraph.checkpoint.memoryimportInMemorySaverfromlanggraph.typesimportCommandfromsrc.utils.configimportConfig model=Config.get_default_model()defyour_read_email_tool(email_id:str)->str:"""读取指定ID的邮件内容"""emails={"123":"【邮件 #123】发件人: 王经理 | 主题: 项目进展 | 明天下午3点开会,请提前准备Q2报告。","456":"【邮件 #456】发件人: HR | 主题: 年假通知 | 您的年假还剩5天,请于12月底前使用完毕。",}returnemails.get(email_id,f"[系统] 未找到 ID={email_id}的邮件")defyour_send_email_tool(recipient:str,subject:str,body:str)->str:"""发送邮件"""returnf"✅ 邮件已发送 | 收件人:{recipient}| 主题:{subject}"defyour_delete_email_tool(email_id:str)->str:"""删除指定ID的邮件"""returnf"🗑️ 邮件 ID={email_id}已删除"# ============================================================# 创建 agent# ============================================================checkpointer=InMemorySaver()agent=create_agent(model=model,tools=[your_read_email_tool,your_send_email_tool,your_delete_email_tool],checkpointer=checkpointer,middleware=[HumanInTheLoopMiddleware(interrupt_on={"your_send_email_tool":{"allowed_decisions":["approve","edit","reject"],},"your_delete_email_tool":{"allowed_decisions":["approve","reject"],},"your_read_email_tool":True,}),],)# ============================================================# 安全 input — 兼容管道输入(非 TTY 时 EOF 返回默认值)# ============================================================def_safe_input(prompt:str,default:str="")->str:"""安全的 input 封装,管道模式下 EOF 不会崩溃,返回默认值"""ifnotsys.stdin.isatty():try:val=input(prompt)returnvalifval.strip()elsedefaultexceptEOFError:returndefaultreturninput(prompt).strip()ordefault# ============================================================# 交互式 HITL 流程# ============================================================defrun_with_hitl(user_message:str,thread_id:str):""" 执行 agent 并在遇到中断时通过终端交互收集人工决策。 """config={"configurable":{"thread_id":thread_id}}input_data:dict|Command={"messages":[{"role":"user","content":user_message}]}whileTrue:interrupt_data=Noneforchunkinagent.stream(input_data,config=config):if"__interrupt__"inchunk:interrupt_data=chunk["__interrupt__"][0].valueelse:_print_chunk(chunk)ifinterrupt_dataisNone:break# 没有中断,流程结束# ---------- 收集人工决策 ----------decisions=_collect_decisions(interrupt_data)input_data=Command(resume={"decisions":decisions})def_print_chunk(chunk:dict):"""打印 stream 输出 — 区分真实工具执行和中间件注入的人工决策消息"""fornode_name,node_outputinchunk.items():ifnode_outputisNone:continuemessages=node_output.get("messages",[])formsginmessages:msg_type=type(msg).__name__ content=getattr(msg,"content","")name=getattr(msg,"name",None)tool_calls=getattr(msg,"tool_calls",None)status=getattr(msg,"status",None)# --- AIMessage ---ifmsg_type=="AIMessage"andtool_calls:# 中间件输出的 AIMessage 是修改后的副本(已在上轮打印过),跳过ifnode_name=="HumanInTheLoopMiddleware.after_model":continuefortcintool_calls:print(f" 🤖 LLM 计划调用 →{tc['name']}({_fmt_args(tc['args'])})")elifmsg_type=="AIMessage"andcontent:print(f" 🤖 LLM 回复 →{content}")# --- ToolMessage ---elifmsg_type=="ToolMessage":ifnode_name=="HumanInTheLoopMiddleware.after_model":# 中间件注入的人工决策消息 — 工具并未真正执行ifstatus=="error":print(f" 🚫 人工拒绝 [{name}] →{content[:120]}")else:print(f" ✅ 人工批准 [{name}] → 即将执行")else:# 真实工具执行结果icon="❌"ifstatus=="error"else"🔧"print(f"{icon}工具执行 [{name}] →{content[:120]}")# --- HumanMessage ---elifmsg_type=="HumanMessage":print(f" 👤 用户 →{content}")def_collect_decisions(interrupt_data:dict)->list[dict]:"""在终端中交互式收集人工决策"""requests=interrupt_data["action_requests"]configs=interrupt_data["review_configs"]_print_separator(f"🔔 需要人工审批 — 共{len(requests)}个工具调用")decisions=[]fori,(req,cfg)inenumerate(zip(requests,configs)):tool_name=req["name"]tool_args=req["args"]allowed=cfg["allowed_decisions"]print(f"\n 📌 工具 [{i+1}/{len(requests)}]:{tool_name}")print(f" 参数:{_fmt_args(tool_args)}")print(f" 可选:{', '.join(allowed)}")decision=_prompt_decision(tool_name,tool_args,allowed)decisions.append(decision)print(f" ✅ 已记录 →{_describe_decision(decision)}")_print_separator("")returndecisionsdef_prompt_decision(tool_name:str,tool_args:dict,allowed:list[str])->dict:"""提示用户输入决策"""whileTrue:choice=_safe_input(" 👉 请输入操作: ").strip().lower()ifchoicenotinallowed:print(f" ⚠️ 无效操作,可选:{', '.join(allowed)}")continueifchoice=="approve":return{"type":"approve"}ifchoice=="edit":if"edit"notinallowed:print(" ⚠️ 该工具不支持 edit")continueprint(f" 📝 请输入新参数 JSON (回车保留原值):")print(f" 原参数:{_fmt_args(tool_args)}")new_json=_safe_input(" → ")new_args=_parse_edit_args(tool_args,new_json)return{"type":"edit","edited_action":{"name":tool_name,"args":new_args}}ifchoice=="reject":reason=_safe_input(" 💬 拒绝理由 (可留空): ")decision={"type":"reject"}ifreason:decision["message"]=reasonreturndecisionifchoice=="respond":msg=_safe_input(" 💬 回复内容: ").strip()ifnotmsg:print(" ⚠️ respond 必须填写回复内容")continuereturn{"type":"respond","message":msg}def_parse_edit_args(original:dict,new_json:str)->dict:"""解析编辑后的参数 — 接受 JSON 或 key=value 格式"""ifnotnew_json:returndict(original)new_json=new_json.strip()# 尝试 JSON 格式: {"recipient": "李四"}ifnew_json.startswith("{"):try:merged=dict(original)merged.update(json.loads(new_json))returnmergedexceptjson.JSONDecodeError:print(f" ⚠️ JSON 解析失败,保留原值")returndict(original)# key=value 格式: recipient=李四result=dict(original)forpartinnew_json.split(","):part=part.strip()if"="inpart:key,_,value=part.partition("=")key,value=key.strip(),value.strip()ifkeyinresult:result[key]=_coerce_type(value,result[key])returnresultdef_coerce_type(value_str:str,original):"""尝试将字符串还原为原始类型"""ifisinstance(original,bool):returnvalue_str.lower()in("true","yes","1")ifisinstance(original,int):try:returnint(value_str)exceptValueError:returnvalue_strifisinstance(original,float):try:returnfloat(value_str)exceptValueError:returnvalue_strreturnvalue_strdef_describe_decision(d:dict)->str:"""人类可读的决策摘要"""t=d["type"]ift=="approve":return"批准 ✅"ift=="edit":returnf"编辑 →{_fmt_args(d['edited_action']['args'])}"ift=="reject":msg=d.get("message","")returnf"拒绝 ❌{msg}"ift=="respond":returnf"人工回复 💬{d.get('message','')}"returntdef_fmt_args(args:dict)->str:returnjson.dumps(args,ensure_ascii=False)def_print_separator(title:str):w=55iftitle:pad=max(0,(w-len(title)-2)//2)print("\n"+"─"*pad+f"{title}"+"─"*pad)else:print("─"*w)# ============================================================# 交互式演示入口# ============================================================if__name__=="__main__":print("="*55)print(" 🤝 Human-in-the-Loop 交互式演示")ifnotsys.stdin.isatty():print(" (管道模式 — 从 stdin 读取决策)")print("="*55)# ---- 场景1:读邮件 ----print("\n📬 场景1: 用户请求读取邮件")print("-"*55)run_with_hitl("帮我读取邮件ID为123的邮件",thread_id="thread-1")# ---- 场景2:发邮件 ----print("\n📧 场景2: 用户请求发送邮件")print("-"*55)run_with_hitl("帮我给张三发一封邮件,主题: 请假,内容: 明天请假一天",thread_id="thread-2")# ---- 场景3:删邮件 ----print("\n🗑️ 场景3: 用户请求删除邮件")print("-"*55)run_with_hitl("帮我删除邮件ID为456的邮件",thread_id="thread-3")print("\n"+"="*55)print(" ✅ 演示结束,感谢配合!")print("="*55)