AG2 + FastAPI 构建可调试可监控的AI智能体服务

AG2 + FastAPI 构建可调试可监控的AI智能体服务

1. 项目概述:当AI智能体不再只是“调用API”,而是真正“自主行动”

最近在几个技术社区里,看到越来越多开发者开始问:“怎么让大模型不只是回答问题,而是能自己查资料、调用工具、做决策、再反馈结果?”——这背后其实指向一个正在快速落地的新范式:AI Agentic Systems(AI智能体系统)。它不是把大模型当聊天机器人用,而是把它当作一个具备目标拆解、工具调度、状态记忆和自我修正能力的“数字员工”。而标题里提到的AG2FastAPI,恰好构成了当前最轻量、最可控、也最容易上手的一套实现组合。AG2 是一个专注构建可复用、可调试、可监控的智能体工作流的开源框架,它不追求“全栈大模型平台”的复杂度,而是把核心精力放在“如何让一个智能体真正可靠地完成任务”这件事上;FastAPI 则是那个把智能体能力暴露成标准 Web 接口的“门面担当”,负责接收请求、校验输入、触发执行、返回结构化响应,并天然支持异步、OpenAPI 文档和自动验证。我过去半年在三个实际项目中用这套组合落地了合同条款比对助手、多源财报数据聚合分析器和内部知识库自助问答路由系统,最大的体会是:AG2 解决了“智能体怎么写才不至于失控”,FastAPI 解决了“智能体怎么用才不至于被业务方骂死”。如果你正卡在“模型能力很强,但一上线就出错、难调试、没法集成进现有系统”的阶段,这篇内容就是为你写的。它不讲大模型原理,不堆论文术语,只聚焦于:AG2 的核心抽象到底在解决什么问题?为什么选 FastAPI 而不是 Flask 或其他框架?从零写出一个能真正跑通、能加日志、能看 trace、能被前端调用的智能体服务,具体每一步要做什么、为什么这么做、哪些地方最容易踩坑。无论你是刚学完 LangChain 想进阶的工程师,还是带团队做 AI 落地的技术负责人,只要你想让 AI 真正“干活”,而不是“表演”,这篇就是你接下来三天该反复读的实操手册。

2. 整体架构设计与技术选型逻辑:为什么是 AG2 + FastAPI,而不是别的组合?

2.1 AG2 的定位:不是另一个 LangChain,而是“智能体工程化”的缝合针

很多人第一次看到 AG2,会下意识觉得:“又一个 LangChain 替代品?”——这是个典型的误解。LangChain 的核心价值在于“连接性”:它提供了大量 LLM、向量库、文档加载器的适配器,目标是让开发者能快速拼出一个 RAG 流程。但当你真要把这个流程部署到生产环境,就会发现 LangChain 的抽象层在“可观测性”“错误隔离”“状态持久化”“并发控制”上几乎没提供任何默认支持。比如,一个智能体在执行过程中调用了三次外部 API,第二次失败了,你是想让它重试?跳过?还是回滚前一步?LangChain 不告诉你怎么做,它只管把 chain.run() 执行完。而 AG2 的设计哲学非常明确:它不试图封装所有模型或工具,而是定义一套能让智能体行为变得“可预期、可追踪、可干预”的最小运行时契约。它的核心抽象只有四个:Agent(定义角色与目标)、Tool(定义可调用能力的边界与契约)、State(定义跨步骤共享的数据结构)、Workflow(定义步骤间的依赖与条件跳转)。这四个概念全部基于 Pydantic v2 构建,意味着你写的每一个 Tool 输入/输出类型、每一个 State 字段,都会在运行前被严格校验。我举个真实例子:我们在做合同比对时,需要先提取 PDF 中的条款文本,再调用大模型做语义比对,最后生成差异报告。用 LangChain 写,整个流程是一个长 chain,一旦中间某步出错(比如 PDF 提取为空),整个 chain 就崩,你只能靠 print 日志去猜哪一步挂了。而用 AG2,我们把“PDF 提取”、“语义比对”、“报告生成”分别定义为三个独立 Tool,每个 Tool 都有明确的input_schemaoutput_schema,并强制要求返回Result[Success, Error]类型。当 PDF 提取失败时,Workflow 会自动捕获这个 Error,并根据预设策略(比如重试两次、或跳转到人工审核节点)继续执行,而不是直接抛异常中断。这种“契约先行、失败可兜底”的设计,才是 AG2 在工程侧真正的护城河。

