Spring Cloud Stream RocketMQ整合步骤

Spring Cloud Stream RocketMQ整合步骤

1.RocketMQ服务器安装部署

  • 下载RocketMQ服务器
  • 配置namesrvbroker
  • 启动Name Server和Broker服务

2.项目依赖配置

<!-- Spring Cloud Stream RocketMQ依赖 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency>

3.通道接口定义

创建接口定义输入输出通道:

publicinterfaceChannel{StringOUTPUT_EXAMPLE="output_example";StringINPUT_EXAMPLE="input_example";@Output(OUTPUT_EXAMPLE)MessageChanneloutputExample();@Input(INPUT_EXAMPLE)MessageChannelinputExample();}

4.YAML配置文件绑定

配置通道与MQ主题映射及服务器连接:

spring:cloud:stream:rocketmq:binder:name-server:xx.xx.xx.xx:9876bindings:output_example:destination:example-topicinput_example:destination:example-topicgroup:example-group

5.启用通道绑定(在消息发送接收类上面要加的注解)

@EnableBinding(Channel.class)@ComponentpublicclassMessageProcessor{// 消息处理逻辑}

@EnableBinding 注解的作用
启用通道绑定功能:告诉Spring Cloud Stream框架需要绑定消息通道
扫描通道定义:框架会扫描 Channel 接口中定义的所有 @Input 和 @Output 通道
创建通道实例:为每个定义的通道创建相应的实例,使得可以通过 @Autowired 注入使用
建立消息连接:将应用程序与消息中间件(RocketMQ)建立连接
所以无论是消息发送方还是接收方,都需要使用 @EnableBinding(Channel.class) 注解来启用消息通道功能。

6.消息发送实现

@Autowired@Qualifier(Channel.OUTPUT_EXAMPLE)privateMessageChanneloutputExampleChannel;publicvoidsendExampleMessage(Stringcontent){Message<String>message=MessageBuilder.withPayload(content).build();outputExampleChannel.send(message);}

7.消息接收实现

@StreamListener(Channel.INPUT_EXAMPLE)publicvoidhandleExampleMessages(Stringcontent){// 处理接收到的消息}