当前位置: 首页 > news >正文

消息队列顺序性保证实战

消息队列顺序性保证实战

一、消息顺序性概述

消息队列的顺序性是指消息按照发送顺序被消费的特性,在金融交易、订单处理等场景至关重要。

1.1 顺序性问题场景

┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ Producer │────▶│ Queue │────▶│ Consumer │ │ (生产者) │ │ (队列) │ │ (消费者) │ └─────────────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ ▼ │ ┌─────────────┐ │ │ Consumer 1 │ │ │ 消费消息1 │ │ └─────────────┘ │ │ │ ▼ │ ┌─────────────┐ │ │ Consumer 2 │ │ │ 消费消息2 │ │ └─────────────┘ │ ▼ 问题:消息1和消息2顺序可能错乱

1.2 顺序性破坏原因

原因说明
多分区同一主题多个分区,消息可能发送到不同分区
多消费者多个消费者并行消费,处理速度不同
重试机制消息重试可能打乱顺序
网络延迟网络波动导致消息到达顺序变化

二、Kafka顺序性保证

2.1 单分区单消费者

apiVersion: apps/v1 kind: Deployment metadata: name: kafka-consumer spec: replicas: 1 template: spec: containers: - name: consumer image: kafka-consumer:1.0.0 env: - name: KAFKA_TOPIC value: "orders" - name: KAFKA_GROUP_ID value: "order-group" - name: KAFKA_PARTITION value: "0"

2.2 分区键策略

public class OrderProducer { @Autowired private KafkaTemplate<String, Order> kafkaTemplate; public void sendOrder(Order order) { // 使用订单ID的哈希值作为分区键 String key = String.valueOf(order.getUserId()); kafkaTemplate.send("orders", key, order); } }

2.3 自定义分区器

public class OrderPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { String orderKey = (String) key; int numPartitions = cluster.partitionCountForTopic(topic); return Math.abs(orderKey.hashCode()) % numPartitions; } @Override public void close() {} @Override public void configure(Map<String, ?> configs) {} }

三、RocketMQ顺序性保证

3.1 同步发送

public class RocketMQProducer { private DefaultMQProducer producer; public void sendOrderMessage(Order order) throws Exception { Message message = new Message( "OrderTopic", "OrderTag", order.getOrderId(), JSON.toJSONBytes(order) ); // 同步发送,保证顺序 SendResult result = producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> queues, Message msg, Object arg) { Long orderId = (Long) arg; int index = (int) (orderId % queues.size()); return queues.get(index); } }, order.getOrderId()); } }

3.2 顺序消费

public class RocketMQConsumer { private DefaultMQPushConsumer consumer; public void consumeOrderMessages() throws Exception { consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { Order order = JSON.parseObject(msg.getBody(), Order.class); processOrder(order); } return ConsumeOrderlyStatus.SUCCESS; } }); } }

四、RabbitMQ顺序性保证

4.1 单队列单消费者

@Component public class RabbitMQConsumer { @RabbitListener(queues = "order-queue", concurrency = "1") public void consumeOrder(Order order) { processOrder(order); } }

4.2 消息优先级

public class RabbitMQProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendOrderWithPriority(Order order, int priority) { MessageProperties props = new MessageProperties(); props.setPriority(priority); Message message = new Message( JSON.toJSONBytes(order), props ); rabbitTemplate.send("order-exchange", "order-routing-key", message); } }

五、消息顺序性最佳实践

5.1 业务层面保证