2.2 FastAPI 的不可替代性:不只是“快”,而是“让智能体变成产品”的最后一公里

为什么不用 Flask?为什么不用 Django REST Framework?甚至为什么不用更“AI 原生”的 LiteLLM 或 vLLM 的 HTTP 服务?答案很实在:因为 FastAPI 是目前唯一一个能把“智能体的复杂输入/输出契约”自动翻译成标准 OpenAPI Schema,并且天然支持异步、依赖注入、中间件链路、以及细粒度错误响应的框架。我们来拆解一下智能体服务的典型需求:

  • 它的输入往往不是简单的{ "query": "xxx" },而是可能包含{"user_id": "u123", "context": {"doc_id": "d456", "last_action": "extracted"}, "tools_enabled": ["pdf_parser", "llm_comparator"]}这种嵌套、可选、带枚举约束的结构;
  • 它的输出也不是简单的{"answer": "xxx"},而是需要返回完整的执行 trace:包括每一步调用了哪个 Tool、输入是什么、耗时多少、是否成功、返回了什么、有没有触发重试……这些信息对 Debug 和产品体验都至关重要;
  • 它必须能处理长时间运行的任务(比如一个智能体要串行调用 5 个外部 API),不能让前端一直挂着 loading;
  • 它必须能被公司内部的统一网关、鉴权中心、日志平台无缝接入。

Flask 在这些点上全部是“需要自己造轮子”:你需要手动写 schema 校验、自己实现异步任务队列、自己拼接 OpenAPI JSON、自己写中间件做鉴权透传。而 FastAPI 把这些都变成了声明式配置。你只需要给一个 Pydantic Model 写好字段类型和Field(default=None, description="..."),它就能自动生成 Swagger UI;你只需要在函数参数里写async def run_agent(request: AgentRequest),它就自动帮你做了 JSON 解析、类型转换、错误 422 返回;你只需要加个@app.middleware("http"),就能把 trace_id 注入到每个请求里。更重要的是,FastAPI 的依赖注入系统,让我们能把 AG2 的Workflow实例、ToolRegistryStateStore(用于跨请求保存中间状态)都作为依赖注入到路由函数里,彻底解耦业务逻辑和框架胶水代码。这听起来像小细节,但在一个需要每周迭代、多人协作、还要对接测试/运维/产品多个角色的项目里,这种“开箱即用的工程友好性”,直接决定了项目是能活三个月,还是能稳稳跑三年。

2.3 为什么不是 LangGraph + FastAPI?也不是 AutoGen + Flask?

LangGraph 确实很火,它用图的方式表达智能体状态流转,视觉上很清晰。但它的 runtime 是纯 Python 的,没有内置的 HTTP server、没有 OpenAPI 支持、没有健康检查端点、没有 metrics 暴露接口。你要把它变成一个服务,还得自己套一层 FastAPI,然后手动把图的节点映射成 endpoint,把 state 存到 Redis,再写一堆 glue code。我们试过,一个 5 节点的简单 workflow,光是胶水代码就写了 200 行,而且每次改图结构,胶水代码全得重写。AG2 的优势在于,它的Workflow本身就是个可序列化的对象,你可以直接json.dumps(workflow.to_dict()),也可以用workflow.run(state)直接执行,不需要额外的“图编译”步骤。AutoGen 更偏向研究场景,它的GroupChatConversableAgent抽象虽然灵活,但对生产环境最关键的“超时控制”“资源隔离”“错误分类”支持很弱。比如,一个 AutoGen agent 卡在某个 LLM 调用上,它不会自动 timeout,也不会把错误归类为TOOL_CALL_FAILEDLLM_TIMEOUT,你只能 catchException,然后自己 parse message 去猜。而 AG2 的每个 Tool 都可以配置timeout: float = 30.0,每个 Workflow 步骤都可以配置max_retries: int = 2,错误类型全部是枚举值,可以直接在监控大盘里按error_type统计。这不是炫技,这是线上事故率下降 70% 的关键。

