智能体RAG客服系统4——middleware部分开发与agent主体开发
目录
一、三大中间件开发
1.Monitor2(工具执行监控)
2.Log Before Model(模型运行前日志打印)
3.report prompt switch(动态提示词切换)
二、Agent主体构建
1.初始化配置
2.流式执行方法 execute_stream
三、功能测试
1. 常规问答测试
2. 报告生成测试
四、总结与后续
一、三大中间件开发
项目共设计三个中间件,统一在agent_tool目录下新建middleware文件实现,各中间件功能及开发逻辑如下:
1.Monitor2(工具执行监控)
基于WRAP2 call装饰器开发,入参为request(工具请求数据封装)和handler(待执行函数),需导入ToolCallRequest、langchain.tools、langgraph、Callable及Python内置typing相关数据类型。
核心逻辑:return的是handler(request),中间的业务逻辑可以自定义。
这里写的就是前后打印日志(logger),记录调用工具名称、入参;捕获运行异常并输出错误信息,同时向上抛出异常终止程序,实现全流程工具调用监控。
from typing import Callable from utils.prompt_loader import load_system_prompts, load_report_prompts from langchain.agents import AgentState from langchain.agents.middleware import wrap_tool_call, before_model, dynamic_prompt, ModelRequest from langchain.tools.tool_node import ToolCallRequest from langchain_core.messages import ToolMessage from langgraph.runtime import Runtime from langgraph.types import Command from utils.logger_handler import logger@wrap_tool_call def monitor_tool( # 请求的数据封装 request: ToolCallRequest, # 执行的函数本身 handler: Callable[[ToolCallRequest], ToolMessage | Command], ) -> ToolMessage | Command: # 工具执行的监控 logger.info(f"[tool monitor]执行工具:{request.tool_call['name']}") logger.info(f"[tool monitor]传入参数:{request.tool_call['args']}") try: result = handler(request) logger.info(f"[tool monitor]工具{request.tool_call['name']}调用成功") if request.tool_call['name'] == "fill_context_for_report": request.runtime.context["report"] = True return result except Exception as e: logger.error(f"工具{request.tool_call['name']}调用失败,原因:{str(e)}") raise e2.Log Before Model(模型运行前日志打印)
导入before model装饰器,入参为Agent State(智能体状态的记录)与RunTime(执行上下文中的信息)。
功能分为两级日志:Info级别统计传入模型的消息总数;设为Debug级别,只打印最新消息内容与消息类型,其他啰嗦重复的不打印,该级别日志默认不展示,仅用于调试。中间件执行完毕后返回None。
@before_model def log_before_model( state: AgentState, # 整个Agent智能体中的状态记录 runtime: Runtime, # 记录了整个执行过程中的上下文信息 ): # 在模型执行前输出日志 logger.info(f"[log_before_model]即将调用模型,带有{len(state['messages'])}条消息。") logger.debug(f"[log_before_model]{type(state['messages'][-1]).__name__} | {state['messages'][-1].content.strip()}") return None3.report prompt switch(动态提示词切换)
用户用着用着可能想生成报告,该工具就是用于识别用户生成报告的需求,自动切换专属提示词。
◦ 基础规则:系统默认使用通用提示词main prompt,这个主提示词是帮助我们回答用户的问题以及输出规则、调用工具的说明。主提示词核心思考准则中有一条:
4. 【报告生成强约束】 若明确判断用户核心需求为生成 / 查询个人使用报告,需严格遵循「获取用户 ID→获取报告月份→调用 fill_context_for_report 工具→调用 fetch_external_data 工具」的固定执行流程,fill_context_for_report 为报告生成的必调用前置工具,未调用该工具禁止执行后续的 fetch_external_data 工具调用及报告生成操作。规则要求模型识别报告生成意图后,必须依次调用获取用户ID、获取报告月份、Fill context for report、fetch external data四个工具。
◦状态标记:在agent_tools.py新增工具Fill context for report,该工具无入参、无实际业务逻辑,仅用于触发监控。
无入参,无返回值,调用后触发中间件自动为报告生成的场景动态注入上下文信息,为后续提示词切换提供上下文信息
@tool(description="无入参,无返回值,调用后触发中间件自动为报告生成的场景动态注入上下文信息,为后续提示词切换提供上下文信息") def fill_context_for_report(): return "fill_context_for_report已调用"工具被调用后,会在RunTime上下文字典中注入report: true标记(默认值为false)。
◦ 提示词切换:通过dynamic report prompt装饰器,读取上下文内的report标记。标记为true则加载报告专用提示词,反之沿用通用系统提示词。
@dynamic_prompt # 每一次在生成提示词之前,调用此函数 def report_prompt_switch(request: ModelRequest): # 动态切换提示词 is_report = request.runtime.context.get("report", False) if is_report: # 是报告生成场景,返回报告生成提示词内容 return load_report_prompts() return load_system_prompts()二、Agent主体构建
在Agent目录新建react_Agent.py文件,定义react Agent类完成智能体搭建。
1.初始化配置
借助langchain的create_agent方法创建Agent实例,依次导入模型model工厂(通义千问Qwen3 Max)、系统提示词prompt、业务工具集tools、三大中间件middleware。
from langchain.agents import create_agent from model.factory import chat_model from utils.prompt_loader import load_system_prompts from agent.tools.agent_tools import (rag_summarize, get_weather, get_user_location, get_user_id, get_current_month, fetch_external_data, fill_context_for_report) from agent.tools.middleware import monitor_tool, log_before_model, report_prompt_switch业务工具包含:
rag_summarize, get_weather, get_user_location, get_user_id, get_current_month, fetch_external_data, fill_context_for_report
RAG总结函数、获取天气、获取用户位置、获取用户ID、获取当前月份、抓取外部数据、Fill context for report。
中间件包含:
monitor_tool, log_before_model, report_prompt_switch
工具执行监控、模型运行前打印日志、动态切换报告提示词。
from langchain.agents import create_agent from model.factory import chat_model from utils.prompt_loader import load_system_prompts from agent.tools.agent_tools import (rag_summarize, get_weather, get_user_location, get_user_id, get_current_month, fetch_external_data, fill_context_for_report) from agent.tools.middleware import monitor_tool, log_before_model, report_prompt_switch class ReactAgent: def __init__(self): self.agent = create_agent( model=chat_model, system_prompt=load_system_prompts(), tools=[rag_summarize, get_weather, get_user_location, get_user_id, get_current_month, fetch_external_data, fill_context_for_report], middleware=[monitor_tool, log_before_model, report_prompt_switch], )2.流式执行方法 execute_stream
◦ 封装用户提问为标准消息字典messages,包含role和content,初始化上下文{"report": false},避免标记判断报错。
◦ 调用Agent的stream流式接口执行任务,stream_mode为values
第三个参数context就是上下文runtime中的信息,就是我们做提示词切换的标记
通过迭代器yield逐段返回模型输出内容,仅推送非空消息内容。
def execute_stream(self, query: str): input_dict = { "messages": [ {"role": "user", "content": query}, ] } # 第三个参数context就是上下文runtime中的信息,就是我们做提示词切换的标记 for chunk in self.agent.stream(input_dict, stream_mode="values", context={"report": False}): latest_message = chunk["messages"][-1] if latest_message.content: yield latest_message.content.strip() + "\n"三、功能测试
1. 常规问答测试
测试问题:查询扫地机器人在当前地区气温下的保养方法。
if __name__ == '__main__': agent = ReactAgent() for chunk in agent.execute_stream("查询扫地机器人在当前地区气温下的保养方法"): print(chunk, end="", flush=True)执行流程:Agent依次调用获取位置、获取天气、RAG知识库检索等工具,结合多轮工具调用结果,ReAct思考-行动流程,最终输出答案。控制台仅展示Info级别日志,详细Debug日志会同步写入本地日志文件。
2. 报告生成测试
测试问题:生成个人使用报告。
if __name__ == '__main__': agent = ReactAgent() for chunk in agent.execute_stream("生成个人使用报告"): print(chunk, end="", flush=True)执行流程:模型识别报告生成意图,调用指定工具并修改上下文标记,触发提示词自动切换;随后调用获取用户ID、月份、外部数据等工具,结合RAG检索补充信息,最终按照报告专属提示词要求,输出Markdown格式报告。通过自定义打印标识,可验证提示词切换逻辑正常生效。
四、总结与后续
当前中间件、Agent均开发调试完成,代码可实现流式结果输出。后续将基于Streamlit开发交互式网页,为普通用户提供可视化使用界面。