public class OrderService { @Transactional public void processOrders(List<Order> orders) { // 按订单ID排序 orders.sort(Comparator.comparingLong(Order::getOrderId)); for (Order order : orders) { // 处理订单逻辑 processOrder(order); } } }

5.2 消息去重

@Component public class MessageDeduplicationService { @Autowired private RedisTemplate<String, Object> redisTemplate; private static final String PREFIX = "msg:dedupe:"; public boolean isDuplicate(String messageId) { String key = PREFIX + messageId; Boolean exists = redisTemplate.hasKey(key); if (Boolean.TRUE.equals(exists)) { return true; } redisTemplate.opsForValue().set(key, "true", 24, TimeUnit.HOURS); return false; } }

5.3 死信队列

# RabbitMQ死信队列配置 spring: rabbitmq: listener: simple: retry: enabled: true max-attempts: 3 initial-interval: 1000ms template: reply-timeout: 5000ms # 死信交换机和队列 @Configuration public class DeadLetterConfig { @Bean public Queue deadLetterQueue() { return QueueBuilder.durable("dead-letter-queue").build(); } @Bean public Exchange deadLetterExchange() { return ExchangeBuilder.directExchange("dead-letter-exchange").durable(true).build(); } }

六、顺序性验证

6.1 测试用例

@Test void testMessageOrder() { // 发送100条有序消息 for (int i = 0; i < 100; i++) { producer.send("test-topic", String.valueOf(i), "message-" + i); } // 收集消费结果 List<String> received = consumer.receiveAll(); // 验证顺序 for (int i = 0; i < received.size(); i++) { assertEquals("message-" + i, received.get(i)); } }

6.2 性能测试

# 使用kafka-producer-perf-test测试 kafka-producer-perf-test.sh \ --topic orders \ --num-records 100000 \ --record-size 1024 \ --throughput 10000 \ --producer-props bootstrap.servers=kafka:9092

七、总结

消息顺序性保证策略:

  1. 单分区单消费者:最简单的顺序保证方式
  2. 分区键策略:按业务键哈希分配分区
  3. 同步发送:确保消息按顺序发送
  4. 顺序消费:使用消息队列的顺序消费特性
  5. 业务层面排序:消费后再排序确保顺序

根据业务场景选择合适的顺序性保证方案。

http://www.zskr.cn/news/1397427.html

相关文章:

  • Web Workers:JavaScript 的多线程编程
  • CMOS传感器lines_per_second参数原理与应用解析
  • 留学生论文被 Turnitin 判 AI?用 PaperXie 一键把 AIGC 率压到个位数,再也不怕被导师质疑
  • 告警策略与自动化运维:构建智能运维体系
  • 技术分享:让知识流动起来
  • 如何构建Multi-Agent系统的知识库:领域知识融合与动态更新
  • 明日方舟游戏资源库:技术开发者与创意工作者的完整解决方案
  • 基于PLC的立体仓库控制系统设计(设计源文件+万字报告+讲解)(支持资料、图片参考_降重降ai)_文章底部可以扫码
  • 优雅的桌面歌词体验:LyricsX Swift插件深度解析
  • Meta 8000 人大裁员余震不断:员工士气低迷、调岗不满,怨气与激进观点合流
  • 基于Transformer与知识图谱的药物重定位:2型糖尿病老药新用智能发现
  • TwinGAN:双阶段GAN实现中国山水画风格迁移的技术解析与实践
  • 大模型API定价全解析:从百倍价差到成本优化实战
  • 终极指南:如何用EyesGuard智能用眼保护工具守护您的视力健康
  • RAID5系统Ghost备份原理与一致性风险解析
  • LoRA微调实战指南:企业级AI模型精准校准方法
  • 压缩感知理论导向的核废物桶TGS图像重建技术【附代码】
  • 天津地区高层住宅自然通风与建筑节能设计参数优化【附代码】
  • Linux权限管理避坑指南:为什么你的新用户加不进sudo组?详解wheel组与/etc/sudoers.d
  • 多Agent协同场景下的Harness工程架构设计与核心挑战破解
  • 再见,我的华为5年
  • CentOS 7下用yum一键安装iperf3,再也不用担心网络测速工具了
  • MHmarkets:平台工具、风控与体验体系观察
  • 基于Bi-GRU与嵌入技术的海洋叶绿素垂直剖面深度学习预测模型
  • AI Agent Harness Engineering 创业融资攻略:如何向投资人展示 Agent 技术的商业价值
  • AI Agent商业化失败案例复盘:10个致命错误与教训
  • 2026年开源商城和 SaaS 怎么选?为什么越来越多企业开始重视“自主可控”?——真正决定企业长期上限的,从来不是“前期上线速度”,而是“未来还能不能持续演进”
  • 集成学习在低资源语言情感分析中的应用:以波斯语社交媒体评论为例
  • 融合动态新闻情感与TEGRU模型的股票价格预测实践
  • 在Mac本地部署离线AI助手:Llama 2模型与llama.cpp实战指南