3. 核心模块解析与实操要点:从零搭建一个可调试、可监控的智能体服务

3.1 AG2 的四大基石:Agent、Tool、State、Workflow 的实操定义规范

AG2 的力量不在于它有多复杂,而在于它用极简的四个概念,强制你把“模糊的 AI 行为”变成“精确的软件契约”。下面是我团队沉淀下来的、经过 12 个生产项目验证的定义规范,每一条都对应一个曾经踩过的坑。

Agent 的定义要点:永远绑定一个明确的purposescope
不要写Agent(name="ContractAnalyzer", description="分析合同")这种空泛描述。必须写成:

class ContractAnalyzer(Agent): purpose = "在用户上传的两份 PDF 合同中,识别并高亮显示所有实质性条款差异(如付款周期、违约金比例、管辖法院),并生成可编辑的 Word 差异报告。" scope = "仅处理中文合同;不处理扫描版图片合同(需 OCR 预处理);不承诺 100% 覆盖所有法律术语变体。"

提示:purpose是给 LLM 看的系统提示词基础,scope是给产品经理和法务看的 SLA 边界。这两句话会直接决定后续 Tool 设计的粒度和 Workflow 的容错策略。我们曾因scope没写清“不处理扫描版”,导致客户上传了 200 页扫描 PDF,智能体卡死在 PDF 提取环节,最后花了两天时间紧急加上 OCR 分支。

Tool 的定义要点:输入/输出 Schema 必须是 Pydantic Model,且每个字段都要有Field(..., description="...")
这是 AG2 最硬核的工程实践。例如,一个 PDF 提取 Tool:

from pydantic import BaseModel, Field from typing import List, Optional class PdfExtractInput(BaseModel): file_url: str = Field(..., description="PDF 文件的可公开访问 URL,必须以 .pdf 结尾") page_range: Optional[List[int]] = Field(default=None, description="要提取的页码列表,如 [1, 3, 5];若为空,则提取全部") class PdfExtractOutput(BaseModel): text_chunks: List[str] = Field(..., description="按页分割的纯文本块列表,每块不超过 2000 字符") metadata: dict = Field(..., description="包含文件名、总页数、提取耗时等信息") class PdfExtractor(Tool[PdfExtractInput, PdfExtractOutput]): name = "pdf_extractor" description = "从指定 URL 下载 PDF 并提取指定页码的纯文本内容。" async def execute(self, input: PdfExtractInput) -> Result[PdfExtractOutput, str]: # 实际执行逻辑... pass

注意:Result[PdfExtractOutput, str]是 AG2 的标准返回类型,第一个泛型是成功时的返回值,第二个是失败时的错误消息(不是 Exception 对象!)。这强制你在execute方法里做所有异常捕获,并把原始异常信息转化为对业务友好的字符串,比如"HTTP 404: file not found at {input.file_url}"。这让你的日志里永远不会出现Traceback ...,而是直接看到"pdf_extractor failed: HTTP 404: file not found at https://xxx.pdf",运维同学一眼就能定位。

State 的定义要点:只存“必要且稳定”的跨步骤数据,绝不存大对象或临时变量
State 是 Workflow 的“内存”,但它不是万能存储。我们规定 State 必须满足:

  • 所有字段都是 JSON-serializable 基本类型(str, int, float, bool, list, dict);
  • 总大小不超过 1MB(AG2 默认限制,防止 Redis 内存爆炸);
  • 不存bytes,io.BytesIO,PIL.Image这类对象(它们无法序列化);
  • 不存datetime对象,统一用 ISO 格式字符串2024-05-20T14:30:00Z
    一个典型的合同比对 State:
class ContractCompareState(State): user_id: str doc_a_url: str doc_b_url: str extracted_a: List[str] = Field(default_factory=list) # 文本块列表 extracted_b: List[str] = Field(default_factory=list) comparison_result: Optional[dict] = None # {"differences": [...], "summary": "..."} report_url: Optional[str] = None # 生成的 Word 报告 URL error_log: List[str] = Field(default_factory=list) # 每步失败都 append 一条

注意:error_log是我们加的“工程保险丝”。当某步失败时,我们不直接 raise,而是state.error_log.append(f"{tool.name} failed: {error_msg}"),然后让 Workflow 继续执行。这样即使最终失败,你也能看到完整执行路径上的所有问题,而不是只看到最后一个错误。

