当前位置: 首页 > news >正文

从传感器到推理端:VLA 机器人 TCP 通信与 msgpack 序列化深度解析

从传感器到推理端VLA 机器人 TCP 通信与 msgpack 序列化深度解析场景在做 VLA 机器人项目时需要一套高效的传感器数据传输方案——机器人端发送传感器数据推理端接收后模型推理再将结果以 chunk 流式返回。本文以此为背景把 TCP 通信 msgpack 序列化涉及的每个知识点都讲清楚。一、为什么用 msgpack 而不是 JSONTCP 传输的是字节流任何数据发送前都需要序列化成字节。JSONmsgpack格式文本可读二进制不可读体积较大比 JSON 小 20–50%速度较慢序列化/反序列化更快适合场景对外 API、配置文件传感器数据流、服务间通信机器人传感器数据高频100Hz、数据量大msgpack 是更合适的选择。二、msgpack 序列化原理逐字节拆解机器人端发送一帧传感器数据sensor_request{type:sensor,joint:[0.1,-0.2,0.3],# 关节角度单位 radts:1700000000# 时间戳Unix 秒}packedmsgpack.packb(sensor_request,use_bin_typeTrue)# 共 43 字节用 hex 查看原始字节83 a4 74797065 a6 73656e736f72 a5 6a6f696e74 93 ca 3dcccccd ca be4ccccd ca 3e99999a a2 7473 ce 6553f100逐字节对照表偏移字节hex含义[00]83fixmap3 个键值对0x80 3[01]a4fixstr长度 40xa0 4[02-05]74 79 70 65typeASCII[06]a6fixstr长度 60xa0 6[07-12]73 65 6e 73 6f 72sensorASCII[13]a5fixstr长度 50xa0 5[14-18]6a 6f 69 6e 74jointASCII[19]93fixarray3 个元素0x90 3[20]cafloat32 类型标记[21-24]3d cc cc cd0.1的 IEEE 754 float32[25]cafloat32 类型标记[26-29]be 4c cc cd-0.2的 IEEE 754 float32[30]cafloat32 类型标记[31-34]3e 99 99 9a0.3的 IEEE 754 float32[35]a2fixstr长度 20xa0 2[36-37]74 73tsASCII[38]ceuint32 类型标记[39-42]65 53 f1 001700000000大端 uint32字节数验证1 ← fixmap 头 (14) (16) ← type: sensor (15) ← joint 键 1 3×(14) ← fixarray 头 3个float32每个1字节标记4字节数据 (12) ← ts 键 (14) ← uint32 值 43 字节 ✓msgpack 类型编码规律前缀规则范围0x80 nfixmap字典n ≤ 150x90 nfixarray列表n ≤ 150xa0 nfixstr字符串n ≤ 31 字节0x00~0x7f正整数直接存单字节0–1270xce 4字节uint320 – 4,294,967,2950xca 4字节float32IEEE 754—Python 打印bytes时能表示为 ASCII 的字节会直接显示成字母所以看到的是\x83\xa4type\xa6sensor...而不是全十六进制。三、TCP 粘包问题与长度前缀协议什么是粘包TCP 是流式协议没有消息边界。sendall一次发出 47 字节4 字节头 43 字节体接收方可能第一次 recv → 20 字节 第二次 recv → 27 字节如果推理端连续推理多帧并返回接收方甚至可能一次收到多条消息粘在一起。解决方案4 字节长度前缀协议约定每条消息前加固定 4 字节存储消息体的字节长度。发送的 47 字节 ┌──────────────────┬────────────────────────────────────────────┐ │ 00 00 00 2b │ 83 a4 74 79 70 65 a6 73 65 6e ... │ │ (4字节值43) │ (43字节 msgpack 消息体) │ └──────────────────┴────────────────────────────────────────────┘接收方先读 4 字节知道长度43再精确读 43 字节完全消除粘包。四、struct.pack / unpack字节与整数互转发送端整数 → 字节struct.pack(I,43)# b\x00\x00\x00\x2b格式字符串I字符含义大端序Big-endian高位字节在前即网络字节序Iunsigned int4 字节无符号整数43 0x0000002b 大端序 00 00 00 2b ← 高位在前标准网络传输顺序 小端序 2b 00 00 00 ← x86 CPU 本地字节序接收端字节 → 整数msg_lengthstruct.unpack(I,raw_length)[0]# b\x00\x00\x00\x2b → (43,) → 43struct.unpack固定返回元组支持一次解多个值[0]取第一个元素struct.unpack(I,...)# → (43,) 一个值也是元组struct.unpack(II,...)# → (43, 7) 解两个值五、recv_all确保读满指定字节数defrecv_all(conn,length):databwhilelen(data)length:packetconn.recv(length-len(data))ifnotpacket:returnNone# 对端关闭连接recv 返回 bdatapacketreturndataconn.recv(n)语义是最多读 n 字节不保证一次读满。循环示例目标读 43 字节TCP 分两次到达初始data b 第 1 次循环len(data)0 43recv(43) → 实际到了 20 字节data 20字节 第 2 次循环len(data)20 43recv(23) → 实际到了 23 字节data 43字节 第 3 次循环len(data)43 43退出return dataif not packet处理对端正常关闭的情况此时recv返回b不判断会死循环。六、Socket 对象解读conn,addrserver_sock.accept()# socket.socket fd4, family2, type1, proto0,# laddr(127.0.0.1, 9999), raddr(127.0.0.1, 49724)字段值含义fd44文件描述符Linux 中 socket 也是文件family2AF_INETIPv4type1SOCK_STREAMTCPladdr(127.0.0.1, 9999)推理服务端地址和监听端口raddr(127.0.0.1, 49724)机器人端地址和临时端口server_sock 与 conn 的区别server_sock.listen(5)# 只负责监听等待机器人连接conn,addrserver_sock.accept()# 每来一个连接新建 conn 专门通信server_sock守着 9999 端口不做数据收发conn和某个具体机器人节点通信的 socket关于客户端临时端口机器人端connect()时OS 随机分配一个空闲端口Ephemeral Port通常 49152–65535用完即释放。TCP 连接由四元组唯一标识机器人端 IP : 临时端口 → 推理端 IP : 监听端口 127.0.0.1 : 49724 → 127.0.0.1 : 9999七、完整通信流程机器人端client 推理端server │ │ │ connect(推理端 IP:9999) │ │──────────────────────────────────────────│ │ │ accept() → conn │ │ │ sendall(4字节长度头 43字节传感器数据) │ │──────────────────────────────────────────│ │ │ recv_all(conn, 4) 读长度 → 43 │ │ recv_all(conn, 43) 读消息体 │ │ msgpack.unpackb() 还原字典 │ │ VLA 模型推理 │ │ msgpack.packb() 序列化 action │ │ sendall(4字节头 41字节响应) │ │ │ recv_all(4) 读长度 → 41 │ │ recv_all(41) 读响应体 │ │ msgpack.unpackb() 还原 action 字典 │ │ │ │ close() │ close(conn)八、完整代码server.py推理端 推理端 TCP 服务 - 接收机器人传感器数据msgpack 序列化 - 模拟 VLA 模型推理返回控制 action importsocketimportmsgpackimportstruct HOST127.0.0.1PORT9999defrecv_all(conn,length):循环读取确保收满 length 字节解决 TCP 流式拆包问题databwhilelen(data)length:packetconn.recv(length-len(data))ifnotpacket:returnNonedatapacketreturndatadefinfer(sensor_data:dict)-dict:模拟 VLA 推理输入传感器数据返回控制 actionjointsensor_data.get(joint,[])# 实际场景替换为模型前向推理action[round(j*0.5,4)forjinjoint]return{status:ok,action:action,chunk:1,}defhandle_client(conn,addr):print(f[推理端] 机器人已连接:{addr})try:# 1. 读 4 字节长度前缀raw_lengthrecv_all(conn,4)ifnotraw_length:return# b\x00\x00\x00\x2b → 43msg_lengthstruct.unpack(I,raw_length)[0]print(f[推理端] 消息体长度:{msg_length}字节)# 2. 按长度读消息体raw_datarecv_all(conn,msg_length)ifnotraw_data:return# 3. msgpack 反序列化sensor_datamsgpack.unpackb(raw_data,rawFalse)print(f[推理端] 收到传感器数据:{sensor_data})# 4. 推理responseinfer(sensor_data)print(f[推理端] 推理结果:{response})# 5. 序列化响应 长度前缀发送packedmsgpack.packb(response,use_bin_typeTrue)conn.sendall(struct.pack(I,len(packed))packed)exceptExceptionase:print(f[推理端] 出错:{e})finally:conn.close()print(f[推理端] 关闭连接:{addr}\n)defmain():withsocket.socket(socket.AF_INET,socket.SOCK_STREAM)asserver_sock:server_sock.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)server_sock.bind((HOST,PORT))server_sock.listen(5)print(f[推理端] 监听{HOST}:{PORT}...)whileTrue:conn,addrserver_sock.accept()handle_client(conn,addr)# 并发版# import threading# threading.Thread(targethandle_client, args(conn, addr)).start()if__name____main__:try:main()exceptKeyboardInterrupt:print(\n[推理端] 已退出)client.py机器人端 机器人端 TCP 客户端 - 采集传感器数据msgpack 序列化后发送给推理端 - 接收推理端返回的控制 action importsocketimportmsgpackimportstruct HOST127.0.0.1PORT9999defrecv_all(sock,length):databwhilelen(data)length:packetsock.recv(length-len(data))ifnotpacket:returnNonedatapacketreturndatadefsend_sensor(sensor_data:dict)-dict:发送一帧传感器数据返回推理端的 actionwithsocket.socket(socket.AF_INET,socket.SOCK_STREAM)assock:sock.connect((HOST,PORT))# msgpack 序列化# {type: sensor, joint: [0.1, -0.2, 0.3], ts: 1700000000}# → 43 字节hex: 83 a4 74797065 a6 73656e736f72 ...packedmsgpack.packb(sensor_data,use_bin_typeTrue)print(f[机器人端] 序列化后{len(packed)}字节:{packed.hex()})# 加 4 字节长度前缀后发送# 43 → struct.pack(I, 43) b\x00\x00\x00\x2bsock.sendall(struct.pack(I,len(packed))packed)print(f[机器人端] 已发送:{sensor_data})# 接收推理结果raw_lengthrecv_all(sock,4)msg_lengthstruct.unpack(I,raw_length)[0]raw_datarecv_all(sock,msg_length)responsemsgpack.unpackb(raw_data,rawFalse)print(f[机器人端] 收到 action:{response}\n)returnresponsedefmain():# 模拟多帧传感器数据frames[{type:sensor,joint:[0.1,-0.2,0.3],ts:1700000000},{type:sensor,joint:[0.2,-0.15,0.25],ts:1700000001},{type:sensor,joint:[0.0,0.0,0.0],ts:1700000002},]forframeinframes:try:send_sensor(frame)exceptConnectionRefusedError:print([机器人端] 无法连接推理端请先启动 server.py)breakif__name____main__:main()九、运行pipinstallmsgpack# 终端 1启动推理端python server.py# 终端 2启动机器人端python client.py十、扩展方向本文 demo 是单帧一问一答实际 VLA 场景可在此基础上扩展长连接多帧一次connect后循环发送多帧传感器数据避免频繁建连开销需在handle_client中加while True循环读包推理 chunk 流式返回推理端每推理出一个 token/chunk 就发一帧响应机器人端循环接收同样用长度前缀帧封装每个 chunk并发多机器人handle_client改为threading.Thread或asyncio同时服务多个机器人节点消息类型扩展在请求字典中加type字段区分关节角度、图像帧base64、力传感器等不同数据服务端按type分发处理
http://www.zskr.cn/news/1373423.html

相关文章:

  • Rydberg原子接收器:量子传感技术的突破与应用
  • Ubuntu 20.04 ROS新手避坑:catkin_make报‘empy’错误的完整解决流程
  • ARM SME指令集浮点运算优化指南
  • 神经网络量化技术:TruncQuant在边缘计算中的高效实现
  • OpenClaw强势推出V2026.5.20版本地部署最新教程来啦!3分钟一键安装中文版可视化操作指南
  • ARM SME指令集:矩阵运算与数据传输优化指南
  • 2026年5月视频剪辑制作培训机构排行实测盘点:软件测试线下就业培训/AI软件测试培训/外贸电商设计培训/影视特效剪辑培训/选择指南 - 优质品牌商家
  • 手把手教你用Yalmip+Gurobi复现顶刊论文:配电网应急电源预配置的鲁棒优化实战
  • CNSH 语义接入规范 v2.0·功能语义技术用词对照表 + 协作宣言|中英对照·行话翻译·DNA锚链
  • 胖头鱼的技术专栏-427 AI Agent记忆系统可视化页面介绍(20260524)
  • 2026年5月新发布河南IPO企业股权激励选择指南 - 2026年企业推荐榜
  • CVE漏洞编号规范与开源项目安全验证指南
  • Kylin V10 SP1 上 auditd 服务内存泄漏排查与修复实录(附升级包下载)
  • ARM ETE协议数据包解析与嵌入式调试实践
  • 边缘计算深度学习模型优化:MARCO框架技术解析
  • Arm DS自定义组件XML配置与调试技巧
  • 动态稀疏训练与对角线稀疏模式优化实践
  • Burp Suite Intruder四种攻击模式原理与实战建模
  • 四川钢管厂家现货批发|工程专用钢材一站式配送 - 四川盛世钢联营销中心
  • ARM ETE嵌入式追踪单元架构与调试实践
  • Keil MDK V5模块化架构解析与供应商资源获取指南
  • gmapping算法源码实现分析(四)
  • Arm DS/DS-5 JTAG解锁序列配置与调试指南
  • 瑞德克斯在不同终端的使用体验如何?语言覆盖广不广?
  • 别装Matlab了!用这个免费网站Desmos,5分钟搞定函数绘图和矩阵计算
  • 揭秘古老算法与现代插桩:手把手用‘更相减损术’理解程序插桩技术
  • uniapp使用canvas绘制雷达图支持多维度
  • PyTorch代码(5)
  • Claude Code完整安装与配置指南
  • 【助睿实验指导】学生用户画像 - 考勤画像可视化分析