1. 项目概述:为什么我们需要一个并发测试工具?
最近在做一个AI问答系统的测试项目,客户的核心诉求很明确:系统上线前,必须验证在高并发用户提问场景下的稳定性和响应质量。简单用Postman或者单线程脚本跑一下,根本模拟不出真实压力。市面上成熟的压测工具,像JMeter、LoadRunner,对这类需要模拟真实浏览器交互、处理动态Token、执行复杂前端逻辑的场景,配置起来又异常繁琐,脚本维护成本高。
于是,我们决定自己动手,用 Python + Playwright 造一个轮子。这个工具的目标不是取代专业的性能测试工具,而是填补一个精准的缺口——针对需要复杂UI交互和状态保持的Web应用(尤其是AI对话类应用),进行高保真、可编程的并发压力测试。它不仅能模拟成百上千个“真实用户”同时打开网页、登录、连续提问,还能精准收集每个请求的响应时间、成功率,甚至对AI回答的内容进行基础的质量校验。
如果你也在为类似的问题头疼——比如,想测试你的ChatGPT套壳应用在100人同时使用时会不会卡顿、丢会话,或者想验证一个智能客服系统在流量高峰时的表现——那么这套实践方案或许能给你提供一个清晰的解决路径。接下来,我会从设计思路、关键技术拆解、代码实现到踩坑实录,完整地分享这次开发实战。
2. 工具整体设计与核心思路拆解
2.1 核心需求与挑战分析
在动手之前,我们先明确要解决的具体问题和面临的挑战:
- 真实并发模拟:需要模拟数十甚至上百个独立用户会话,每个会话包含完整的登录、保持登录状态、连续多轮问答。这与简单的HTTP接口压测有本质区别,因为涉及浏览器上下文(Cookies, LocalStorage)、WebSocket连接等状态的隔离。
- 复杂交互的自动化:AI问答界面通常不是简单的表单提交。它可能涉及:
- 动态元素等待(如“正在思考”的加载动画消失)。
- 消息流的持续监听(回答是逐字输出还是一次性返回)。
- 文件上传、代码块渲染等富交互组件的操作。
- 数据采集与断言:测试不仅要看系统是否“挂掉”,还要关注业务指标:
- 性能指标:单次问答响应时间(Time to First Token, Time to Last Token)、页面加载时间。
- 业务指标:问答是否成功(网络请求成功且前端无报错)、回答内容是否相关(可设置简单关键词匹配或调用NLP服务做基础校验)。
- 可维护性与可配置性:测试脚本需要易于根据测试场景(并发用户数、提问内容、持续时间)进行配置,并且当被测系统前端发生UI变更时,脚本能相对容易地适配。
2.2 技术选型:为什么是Python + Playwright?
面对上述挑战,我们评估了几种方案:
- Selenium Grid:经典方案,但需要维护一个Selenium Hub和Node集群,资源消耗大,且对于大规模并发,管理和调试比较麻烦。
- Puppeteer Cluster:基于Node.js,能力很强,但团队Python技术栈更熟悉,且希望测试逻辑能与后端的数据分析脚本更好地集成。
- Playwright for Python:最终胜出。理由如下:
- 强大的异步支持:Playwright Python API 原生支持
asyncio,这是实现高效并发的基石。我们可以用asyncio.gather轻松管理数百个并发的浏览器上下文(Context)和页面(Page),而无需自己管理复杂的线程池。 - 自动等待与可靠性:Playwright 的操作(如
click,fill)内置了智能等待,会等待元素可操作、网络空闲等,大大减少了编写time.sleep的需求,让测试脚本更健壮。 - 多浏览器支持与无头模式:一套脚本可跑 Chromium, Firefox, WebKit。无头(Headless)模式在服务器上运行资源占用低,且现代无头模式已能完美执行几乎所有渲染和交互。
- 丰富的调试工具:
playwright codegen可以录制脚本,playwright inspector可以实时调试,trace viewer可以录制并可视化整个操作过程,这对编写和排查复杂交互脚本极其友好。 - 网络拦截与Mock:可以轻松拦截和修改网络请求,这对于模拟异常场景(如网络延迟、API失败)或注入测试数据非常有用。
- 强大的异步支持:Playwright Python API 原生支持
因此,技术栈确定为:Python 3.8+ + Playwright + asyncio + pytest(可选,用于组织测试用例)。
2.3 架构设计:一个轻量级并发测试运行器
我们的工具不追求大而全的测试平台,核心是一个并发的测试任务运行器。其架构可以抽象为以下几个模块:
- 配置管理模块:读取YAML或JSON配置文件,定义全局参数,如基础URL、总并发用户数、每个用户的提问次数、提问间隔、提问语料库、登录账号列表等。
- 用户会话池:核心模块。负责创建和管理一批独立的“虚拟用户”。每个用户是一个独立的
asyncio.Task,绑定一个Playwright的BrowserContext。Context天然隔离了Cookies、本地存储,完美模拟独立用户。 - 业务流程封装:将登录、提问、登出等操作封装成可重用的异步函数。每个虚拟用户任务就是循环执行这些业务流程。
- 监控与数据收集器:在每个关键操作步骤(如页面加载、请求发送、响应接收)打点,记录耗时和结果状态。数据可以实时输出到控制台,也可以写入CSV、数据库或时序数据库(如InfluxDB)供后续分析。
- 结果汇总与报告生成器:所有虚拟用户任务结束后,汇总成功率、平均响应时间、百分位数(P95, P99)等指标,生成HTML或Markdown格式的测试报告。
这个架构的优点是清晰、轻量、易于扩展。你可以很容易地增加新的业务流程(如“先搜索再提问”),或者更换数据收集的后端。
3. 核心细节解析与实操要点
3.1 环境搭建与Playwright初始化
工欲善其事,必先利其器。第一步是搭建一个稳定可复现的测试环境。
# 1. 创建项目并初始化虚拟环境(强烈推荐) python -m venv venv source venv/bin/activate # Linux/Mac # venv\Scripts\activate # Windows # 2. 安装核心依赖 pip install playwright pip install pytest pytest-asyncio # 如果使用pytest组织用例 pip install pyyaml # 用于读取YAML配置 pip install pandas # 用于数据处理和报告生成 # 3. 安装Playwright所需的浏览器内核 playwright install chromium # 通常安装Chromium就够了,更轻量关键点与避坑指南:
- 虚拟环境是必须的:避免全局Python包污染,尤其是Playwright会安装浏览器,放在虚拟环境里管理更干净。
- 选择Chromium:对于自动化测试,Chromium足够稳定且性能最好。Firefox和WebKit可能在渲染细节上有差异,除非你有跨浏览器测试需求,否则用Chromium即可。
- 网络问题:
playwright install可能会因为网络问题失败。可以尝试设置环境变量使用国内镜像,或者手动下载浏览器包。最稳的办法是提前在有网的环境下载好,再拷贝到测试服务器。 - 无头模式:生产环境运行务必使用无头模式(
headless=True),可以节省大量资源。调试时可以先设为False观察浏览器行为。
3.2 实现高并发:asyncio与Playwright Context的配合
这是整个工具最核心的部分。目标是创建N个并发的虚拟用户,每个用户互不干扰。
import asyncio from playwright.async_api import async_playwright async def single_user_task(user_id, config): """ 单个虚拟用户的测试任务 :param user_id: 用户标识,可用于区分数据和登录账号 :param config: 全局配置字典 """ async with async_playwright() as p: # 为每个用户启动一个独立的浏览器实例(更彻底隔离,但资源消耗大) # browser = await p.chromium.launch(headless=True) # 更推荐:共用一个Browser实例,但为每个用户创建独立的Context(资源利用率高,且足够隔离) browser = await p.chromium.launch(headless=True) context = await browser.new_context() page = await context.new_page() try: # 1. 登录 await login(page, user_id, config) # 2. 执行多轮问答 for i in range(config['questions_per_user']): question = get_question(i, user_id, config) # 从语料库获取问题 start_time = asyncio.get_event_loop().time() answer = await ask_question(page, question) end_time = asyncio.get_event_loop().time() record_metrics(user_id, i, question, answer, end_time-start_time) await asyncio.sleep(config['think_time']) # 模拟用户思考间隔 # 3. 登出(可选) await logout(page) except Exception as e: print(f"User {user_id} 任务失败: {e}") record_failure(user_id, e) finally: await context.close() await browser.close() async def main(config): """ 主函数,并发运行所有用户任务 """ tasks = [] for i in range(config['concurrent_users']): task = asyncio.create_task(single_user_task(i, config)) tasks.append(task) # 等待所有并发任务完成 await asyncio.gather(*tasks, return_exceptions=True) # return_exceptions防止一个任务失败导致整个程序崩溃 print("所有用户任务执行完毕") generate_report() if __name__ == "__main__": config = load_config("config.yaml") asyncio.run(main(config))核心要点解析:
- Browser vs Context vs Page:
Browser:对应一个浏览器进程。创建成本高。Context:浏览器上下文。相当于一个独立的“隐身会话”,拥有独立的Cookie、本地存储、缓存。这是我们实现用户隔离的核心。所有用户共享同一个Browser进程,但各自拥有独立的Context,这在资源消耗和隔离性上取得了最佳平衡。Page:Context中的标签页。一个Context可以有多个Page。
asyncio.create_task与asyncio.gather:这是实现并发的关键。create_task将协程函数包装成任务,放入事件循环中调度。gather则并发地运行所有这些任务,并等待它们全部完成。return_exceptions=True参数至关重要,它确保即使某个虚拟用户任务因为网络波动或页面元素未找到而抛出异常,也不会影响其他用户任务的执行,主程序会收集所有异常,最后统一处理。这保证了测试的健壮性。- 资源限制:并发数不是越高越好。每个
Context都会占用内存和CPU。你需要根据测试机器的配置(内存、CPU核心数)来调整concurrent_users。一个经验值是,在8GB内存的机器上,稳定运行50-100个Chromium Context是可行的,但需要监控内存使用情况,防止OOM(内存溢出)。
3.3 编写健壮的页面交互函数
模拟用户操作是UI自动化的基本功,但对于动态加载的AI问答页面,需要格外小心。
async def ask_question(page, question): """ 在页面上完成一次提问并获取回答 """ # 1. 定位输入框并输入问题 - 使用更稳定的定位方式 # 避免使用易变的文本或索引,优先使用data-testid等测试属性,或稳定的CSS选择器 input_selector = "textarea[placeholder*='输入']" # 使用属性包含匹配 # 或者:input_selector = "[data-testid='question-input']" await page.wait_for_selector(input_selector, state="visible", timeout=10000) await page.fill(input_selector, question) # 2. 点击发送按钮 # 同样,优先寻找有明确标识的按钮 send_button_selector = "button:has-text('发送')" # 文本匹配 # 或者:send_button_selector = "button[type='submit']" await page.click(send_button_selector) # 3. 等待AI回答开始出现(这是关键!) # 策略一:等待“正在思考”之类的加载元素出现再消失 thinking_selector = ".thinking-indicator, [aria-label*='思考']" try: await page.wait_for_selector(thinking_selector, state="visible", timeout=5000) await page.wait_for_selector(thinking_selector, state="hidden", timeout=30000) # 等待其消失 except Exception as e: print(f"未检测到明确的‘思考中’状态,继续后续等待。") # 可能系统没有加载提示,直接进入下一步 # 策略二:等待回答区域出现新内容(更通用) answer_container_selector = ".chat-message-bot:last-of-type .content" # 等待新回答的容器出现 await page.wait_for_selector(answer_container_selector, timeout=30000) # 4. 获取完整的回答文本 # 对于流式输出,可能需要等待文本稳定 last_answer_text = "" for _ in range(10): # 最多轮询10次,每次间隔1秒 await asyncio.sleep(1) current_text = await page.text_content(answer_container_selector) if current_text and current_text != last_answer_text: last_answer_text = current_text else: # 文本连续两次没有变化,认为回答完毕 break # 5. (可选)简单的内容断言 if len(last_answer_text.strip()) < 5: raise AssertionError(f"回答内容过短或为空: {last_answer_text}") # 可以加入关键词检查,例如检查是否包含“错误”、“抱歉”等异常词汇 if "error" in last_answer_text.lower() or "sorry" in last_answer_text.lower(): print(f"警告:回答中可能包含错误信息: {last_answer_text[:100]}...") return last_answer_text避坑经验:
- 选择器策略:绝对不要依赖元素的绝对CSS路径(如
div > div:nth-child(3) > span),前端一个微小的改动就会导致脚本失效。优先与开发团队约定使用>async def ask_question_with_monitor(page, question): # 监听特定的API请求 response_promise = page.wait_for_response(lambda response: "/api/chat" in response.url) # ... 执行提问操作 ... response = await response_promise if response.status != 200: raise Exception(f"问答API请求失败: {response.status}") # 还可以解析response.json()来获取更精确的服务器响应时间
4. 实操过程与核心环节实现
4.1 配置化驱动测试
将测试参数外置到配置文件,使工具无需修改代码就能适应不同测试场景。
config.yaml示例:
base_url: "https://your-ai-app.com" concurrent_users: 20 ramp_up_time: 10 # 并发用户启动的斜坡时间(秒),避免瞬间打满 questions_per_user: 10 think_time_range: [1, 5] # 用户每次提问后的随机等待时间范围(秒) timeout_per_action: 30000 # 每个操作(如等待回答)的超时时间(毫秒) login: enabled: true url: "/login" username_selector: "#username" password_selector: "#password" submit_selector: "button[type='submit']" # 用户凭证池,可以从外部文件读取 credentials: - {username: "user1@test.com", password: "pass123"} - {username: "user2@test.com", password: "pass123"} question_pool: # 问题列表,可以是一个文件路径,或直接写在配置里 file: "./data/questions.txt" # 或者 # questions: # - "请解释一下量子计算的基本原理。" # - "用Python写一个快速排序函数。" metrics: output_format: "csv" # csv, json, influxdb csv_path: "./results/test_run_{{timestamp}}.csv"在代码中,使用pyyaml加载配置,并使用string.Template或jinja2来处理配置中的动态变量(如{{timestamp}})。
4.2 实现数据收集与监控
没有数据,性能测试就失去了意义。我们需要在关键路径上埋点。
import csv import time from datetime import datetime class MetricsCollector: def __init__(self, output_path): self.output_path = output_path self._init_csv() def _init_csv(self): with open(self.output_path, 'w', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow(['timestamp', 'user_id', 'question_id', 'question', 'answer_length', 'response_time_ms', 'status', 'error']) def record_success(self, user_id, q_id, question, answer, response_time): """记录一次成功的问答""" with open(self.output_path, 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow([ datetime.now().isoformat(), user_id, q_id, question[:100], # 截断防止过长 len(answer), round(response_time * 1000, 2), # 秒转毫秒 'SUCCESS', '' ]) def record_failure(self, user_id, q_id, question, error_msg): """记录一次失败的问答""" with open(self.output_path, 'a', newline='', encoding='utf-8') as f: writer = csv.writer(f) writer.writerow([ datetime.now().isoformat(), user_id, q_id, question[:100], 0, 0, 'FAILURE', str(error_msg)[:200] # 截断错误信息 ]) # 在 single_user_task 中使用 collector = MetricsCollector(config['metrics']['csv_path']) # ... 在ask_question成功后 ... collector.record_success(user_id, i, question, answer, end_time-start_time)进阶监控:对于更复杂的场景,可以集成influxdb-client库,将实时指标写入InfluxDB,然后用Grafana制作实时监控看板,观察测试过程中系统的TPS、响应时间曲线、错误率等。
4.3 测试执行与资源管理
直接运行main函数可能会因为并发数过高导致机器资源耗尽。我们需要引入一些控制机制。
import asyncio import signal class ConcurrentTestRunner: def __init__(self, config): self.config = config self.tasks = [] self.stop_signal = False async def _ramp_up_users(self): """斜坡式启动用户,避免对系统造成瞬间冲击""" users_per_batch = max(1, self.config['concurrent_users'] // 10) delay = self.config['ramp_up_time'] / (self.config['concurrent_users'] / users_per_batch) for i in range(0, self.config['concurrent_users'], users_per_batch): if self.stop_signal: break batch = [] for j in range(users_per_batch): if i + j < self.config['concurrent_users']: user_id = i + j task = asyncio.create_task(single_user_task(user_id, self.config)) batch.append(task) self.tasks.extend(batch) print(f"已启动批次 {i//users_per_batch + 1}, 当前活跃任务数: {len(self.tasks)}") await asyncio.sleep(delay) # 批次间等待 async def run(self): """运行测试""" print(f"开始测试,计划并发用户数: {self.config['concurrent_users']}") await self._ramp_up_users() print("所有用户任务已启动,等待执行完毕...") # 等待所有任务完成,或收到停止信号 done, pending = await asyncio.wait(self.tasks, return_when=asyncio.FIRST_EXCEPTION, timeout=self.config.get('test_duration', None)) # 处理结果... def stop(self): """优雅停止""" self.stop_signal = True for task in self.tasks: task.cancel() print("收到停止信号,正在取消任务...") # 处理Ctrl+C信号,优雅退出 def signal_handler(runner): def handler(signum, frame): print("\n检测到中断信号,开始优雅停止...") runner.stop() return handler async def main(): config = load_config("config.yaml") runner = ConcurrentTestRunner(config) # 注册信号处理器 signal.signal(signal.SIGINT, signal_handler(runner)) try: await runner.run() except asyncio.CancelledError: print("测试被取消") finally: await generate_report(config) # 即使被中断,也尝试生成部分报告 if __name__ == "__main__": asyncio.run(main())这个ConcurrentTestRunner类提供了两个重要特性:
- 斜坡启动:分批启动虚拟用户,模拟真实世界中用户逐渐涌入的场景,比瞬间并发更能暴露一些资源竞争问题。
- 优雅停止:捕获
Ctrl+C信号,取消所有正在运行的任务,并尝试生成一份包含已收集数据的测试报告,避免测试数据丢失。
5. 常见问题与排查技巧实录
在实际开发和使用过程中,我们踩过不少坑。这里总结几个最具代表性的问题及其解决方案。
5.1 问题一:并发数上去后,出现大量超时或浏览器崩溃
- 现象:当设置
concurrent_users=100时,运行几分钟后,大量任务报TimeoutError或TargetClosedError(浏览器页面崩溃)。 - 根因分析:
- 内存耗尽:每个Playwright Context都会消耗内存(几十到上百MB)。100个Context可能吃掉数GB内存,导致系统开始交换(Swap),性能急剧下降,最终浏览器进程被OOM Killer终止。
- CPU过载:浏览器渲染和JavaScript执行是CPU密集型任务。并发数超过CPU核心数太多,会导致严重的上下文切换开销,所有任务都变慢,触发超时。
- 端口耗尽:每个网络请求都可能打开一个临时端口。超高并发下,可能耗尽本地可用端口范围。
- 解决方案:
- 限制并发数:根据测试机器硬件配置设定一个合理的上限。一个粗略的公式:
最大并发数 ≈ (可用内存GB * 1024) / 每个Context预估内存MB。例如,8GB内存预留2GB给系统,每个Context预估80MB,则(6*1024)/80 ≈ 76。建议从较低并发数(如20)开始,逐步增加,同时监控系统资源(htop,nmon)。 - 优化Context配置:创建Context时,可以禁用不必要的功能来节省资源:
context = await browser.new_context( viewport={'width': 1280, 'height': 720}, # 固定视口大小 java_script_enabled=True, # 必须为True ignore_https_errors=True, # 忽略HTTPS证书错误(测试环境) # 禁用图片、视频、字体加载,大幅提升速度并减少内存 bypass_csp=True, # 绕过内容安全策略(谨慎使用) ) # 更激进的资源拦截 await context.route("**/*.{png,jpg,jpeg,svg,gif,woff,woff2}", lambda route: route.abort()) - 使用更轻量的浏览器:Playwright的
chromium.launch可以传递args参数来禁用更多功能,如--disable-gpu,--disable-dev-shm-usage(在Docker中常用),--no-sandbox。 - 分布式执行:如果单机资源无法满足,可以考虑将用户任务分发到多台机器上执行,需要一个中心节点来协调任务分配和汇总结果。
- 限制并发数:根据测试机器硬件配置设定一个合理的上限。一个粗略的公式:
5.2 问题二:元素定位失败,脚本不稳定
- 现象:脚本有时能成功,有时失败,报错
Error: locator.click: Timeout 30000ms exceeded。 - 根因分析:
- 页面加载时间波动:网络延迟或后端响应慢导致页面元素出现时间超过脚本设置的默认等待时间。
- 动态内容导致选择器失效:前端框架(如React, Vue)在数据更新时可能会改变DOM结构或属性,使得之前稳定的选择器失效。
- 竞争条件:在元素还未完全可交互(如禁用状态)时就尝试点击。
- 解决方案:
- 增加智能等待:不要滥用
page.wait_for_timeout(5000)这种固定等待。使用Playwright内置的等待条件:# 等待元素可见并可交互 await page.locator("button.submit").wait_for(state="visible") await page.locator("button.submit").wait_for(state="attached") # 甚至可以直接在操作中等待 await page.locator("button.submit").click(timeout=10000) # 给click操作本身设置更长超时 - 使用更稳健的定位器:
- 文本定位:
page.locator("text=登录")。但注意文本可能变化或国际化。 - CSS + 文本:
page.locator("button:has-text('发送')")。 - XPath:虽然强大但易碎,谨慎使用。
page.locator("xpath=//button[@aria-label='Send message']")。 - 最佳实践:与前端开发约定,为关键测试元素添加
>import functools import asyncio async def retry_on_failure(func, max_attempts=3, delay=1): for attempt in range(max_attempts): try: return await func() except Exception as e: if attempt == max_attempts - 1: raise print(f"尝试 {func.__name__} 失败 (第{attempt+1}次),{delay}秒后重试... 错误: {e}") await asyncio.sleep(delay) # 使用 await retry_on_failure(lambda: page.click("button.submit"))
- 文本定位:
5.3 问题三:如何验证AI回答的内容质量?
- 挑战:性能测试关注“快不快”,但功能测试还要关注“对不对”。如何自动化判断AI的回答是否相关、准确?
- 解决方案(根据投入成本分级):
- 基础校验(低成本):
- 非空检查:回答长度大于阈值。
- 关键词匹配:对于特定问题,检查回答中是否包含预期的关键词。例如,问“Python的创始人是谁?”,检查回答中是否包含“Guido van Rossum”。
- 否定词检查:检查是否包含“抱歉”、“我不知道”、“错误”等词汇,这可能表示模型未能回答问题。
- 中级校验(中等成本):
- 嵌入向量相似度:使用句子转换器(如
sentence-transformers)将问题和回答都转换为向量,计算余弦相似度。设定一个阈值,低于阈值则认为不相关。这比关键词匹配更灵活。 - 调用另一个LLM进行评估:使用一个轻量级或免费的LLM API(如OpenAI GPT-3.5-turbo、Claude Haiku),设计Prompt让其判断“回答是否针对问题”。这种方法更智能,但成本较高且速度慢。
- 嵌入向量相似度:使用句子转换器(如
- 高级校验(高成本,离线进行):
- 将测试中收集的所有问答对保存下来,由人工或更复杂的评估流程进行事后分析。这通常用于版本间的回归测试对比。
- 基础校验(低成本):
在我们的工具中,我们实现了基础校验和简单的嵌入相似度校验,作为可选的断言模块。关键在于,这些校验不能严重影响测试执行速度,所以复杂的评估应异步进行或放在测试后分析阶段。
5.4 问题四:测试报告不够直观
- 痛点:CSV文件数据虽然详细,但不够直观,难以快速发现问题。
- 解决方案:使用
pandas+matplotlib或plotly生成可视化报告。
将生成的图表嵌入到HTML报告中,可以一目了然地看到性能趋势和瓶颈点。import pandas as pd import matplotlib.pyplot as plt def generate_visual_report(csv_path): df = pd.read_csv(csv_path) # 计算总体指标 total_requests = len(df) success_rate = (df['status'] == 'SUCCESS').sum() / total_requests * 100 avg_response_time = df[df['status']=='SUCCESS']['response_time_ms'].mean() p95_response_time = df[df['status']=='SUCCESS']['response_time_ms'].quantile(0.95) print(f"总请求数: {total_requests}") print(f"成功率: {success_rate:.2f}%") print(f"平均响应时间: {avg_response_time:.2f} ms") print(f"P95响应时间: {p95_response_time:.2f} ms") # 绘制响应时间分布直方图 plt.figure(figsize=(10, 6)) success_df = df[df['status']=='SUCCESS'] plt.hist(success_df['response_time_ms'], bins=50, edgecolor='black', alpha=0.7) plt.axvline(avg_response_time, color='r', linestyle='--', label=f'平均 ({avg_response_time:.0f}ms)') plt.axvline(p95_response_time, color='g', linestyle='--', label=f'P95 ({p95_response_time:.0f}ms)') plt.xlabel('响应时间 (ms)') plt.ylabel('频次') plt.title('AI问答响应时间分布') plt.legend() plt.grid(True, alpha=0.3) plt.savefig('./results/response_time_distribution.png', dpi=150) plt.close() # 绘制随时间变化的成功率折线图(如果数据有时间戳) if 'timestamp' in df.columns: df['timestamp'] = pd.to_datetime(df['timestamp']) df.set_index('timestamp', inplace=True) # 按分钟聚合成功率 success_series = df['status'].resample('1min').apply(lambda x: (x=='SUCCESS').sum() / len(x) * 100) plt.figure(figsize=(12, 5)) success_series.plot() plt.axhline(95, color='r', linestyle='--', label='SLA 95%') plt.ylabel('成功率 (%)') plt.title('每分钟成功率趋势') plt.legend() plt.grid(True, alpha=0.3) plt.tight_layout() plt.savefig('./results/success_rate_trend.png', dpi=150) plt.close()
开发这个工具的过程,是一个典型的“用自动化解决自动化测试中的痛点”的案例。它可能没有商业压测工具那么功能全面,但贵在高度定制、贴合业务、成本可控。通过Python和Playwright的组合,我们能够以相对低的复杂度,构建出一个能模拟真实用户行为、提供深入洞察的并发测试利器。最关键的是,整个技术栈对测试团队非常友好,易于维护和扩展。下次当你需要对一个复杂的Web应用进行压力测试时,不妨考虑一下这个思路。
- 增加智能等待:不要滥用