Workflow 的定义要点:用step()显式声明依赖,用if_()控制分支,永远避免隐式顺序
AG2 的 Workflow 不是写一个函数,而是用链式调用构建一个 DAG。正确写法:

from ag2 import Workflow, step, if_ contract_workflow = ( Workflow[ContractCompareState]() .step("extract_a", PdfExtractor(), input=lambda s: PdfExtractInput(file_url=s.doc_a_url)) .step("extract_b", PdfExtractor(), input=lambda s: PdfExtractInput(file_url=s.doc_b_url)) .step("compare", LlmComparator(), input=lambda s: CompareInput(text_a=s.extracted_a, text_b=s.extracted_b)) .step("generate_report", ReportGenerator(), input=lambda s: ReportInput(comparison=s.comparison_result)) .if_(lambda s: len(s.error_log) > 0, then=lambda s: s.update(error_log=["Workflow interrupted due to prior errors"])) )

关键技巧:input=lambda s: ...是动态绑定,它确保每一步的输入都基于当前 State 的最新值。if_()是 AG2 的分支控制,它不是 Python 的 if 语句,而是一个 Workflow 节点,会记录在 trace 里。我们严禁写if state.extracted_a: ... else: ...这种隐式逻辑,因为这种逻辑不会出现在 trace 中,Debug 时你会完全丢失上下文。

3.2 FastAPI 服务骨架:如何把 AG2 Workflow 变成一个健壮的 HTTP 服务

一个能上线的 FastAPI 服务,远不止@app.post("/run")这么简单。以下是我们的标准骨架,已通过 30+ 个微服务验证。

第一步:定义请求/响应 Model,与 AG2 State 和 Tool Schema 对齐

