在半导体制造中,EAP(Equipment Automation Programming,设备自动化编程)系统是连接MES与物理设备的桥梁。它通过SECS/GEM协议与设备通信,实现配方下发、数据采集、设备控制、事件通知等核心功能。本文将带你从零搭建一个完整的EAP系统,参与「2026嵌入式全栈技术征锋令」话题投稿。
一、EAP系统在半导体中的定位
EAP是CIM(Computer Integrated Manufacturing)体系的L2层,负责与设备直接通信。在典型的半导体FAB架构中:
- MES(L3层):生产管理、工单调度
- EAP(L2层):设备控制、数据采集(本文重点)
- MCS(L2层):AMHS物料搬运系统
- 设备(L1层):光刻机、刻蚀机、PVD/CVD等
一个大型FAB可能有上百台设备,每台设备都需要一个EAP实例。因此EAP系统必须具备高可用、低延迟、高吞吐的特性。
二、SECS/GEM协议基础
2.1 SECS协议
SECS(Semiconductor Equipment Communication Standard)是半导体设备通信的底层协议。SECS消息由消息流(Stream)和消息功能(Function)组成:
- Stream 1:设备状态管理(如S1F1询问设备ID、S1F3状态查询)
- Stream 2:配方管理(如S2F41配方下发、S2F17创建进程)
- Stream 5:异常事件(如S5F1报警通知、S5F6错误报告)
- Stream 6:数据采集(如S6F1 Trace数据、S6F11事件报告)
- Stream 10:终端服务(如S10F1终端显示、S10F3终端输入)
2.2 GEM标准
GEM(Generic Equipment Model)建立在SECS之上,定义了设备通信的标准模型:
- 通信建立(Communication):HSMS连接管理、状态机
- 控制(Control):设备状态模型(PAUSE/REMOTE/LOCAL等)
- 事件报告(Event):设备事件的订阅和通知机制
- 数据采集(Data Collection):Trace数据和Event数据的采集
2.3 HSMS通信
HSMS(High-Speed SECS Message Services)是SECS消息的传输层协议,基于TCP/IP。通信流程如下:
- Host(EAP)与Equipment建立TCP连接(端口通常为5000)
- Host发送S1F13建立通信
- Equipment回复S1F14确认
- Host发送S1F17配置事件报告
- 通信建立完成,开始正常消息交互
三、Python实现SECS/GEM通信
3.1 HSMS连接管理
import socket
import struct
import threading
from typing import Callable, Optional
class HSMSConnection:
"""HSMS TCP连接管理"""
def __init__(self, host: str, port: int = 5000):
self.host = host
self.port = port
self.socket: Optional[socket.socket] = None
self.session_id = 0
self.connected = False
self._callback = None
self._recv_thread = None
def connect(self, session_id: int = 1) -> bool:
try:
self.socket = socket.socket(
socket.AF_INET, socket.SOCK_STREAM
)
self.socket.settimeout(10)
self.socket.connect((self.host, self.port))
self.session_id = session_id
self.connected = True
# 启动接收线程
self._recv_thread = threading.Thread(
target=self._receive_loop, daemon=True
)
self._recv_thread.start()
return True
except Exception as e:
print(f'连接失败: {e}')
return False
def send_message(self, stream: int, func: int,
wbit: bool = False,
data: bytes = b'') -> bool:
"""发送SECS消息"""
if not self.connected:
return False
msg_id = self._next_msg_id()
header = self._build_header(
stream, func, wbit, msg_id, len(data)
)
try:
self.socket.sendall(header + data)
return True
except Exception as e:
print(f'发送失败: {e}')
return False
def _build_header(self, s, f, w, mid, dlen):
# HSMS消息头:10字节
dev = 0
header = struct.pack('>IIHH',
10 + dlen, # Message Length
dev << 1 | (1 if w else 0), # Device ID
s << 8 | f, # Stream | Function
(1 << 15) | mid # PType | System Byte
)
return header
3.2 常用SECS消息封装
class SECSEquipement:
"""EAP与设备通信的SECS消息封装"""
def __init__(self, conn: HSMSConnection):
self.conn = conn
def establish_comm(self):
"""S1F13: 建立通信"""
return self.conn.send_message(1, 13, wbit=True)
def request_equipment_id(self):
"""S1F1: 询问设备ID"""
return self.conn.send_message(1, 1, wbit=True)
def send_recipe(self, recipe_data: dict):
"""S2F41: 下发配方"""
data = self._build_recipe_data(recipe_data)
return self.conn.send_message(2, 41, data=data)
def start_process(self, lot_id: str):
"""S2F17: 启动进程"""
data = self._build_ascii(lot_id)
return self.conn.send_message(2, 17, data=data)
def request_event_report(self):
"""S6F1: 请求事件报告"""
return self.conn.send_message(6, 1, wbit=True)
def _build_ascii(self, text: str) -> bytes:
# SECS Item Format: ASCII
encoded = text.encode('ascii')
return bytes([0x41]) + struct.pack(
'>I', len(encoded)
) + encoded
3.3 EAP主程序框架
class EAPAgent:
"""EAP Agent主程序"""
def __init__(self, equip_id: str, host: str,
port: int = 5000):
self.equip_id = equip_id
self.conn = HSMSConnection(host, port)
self.secs = SECSEquipement(self.conn)
self.state = 'DISCONNECTED'
self.event_handlers = {}
def on_event(self, event_id: str, handler):
"""注册事件处理器"""
self.event_handlers[event_id] = handler
def startup(self) -> bool:
"""EAP启动序列"""
# Step 1: 建立HSMS连接
if not self.conn.connect():
return False
self.state = 'CONNECTED'
# Step 2: 建立SECS通信 (S1F13)
if not self.secs.establish_comm():
return False
# Step 3: 获取设备ID (S1F1)
self.secs.request_equipment_id()
# Step 4: 配置事件报告
self.secs.request_event_report()
self.state = 'ONLINE'
print(f'[{self.equip_id}] EAP上线成功')
return True
def shutdown(self):
"""EAP关闭序列"""
# S1F15: 关闭通信
self.conn.send_message(1, 15, wbit=True)
self.conn.disconnect()
self.state = 'DISCONNECTED'
print(f'[{self.equip_id}] EAP已关闭')
# 使用示例
if __name__ == '__main__':
eap = EAPAgent('ETCH-01', '192.168.1.100')
if eap.startup():
# 注册事件处理
eap.on_event('ProcessCompleted',
lambda d: print(f'完成: {d}'))
eap.on_event('AlarmNotify',
lambda d: print(f'报警: {d}'))
四、EAP系统架构设计
生产级的EAP系统需要考虑以下架构要点:
1. 消息队列解耦
EAP不应直接与MES同步通信。建议通过消息队列(如Kafka/RabbitMQ)解耦:设备事件→EAP→MQ→MES/MCS。这样即使MES短暂不可用,EAP也能继续运行。
2. 断线重连机制
设备可能因维护、故障等原因断开连接。EAP必须支持自动重连、状态恢复、消息缓冲。建议实现指数退避重连策略。
3. 配方缓存
配方数据应缓存在EAP本地,避免每次从MES拉取。当MES不可用时,EAP仍可使用缓存配方继续生产。
4. 日志与监控
所有SECS消息都应记录到日志系统(建议用ELK或InfluxDB+Grafana),便于问题排查和数据分析。
五、开发经验与避坑指南
在实际开发EAP系统时,这些经验非常有价值:
- 不同设备的SECS实现有差异:同一标准,不同厂商实现可能不同,务必做设备适配测试
- 消息时序很重要:某些操作必须按固定顺序执行(如先S2F41配方再S2F17启动)
- 处理设备的非标行为:部分老旧设备可能不完全遵守GEM标准,需要特殊处理
- 性能压力测试:一台设备可能每秒产生数十条消息,必须确保EAP处理性能足够
- SECS消息超时:必须实现消息超时和重发机制,避免因丢包导致流程卡死
六、总结
EAP系统是半导体CIM体系的关键组件,是MES与设备之间的「翻译官」。本文从SECS/GEM协议基础出发,提供了Python实现的HSMS通信、常用SECS消息封装、EAP主程序框架。这些代码为开发一个生产级EAP系统提供了坚实的基础。
如果你对半导体设备通信、CIM系统开发感兴趣,欢迎关注我的CSDN博客,持续分享半导体智能制造的实战经验!
--------------------------------------------------
关注我,每天分享半导体智能制造干货!
有问题?评论区留言,必回!
我的CSDN资源(积分兑换,持续更新):
- 《晶圆检查可视化工具》Python完整源码
- 《简单标签管理系统》小工厂数据管理神器
- 《MES系统设计文档模板》企业级模板
- 更多CIM工具:SPC/OEE/SECS-GEM/FDC/MES/EAP/APC...
访问主页下载:https://blog.csdn.net/yeflashzhihui
标签:#AI #半导体 #智能制造 #Python #工业互联网 #CIM