当前位置: 首页 > news >正文

locust-WebSocket压测

连接WS的库有的是支持异步IO的,项目中我们推荐这样的库,但是压测时还是要选择同步的库

异步

安装

pip install websockets

代码示例

import asyncio
import websockets
import json
import randomasync def mytest():async with websockets.connect('wss://sockettest.xhkjedu.com/ws') as websocket:num = random.randint(0, 10000000)msg = {"b": {"num": num},"c": 123456,}msgstr = json.dumps(msg)await websocket.send(msgstr)print(f"↑: {msgstr}")greeting = await websocket.recv()print(f"↓: {greeting}")asyncio.get_event_loop().run_until_complete(mytest())

同步

官网地址

https://pypi.org/project/websocket-client/

安装

pip install websocket-client

示例

from websocket import create_connection
import json
import randomws = create_connection("wss://sockettest.xhkjedu.com/ws")num = random.randint(0, 10000000)
msg = {"b": {"num": num},"c": 123456,
}msgstr = json.dumps(msg)
print("Sending " + msgstr)
ws.send(msgstr)result = ws.recv()
print("Received '%s'" % result)
ws.close()
import websocketdef on_message(ws, message):print(ws)print(message)def on_error(ws, error):print(ws)print(error)def on_close(ws):print(ws)print("### closed ###")websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://127.0.0.1:8888/track",on_message=on_message,on_error=on_error,on_close=on_close)ws.run_forever()

Websocket压测

Jmeter要测试websocket接口,需要先下载安装一个websocket samplers by peter doornbosch的插件

而locust因为是代码实现,所以可以进行任何的测试,引用相应的库即可。

from locust import User, task, events
import time
from websocket import create_connection
import json
import randomdef success_call(name, recvText, total_time):events.request_success.fire(request_type="[Success]",name=name,response_time=total_time,response_length=len(recvText))def fail_call(name, total_time, e):events.request_failure.fire(request_type="[Fail]",name=name,response_time=total_time,response_length=0,exception=e,)class WebSocketClient(object):def __init__(self, host):self.host = hostself.ws = Nonedef connect(self, burl):self.ws = create_connection(burl)def recv(self):return self.ws.recv()def send(self, msg):self.ws.send(msg)class WebsocketUser(User):abstract = Truedef __init__(self, *args, **kwargs):super(WebsocketUser, self).__init__(*args, **kwargs)self.client = WebSocketClient(self.host)self.client._locust_environment = self.environmentclass ApiUser(WebsocketUser):host = "wss://sockettest.xhkjedu.com/"@task(1)def pft(self):# wss 地址self.url = 'wss://sockettest.xhkjedu.com/ws'print("连接前")start_time = time.time()try:self.client.connect(self.url)print("连接后")# 发送的订阅请求num = random.randint(0, 10000000)msg = {"b": {"num": num},"c": 123456,}msgstr = json.dumps(msg)self.client.send(msgstr)print(f"↑: {msgstr}")greeting = self.client.recv()print(f"↓: {greeting}")except Exception as e:total_time = int((time.time() - start_time) * 1000)fail_call("Send", total_time, e)else:total_time = int((time.time() - start_time) * 1000)success_call("Send", "success", total_time)
http://www.zskr.cn/news/47618.html

相关文章:

  • locust常用类和方法解析
  • 基于遗传算法的PID控制器参数整定方法详解
  • QT项目复盘:如何在有限资源下把桌面端做成‘高端应用’?
  • 基于HSMS通信标准的SECS通讯程序
  • 设置fdfs自动启动
  • AI人力资源管理系统如何让HR的工作更高效、更有判断力
  • 实用指南:AI应用架构师眼中的智能家居AI智能体:开启智能化居家生活的新机遇
  • 甘孜西林瓶灌装压塞机安装调试指南及周期解析
  • Day19综合案例一
  • 常见的无状态服务与典型有状态服务
  • CF1720D2 Xor-Subsequence (hard version)
  • 【SPIE出版 | 往届会后3个月完成EI检索】第二届遥感与数字地球国际学术会议 (RSDE 2025)
  • 基础模型+场景微调
  • Rust:关于Future和JoinHanlder的思考
  • 【刷题笔记】Placing Squares
  • P2279 [HNOI2003] 消防局的设立 题解加总结
  • 售后无忧!CRMEB售后订单处理指南,高效管理退款退货流程
  • 5分钟极简代码:轻松学会XXTEA加密解密
  • 更新了!微信公众号文章数据批量导出excel软件1.1版,轻松实现统计分析
  • 中国数据集成平台TOP10综合评估报告(2025)
  • 从“实时分账”到“智能问数”:汇付天下以“Data Agent”重塑支付业务决策效率
  • 热身赛总结 题解
  • 开盖扫码领红包小程序系统:实体商家的营销增长利器
  • 海报积分商城小程序:高效吸粉与礼品兑换的全能解决方案
  • 习题解析之:正负交错数列前n项和
  • 详细介绍:【Kylin V10】Ambari3.0.0 安装 Unexpected error Ambari repo file path not set for current OS 报错解决
  • 实战干货:Apache DolphinScheduler 参数使用与优化总结
  • 实用指南:Rust Slint实现列表式消息提示(Notification Dialog)源码分享
  • RED 状态
  • EMS4100N芯祥科技USB3.1高速双向模拟开关芯片资料,可pin对pin替代ASW3410