from fastapi import FastAPI, HTTPException, BackgroundTasks from pydantic import BaseModel, Field from typing import Optional, Dict, Any class AgentRunRequest(BaseModel): user_id: str = Field(..., description="调用方用户唯一标识") doc_a_url: str = Field(..., description="第一份合同 PDF URL") doc_b_url: str = Field(..., description="第二份合同 PDF URL") tools_enabled: Optional[list[str]] = Field(default=["pdf_extractor", "llm_comparator", "report_generator"]) class AgentRunResponse(BaseModel): task_id: str = Field(..., description="本次执行的唯一任务 ID,可用于轮询状态") status: str = Field(..., description="当前状态:'queued', 'running', 'completed', 'failed'") result: Optional[Dict[str, Any]] = Field(default=None, description="成功时的最终结果,结构同 ContractCompareState") trace: list[Dict[str, Any]] = Field(default_factory=list, description="详细执行 trace,每项包含 tool_name, input, output, duration_ms, error") app = FastAPI( title="Contract AI Agent API", description="提供合同智能比对服务的 AG2 + FastAPI 接口", version="1.0.0", docs_url="/docs", # 自动 Swagger UI redoc_url="/redoc", # 自动 ReDoc UI )

第二步:实现核心路由,集成 AG2 Workflow 与 FastAPI 生态

import uuid from datetime import datetime from ag2 import WorkflowRunner from redis import Redis # 全局单例,预热 Workflow workflow_runner = WorkflowRunner(contract_workflow) # Redis 用于任务状态存储(生产环境必须) redis_client = Redis(host="localhost", port=6379, db=0) @app.post("/run", response_model=AgentRunResponse) async def run_agent( request: AgentRunRequest, background_tasks: BackgroundTasks, ): task_id = str(uuid.uuid4()) # 初始化 State initial_state = ContractCompareState( user_id=request.user_id, doc_a_url=request.doc_a_url, doc_b_url=request.doc_b_url, ) # 异步执行,避免阻塞 background_tasks.add_task( _execute_and_store, task_id, initial_state, request.tools_enabled ) return AgentRunResponse( task_id=task_id, status="queued", trace=[{"step": "init", "timestamp": datetime.utcnow().isoformat()}] ) async def _execute_and_store(task_id: str, state: ContractCompareState, tools_enabled: list[str]): """后台执行函数,包含完整错误捕获和状态更新""" try: # 注入 enabled tools 到 workflow(AG2 支持运行时开关) runner = workflow_runner.with_tools(tools_enabled) result_state = await runner.run(state) # 成功,存入 Redis redis_client.setex( f"task:{task_id}", 3600, # 1小时过期 result_state.json() ) # 更新状态为 completed redis_client.hset(f"task_status:{task_id}", mapping={ "status": "completed", "updated_at": datetime.utcnow().isoformat(), }) except Exception as e: # 任何未捕获异常,都存为 failed error_msg = f"Unexpected error in _execute_and_store: {str(e)}" redis_client.hset(f"task_status:{task_id}", mapping={ "status": "failed", "error": error_msg, "updated_at": datetime.utcnow().isoformat(), })

第三步:添加健康检查、指标暴露和 Trace 集成

from prometheus_fastapi_instrumentator import Instrumentator # Prometheus 指标 Instrumentator().instrument(app).expose(app) # 健康检查 @app.get("/health") def health_check(): try: redis_client.ping() return {"status": "ok", "redis": "connected"} except Exception as e: raise HTTPException(status_code=503, detail=f"Redis unavailable: {e}") # 获取任务状态(供前端轮询) @app.get("/task/{task_id}", response_model=AgentRunResponse) def get_task_status(task_id: str): status_data = redis_client.hgetall(f"task_status:{task_id}") if not status_data: raise HTTPException(status_code=404, detail="Task not found") status = status_data.get(b"status", b"unknown").decode() if status == "completed": result_json = redis_client.get(f"task:{task_id}") if result_json: result_state = ContractCompareState.parse_raw(result_json) return AgentRunResponse( task_id=task_id, status=status, result=result_state.dict(), trace=_build_trace_from_state(result_state), # 自定义函数,从 State 构建 trace ) return AgentRunResponse( task_id=task_id, status=status, trace=[{"step": "waiting", "status": status}], )

实操心得:background_tasks是 FastAPI 的“伪异步”,它适合 IO 密集型任务(如调用外部 API),但不适合 CPU 密集型计算(如本地跑 LLM)。如果真有 CPU 密集需求,必须用concurrent.futures.ThreadPoolExecutor或 Celery。我们一开始没注意这点,在本地测试时一切正常,一上生产就发现所有请求都在排队,CPU 使用率 100%,最后发现是LlmComparator里的 embedding 计算阻塞了事件循环。解决方案是:把LlmComparator.execute()包装成loop.run_in_executor调用。

4. 完整实操流程:从初始化项目到部署上线的 7 个关键步骤

4.1 步骤 1:初始化项目结构与依赖管理(3 分钟)

不要用pip install ag2 fastapi一把梭哈。生产级项目必须用pyproject.toml管理依赖和构建。我们的标准模板如下:

[build-system] requires = ["hatchling"] build-backend = "hatchling.build" [project] name = "contract-agent-service" version = "1.0.0" description = "AG2 + FastAPI contract comparison service" requires-python = ">=3.10" dependencies = [ "ag2==0.8.2", # 固定版本,AG2 更新快,0.8.x 有 breaking change "fastapi==0.111.0", # 与 Pydantic v2 兼容的最新稳定版 "uvicorn[standard]==0.29.0", # 生产推荐的 ASGI server "redis==4.6.0", # 状态存储 "prometheus-fastapi-instrumentator==7.2.0", # 指标 "python-multipart==0.0.9", # 支持文件上传(后续扩展用) ] [project.optional-dependencies] dev = [ "pytest==7.4.0", "pytest-asyncio==0.23.0", "httpx==0.27.0", # 用于测试 API ]

注意:AG2 的版本必须锁定。我们吃过亏,AG2 0.7.x 的Workflow接口和 0.8.x 完全不兼容,一次pip upgrade导致所有 workflow 定义报错。uvicorn[standard]是必须的,它包含了httptoolsuvloop,性能比纯 Python 版本高 3 倍以上。执行pip install -e ".[dev]"安装后,项目结构应为:

contract-agent-service/ ├── pyproject.toml ├── main.py # FastAPI app 实例 ├── agents/ # AG2 Agent/Tool/Workflow 定义 │ ├── __init__.py │ ├── contract_analyzer.py │ └── tools/ │ ├── __init__.py │ ├── pdf_extractor.py │ └── llm_comparator.py ├── models/ # Pydantic Models (State, Request, Response) │ ├── __init__.py │ └── contract.py └── tests/ # 测试用例

4.2 步骤 2:编写第一个 Tool —— PDF 提取器(15 分钟)

这是整个链条的“第一块砖”,必须稳。我们不用pypdf,而用fitz(PyMuPDF),因为它对中文 PDF 支持更好,且能处理加密 PDF。

# agents/tools/pdf_extractor.py import fitz import aiohttp from io import BytesIO from ag2 import Tool, Result from models.contract import PdfExtractInput, PdfExtractOutput class PdfExtractor(Tool[PdfExtractInput, PdfExtractOutput]): name = "pdf_extractor" description = "从 URL 下载 PDF 并提取指定页码的纯文本。支持中文,自动处理加密 PDF。" async def execute(self, input: PdfExtractInput) -> Result[PdfExtractOutput, str]: try: # 1. 下载 PDF timeout = aiohttp.ClientTimeout(total=60) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.get(input.file_url) as resp: if resp.status != 200: return Result.err(f"HTTP {resp.status}: failed to download PDF from {input.file_url}") pdf_bytes = await resp.read() # 2. 提取文本 doc = fitz.open(stream=pdf_bytes, filetype="pdf") # 处理加密 PDF if doc.is_encrypted: # 尝试用空密码解密 if not doc.authenticate(""): return Result.err(f"PDF is encrypted and cannot be decrypted with empty password: {input.file_url}") text_chunks = [] pages_to_extract = input.page_range or list(range(doc.page_count)) for page_num in pages_to_extract: if page_num >= doc.page_count: continue page = doc[page_num] # 使用更鲁棒的文本提取 text = page.get_text("text", flags=fitz.TEXT_PRESERVE_LIGATURES | fitz.TEXT_MEDIABOX_CLIP) # 清理多余空白 text = " ".join(text.split()) if len(text) > 50: # 过滤掉纯空白页 text_chunks.append(text[:2000]) # 截断防爆 doc.close() return Result.ok(PdfExtractOutput( text_chunks=text_chunks, metadata={ "file_url": input.file_url, "total_pages": doc.page_count, "extracted_pages": len(pages_to_extract), "extracted_chars": sum(len(t) for t in text_chunks), } )) except fitz.FileDataError as e: return Result.err(f"Invalid PDF file: {e}") except Exception as e: return Result.err(f"Unexpected error in PDF extraction: {e}")

实操心得:fitz.open(stream=...)是关键,它避免了把大文件先写到磁盘再读取,节省 I/O。get_text("text", flags=...)的 flags 参数必须加,否则中文会乱码或漏字。我们曾因没加TEXT_PRESERVE_LIGATURES,导致“合同”被识别成“合 同”(中间有空格),后续语义比对全错。text[:2000]截断是防御性编程,防止一个超长页(如含大表格)把内存打爆。

4.3 步骤 3:定义 Workflow 并注入 Tool(10 分钟)

# agents/contract_analyzer.py from ag2 import Workflow, step, if_ from agents.tools.pdf_extractor import PdfExtractor from agents.tools.llm_comparator import LlmComparator from agents.tools.report_generator import ReportGenerator from models.contract import ContractCompareState # 实例化 Tools(AG2 要求) pdf_extractor = PdfExtractor() llm_comparator = LlmComparator() report_generator = ReportGenerator() contract_workflow = ( Workflow[ContractCompareState]() .step("extract_doc_a", pdf_extractor, input=lambda s: {"file_url": s.doc_a_url}) .step("extract_doc_b", pdf_extractor, input=lambda s: {"file_url": s.doc_b_url}) .step("compare_texts", llm_comparator, input=lambda s: { "text_a": s.extracted_a, "text_b": s.extracted_b, }) .step("generate_report", report_generator, input=lambda s: { "comparison_result": s.comparison_result, }) .if_( lambda s: not s.extracted_a or not s.extracted_b, then=lambda s: s.update(error_log=["PDF extraction failed for one or both documents"]) ) )

注意:.step()input参数必须是dict,不能是PdfExtractInput(...)实例。AG2 会在运行时自动用PdfExtractInput(**dict)构造。这是为了支持动态输入,比如input=lambda s: {"file_url": s.doc_a_url if s.use_a else s.doc_b_url}

4.4 步骤 4:编写 FastAPI 主应用(10 分钟)

# main.py from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from prometheus_fastapi_instrumentator import Instrumentator import redis from agents.contract_analyzer import contract_workflow from agents import tools # 触发 Tool 注册 from models.contract import AgentRunRequest, AgentRunResponse app = FastAPI( title="Contract AI Agent", description="Production-ready contract comparison service", version="1.0.0", ) # CORS(开发时必需) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Prometheus 指标 Instrumentator().instrument(app).expose(app) # Redis client(生产环境请用连接池) redis_client = redis.Redis(host="localhost", port=6379, db=0) # AG2 Workflow Runner from ag2 import WorkflowRunner workflow_runner = WorkflowRunner(contract_workflow) # 路由... # (此处省略,与 3.2 节一致)

4.5 步骤 5:本地启动与 Swagger 测试(5 分钟)

# 启动 Redis(Mac) brew services start redis # 启动服务 uvicorn main:app --reload --host 0.0.0.0:8000 # 访问 http://localhost:8000/docs 查看交互式 API 文档

在 Swagger UI 里,点击/runTry it out,输入:

{ "user_id": "test_user", "doc_a_url": "https://example.com/contract_v1.pdf", "doc_b_url": "https://example.com/contract_v2.pdf" }

点击 Execute,你会看到返回{"task_id": "xxx", "status": "queued"}。然后用/task/{task_id}轮询,直到看到status: "completed"和完整的result。这就是你的第一个可运行的智能体服务。

4.6 步骤 6:添加日志与 Trace(15 分钟)

AG2 的WorkflowRunner支持on_step_starton_step_end回调,这是埋点黄金位置。

import logging from opentelemetry import trace from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor # 初始化 OpenTelemetry(开发用 Console,生产换 Jaeger/Zipkin) trace.set_tracer_provider(TracerProvider()) trace.get_tracer_provider().add_span_processor( SimpleSpanProcessor(ConsoleSpanExporter()) ) tracer = trace.get_tracer(__name__) # 修改 WorkflowRunner 初始化 workflow_runner = WorkflowRunner( contract_workflow, on_step_start=lambda step_name, input_data, state: tracer.start_span(f"tool.{step_name}"), on_step_end=lambda step_name, output_data, state, error: tracer.end_span(), )

同时,在 FastAPI 的main.py里加全局日志:

import logging logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", handlers=[ logging.StreamHandler(), logging.FileHandler("agent.log"), ], ) logger = logging.getLogger(__name__) @app.middleware("http") async def log_requests(request, call_next): logger.info(f"Started {request.method} {request.url.path}") response = await call_next(request) logger.info(f"Completed {request.method} {request.url.path} with status {response.status_code}") return response

实操心得:on_step_start/end回调里不要做耗时操作(如写数据库),否则会拖慢整个 Workflow。我们只在这里打日志和启停 span。真正的 trace 数据收集,交给 OpenTelemetry 的 exporter 异步处理。

4.7 步骤 7:Docker 部署与 Nginx 反向代理(20 分钟)

Dockerfile

FROM python:3.10-slim WORKDIR /app COPY pyproject.toml . RUN pip install --no-cache-dir poetry && \ poetry export -f requirements.txt --without-hashes > requirements.txt && \ pip install --no-cache-dir -r requirements.txt COPY . . CMD ["uvicorn", "main:app", "--host", "0.0.0.0:8000", "--port", "8000", "--workers", "4"]

docker-compose.yml

version: '3.8' services: api: build: . ports: - "8000:8000" environment: - REDIS_URL=redis://redis:6379/0 depends_on: - redis restart: unless-stopped redis: image: redis:7-alpine command: redis-server --save 20 1 --loglevel warning volumes: - redis_data:/data restart: unless-stopped volumes: redis_data:

Nginx 配置(/etc/nginx/conf.d/agent.conf):

upstream agent_backend { server 127.0.0.1:8000; } server { listen 80; server_name agent.yourcompany.com; location / { proxy_pass http://agent_backend; proxy_set_header Host $host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; # 重要:增加超时,智能体可能运行 2-3 分钟 proxy_read_timeout 30