迁移旧版本 RabbitMQ 到新集群,最稳妥的方案是采用“双写双读”配合消息持久化与确认机制,而非直接拷贝数据文件,尤其是在跨版本或跨机房场景下。
先说结论:直接拷贝存储文件风险极高,推荐通过消息同步工具或双写方案过渡,并确保端到端持久化。
- 适合跨版本升级或跨集群迁移场景
- 先看版本兼容性与队列持久化配置
- 建议通过消息量对比验证一致性
前置准备:插件与工具
迁移前需确保管理插件和同步插件已启用,并准备好 CLI 工具。
1. 启用管理插件(源与目标集群)
rabbitmq-plugins enable rabbitmq_management rabbitmq-plugins enable rabbitmq_shovel
2. 获取 rabbitmqadmin 工具
该工具默认不安装在系统路径,需从管理界面下载:
wget http://<management-node-ip>:15672/cli/rabbitmqadmin -O /usr/local/bin/rabbitmqadmin chmod +x /usr/local/bin/rabbitmqadmin
分步处理
1. 评估与准备
检查源集群版本与目标集群版本的兼容性。若跨大版本(如 3.x 到 3.9+),需注意 Quorum Queues 引入的元数据存储变化(Raft 共识),但 Classic Queue 仍主要依赖 Mnesia。确保所有队列设置为 durable=true,消息发送设置 deliveryMode=2。
2. 元数据迁移
使用 rabbitmqadmin 导出交换机、队列和绑定关系定义文件,并在目标集群导入。不要直接拷贝 mnesia 数据库文件,除非是同版本同环境的热备切换。
rabbitmqadmin export definitions.json # 在目标集群执行导入 rabbitmqadmin import definitions.json
3. 数据同步
方案 A(Shovel 插件):配置 Shovel 将旧集群消息实时同步到新集群队列。需先启用插件。
rabbitmqctl set_parameter shovel my_shovel '{"src-uri": "amqp://user:pass@source:5672/%2f", "src-queue": "source_queue", "dest-uri": "amqp://user:pass@dest:5672/%2f", "dest-queue": "dest_queue"}'方案 B(双写双读):修改生产端代码同时向新旧集群发送消息,消费端先消费旧集群,逐步切换至新集群。以下是 Java Spring AMQP 双写示例逻辑:
@Bean
public RabbitTemplate primaryTemplate() {// 配置旧集群 ConnectionFactoryreturn new RabbitTemplate(primaryConnectionFactory());
}@Bean
public RabbitTemplate secondaryTemplate() {// 配置新集群 ConnectionFactoryreturn new RabbitTemplate(secondaryConnectionFactory());
}// 发送时双写
public void sendMessage(String msg) {primaryTemplate().convertAndSend("exchange", "route", msg);try {secondaryTemplate().convertAndSend("exchange", "route", msg);} catch (Exception e) {// 记录日志,不影响主流程log.warn("Secondary cluster send failed", e);}
}4. 切换与下线
确认新集群消息积压为零且消费正常后,停止旧集群生产端流量,待旧消息消费完毕后下线旧服务。
怎么验证是否生效
1. 命令行对比
登录管理界面或 CLI,对比新旧集群的 Queue 数量与 Message 数量是否一致。
rabbitmqctl list_queues name messages_ready messages_unacknowledged > queue_check.txt
2. 一致性校验脚本
可通过简单脚本对比两端消息总数(需确保迁移期间无新消息写入):
#!/bin/bash
SOURCE_COUNT=$(rabbitmqctl -n source_node list_queues messages | awk '{sum+=$2} END {print sum}')
DEST_COUNT=$(rabbitmqctl -n dest_node list_queues messages | awk '{sum+=$2} END {print sum}')
if [ "$SOURCE_COUNT" == "$DEST_COUNT" ]; thenecho "Check Passed: $SOURCE_COUNT == $DEST_COUNT"
elseecho "Check Failed: $SOURCE_COUNT != $DEST_COUNT"
fi3. 监控指标
观察消费者端的 ACK 日志,确认没有大量 Redelivered 消息。在迁移期间,监控源集群的 message_ready 和 message_unacknowledged 指标,确保最终归零。
常见坑
版本存储引擎差异:RabbitMQ 3.9+ 引入了 Quorum Queues,元数据存储机制有所变化,但 Classic Queue 仍主要依赖 Mnesia。跨版本直接拷贝数据文件官方不支持,极易导致启动失败。
非持久化消息:若消息未设置持久化,迁移期间重启服务会导致内存中消息丢失。
自动 ACK 风险:消费者若使用自动 ACK,宕机时消息会被直接删除,迁移期间建议改为手动 ACK。
rabbitmqadmin 不可用:该工具需手动下载,未下载前直接执行命令会报错 command not found。
参考来源
- RabbitMQ Official Docs: RabbitMQ Clustering - https://www.rabbitmq.com/clustering.html
- RabbitMQ Official Docs: Shovel Plugin - https://www.rabbitmq.com/shovel.html
- RabbitMQ Official Docs: Production Checklist - https://www.rabbitmq.com/production-checklist.html
原文链接:https://www.zjcp.cc/ask/11582.html
