Day5-微服务-RocketMQ具体项目的应用场景
场景:用户购票,在服务端,校验验证码,拿到锁,选座购票,那么现在,拿锁和选座购票中插入一个异步线程,告诉用户你有资格购票或者已经下单成功,不然一直在等待,给一个快速响应,前端轮询购票结果,用户不用担心中间的选座购票,轮询带来的压力对于后端相对较小。下单和查询分为两个模块。
现在把异步操作分为出票模块,从服务端发送消息给MQ,查看是否有出票的请求,MQ有一个生产者和服务者。现在我们使用MQ的异步,存储消息,即使中间出错也会保存出错前的消息。
生产者和消费者和主题,消息就是跟主题做关联,购票也可以是一个主题。
普通消息发送
顺序消息发送
延迟消息发送
批量消息发送
事务消息发送:解决分布式事务。
新版本默认自动开启接收消息,
消费者
消费概念
Push消费
Pull消费
消息可以被多个消费者处理也可以被单个处理。
启动NameServer和Broker
修改runbroker.cmd的配置
修改为一个512m,另一个1g,-Xms:启动时就占用的内存,-Xmx:最大可用内存。
另外把堆外内存改为1g,本地生产15g太大了。
在conf目录下的broker.conf文件下添加存放rocketmq数据的地方。
修改完之后先启动NameServerv:
看到这个显示这个就成功了:
然后再启动runbroker使用broker.conf启动
启动成功:
发送消息测试:
使用RocketMQ Assistant的GUI客户端并连接:
可以看到主题中有了1000条消息:
有一部分在发送MQ之前,一部分在发送之后。
首先导入依赖:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.3</version> </dependency>SpringBoot3并不认识这个文件
一般消费者个生产者不在同一台机器。
实例,在火车购票系统中,为了防止一个时间段的大量请求压跨系统,就要引入RocketMQ的消息队列,先拿号在买票,而不是同时一下子处理所有人的购票请求。
购买车票成功后,就会显示主题的状态。
定义处理主题为确认订单的消费者:
@Service @RocketMQMessageListener(consumerGroup = "default", topic = "CONFIRM_ORDER") //消息主题 public class ConfirmOrderConsumer implements RocketMQListener<MessageExt> { private static final Logger LOG = LoggerFactory.getLogger(ConfirmOrderConsumer.class); @Resource private ConfirmOrderService confirmOrderService; @Override public void onMessage(MessageExt messageExt) { byte[] body = messageExt.getBody(); LOG.info("ROCKETMQ收到消息:{}", new String(body)); ConfirmOrderDoReq req = JSON.parseObject(new String(body), ConfirmOrderDoReq.class); MDC.put("LOG_ID", req.getLogId()); confirmOrderService.doConfirm(req); } }在确认订单之前的controller中注入MQ的templete:
@Service public class BeforeConfirmOrderServiceImpl implements BeforeConfirmOrderService { private static final Logger LOG = LoggerFactory.getLogger(BeforeConfirmOrderServiceImpl.class); @Resource private ConfirmOrderMapper confirmOrderMapper; @Resource private SkTokenService skTokenService; @Resource ConfirmOrderService confirmOrderService; //SpringBoot3注入不成功 @Resource public RocketMQTemplate rocketMQTemplate; @SentinelResource(value = "beforeDoConfirm", blockHandler = "beforeDoConfirmBlock") @Override public Long beforeDoConfirm(ConfirmOrderDoReq req) { req.setMemberId(LoginMemberContext.getId()); Long id = null; // 根据前端传值,加入排队人数 for (int i = 0; i < req.getLineNumber() + 1; i++) { req.setMemberId(LoginMemberContext.getId()); // 校验令牌余量 boolean validSkToken = skTokenService.validSkToken(req.getDate(), req.getTrainCode(), LoginMemberContext.getId()); if (validSkToken) { LOG.info("令牌校验通过"); } else { LOG.info("令牌校验不通过"); throw new BusinessException(BusinessExceptionEnum.CONFIRM_ORDER_SK_TOKEN_FAIL); } //获取车次锁 //RedisKeyPreEnum.CONFIRM_ORDER + "-" + DateUtil.formatDate(req.getDate())+"-"+req. Date date = req.getDate(); String trainCode = req.getTrainCode(); String start = req.getStart(); String end = req.getEnd(); List<ConfirmOrderTicketReq> tickets = req.getTickets(); // 保存确认订单表,状态初始 DateTime now = DateTime.now(); ConfirmOrder confirmOrder = new ConfirmOrder(); confirmOrder.setId(SnowUtil.getSnowflakeNextId()); confirmOrder.setCreateTime(now); confirmOrder.setUpdateTime(now); confirmOrder.setMemberId(req.getMemberId()); confirmOrder.setDate(date); confirmOrder.setTrainCode(trainCode); confirmOrder.setStart(start); confirmOrder.setEnd(end); confirmOrder.setDailyTrainTicketId(req.getDailyTrainTicketId()); confirmOrder.setStatus(ConfirmOrderStatusEnum.INIT.getCode()); confirmOrder.setTickets(JSON.toJSONString(tickets)); confirmOrderMapper.insert(confirmOrder); ConfirmOrderDoReq confirmOrderDoReq = new ConfirmOrderDoReq(); confirmOrderDoReq.setDate(req.getDate()); confirmOrderDoReq.setTrainCode(req.getTrainCode()); confirmOrderDoReq.setLogId(MDC.get("LOG_ID")); // 发送MQ排队购票 String reqJson = JSON.toJSONString(confirmOrderDoReq); LOG.info("排队购票,发送mq开始,消息:{}", reqJson); //发送的主题,购票的请求转换为string rocketMQTemplate.convertAndSend(RocketMQTopicEnum.CONFIRM_ORDER.getCode(), reqJson); LOG.info("排队购票,发送mq结束"); confirmOrderService.doConfirm(confirmOrderDoReq); id = confirmOrder.getId(); } //返回最后一个id return id; } @Override public void beforeDoConfirmBlock(ConfirmOrderDoReq req, BlockException e) { LOG.info("购票请求被限流:{}", req); throw new BusinessException(BusinessExceptionEnum.CONFIRM_ORDER_FLOW_EXCEPTION); } }关键:再发送MQ之前,订单表先保存下来。即使MQ丢失,订单表的数据任然存在。优先保证数据的准确性。
获取所得操作必须在正常的购票逻辑中。
打印日志:
那票但是拿不到锁,拿令牌就有资格购票,所以有了排队机制,分布式锁是为了防止超卖。
MQ就告诉出票模块有订单产生,出票模块只关心出票不用关心给谁,按照订单出票。
后端经常处理大批量数据,所以分页会减缓压力,分页分页的出票。
订单轮询查询:
用来给正在排队的订单提供一个结果,让用户知道前面还有多少个订单。
