RocketMQ(三):集成SpringBoot

2024-07-08 1101阅读

RocketMQ系列文章

RocketMQ(一):基本概念和环境搭建

RocketMQ(二):原生API快速入门

RocketMQ(三):集成SpringBoot


目录

  • 一、搭建环境
  • 二、不同类型消息
    • 1、同步消息
    • 2、异步消息
    • 3、单向消息
    • 4、延迟消息
    • 5、顺序消息
    • 6、带tag消息
    • 7、带key消息
    • 三、消息消费两种模式
      • 1、负载均衡模式
      • 2、广播模式

        一、搭建环境

        • 需要创建两个服务,消息生产服务和消息消费者服务
        • 生产消息存在多个服务,消费则统一由一个服务处理
        • 这样可以做到解耦

          pom.xml

          • 生产者和消费者都需要
                org.apache.rocketmq
                rocketmq-spring-boot-starter
                2.2.2
            
            

            生产者配置文件

            • 设置统一的生产者组,这样发送消息时就不用指定了
              rocketmq:
                  name-server: 127.0.0.1:9876     # rocketMq的nameServer地址
                  producer:
                      group: boot-producer-group        # 生产者组别
                      send-message-timeout: 3000  # 消息发送的超时时间
                      retry-times-when-send-async-failed: 2  # 异步消息发送失败重试次数
                      max-message-size: 4194304       # 消息的最大长度
              

              生产者配置文件

              • 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
                rocketmq:
                    name-server: localhost:9876
                

                二、不同类型消息

                直接引入即可

                @Autowired
                private RocketMQTemplate rocketMQTemplate;
                

                1、同步消息

                生产消息

                • 消息由消费者发送到broker后,会得到一个确认,是具有可靠性的
                • 比如:重要的消息通知,短信通知等
                  rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");
                  

                  消费消息

                  • RocketMQListener的泛型类型即消息类型
                    • MessageExt类型是消息的所有内容
                    • 其他类型则就只是消息体内容,没有消息头内容(keys、msgId、延迟时间、重试次数、主题名称...)
                    • onMessage方法内没有报错就是签收了,报错就是拒收会重试
                      @Component
                      @RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group")
                      public class ABootSimpleMsgListener implements RocketMQListener {
                          @Override
                          public void onMessage(MessageExt message) {
                              System.out.println(new String(message.getBody()));
                          }
                      }
                      

                      2、异步消息

                      • 发送异步消息,发送完以后会有一个异步通知
                      • 不影响程序往下执行
                        rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() {
                            @Override
                            public void onSuccess(SendResult sendResult) {
                                System.out.println("成功");
                            }
                            @Override
                            public void onException(Throwable throwable) {
                                System.out.println("失败" + throwable.getMessage());
                            }
                        });
                        

                        3、单向消息

                        • 不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险
                        • 例如日志信息的发送
                          rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");
                          

                          4、延迟消息

                          • RocketMQ不支持任意时间的延时
                          • 只支持以下18个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
                          • private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
                          • 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
                            Message msg = MessageBuilder.withPayload("我是一个延迟消息").build();
                            rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 4);
                            

                            5、顺序消息

                            生产消息

                            • 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
                              // 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去  消费者 需要单线程消费
                              List msgModels = Arrays.asList(
                                      new MsgModel("qwer", 1, "下单"),
                                      new MsgModel("qwer", 1, "短信"),
                                      new MsgModel("qwer", 1, "物流"),
                                      new MsgModel("zxcv", 2, "下单"),
                                      new MsgModel("zxcv", 2, "短信"),
                                      new MsgModel("zxcv", 2, "物流")
                              );
                              msgModels.forEach(msgModel -> {
                                  // 发送  一般都是以json的方式进行处理
                                  // 根据第三个参数计算hash值决定消息放入哪个队列
                                  rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn());
                              });
                              

                              消费消息

                              • 默认是并发消费模式,可以设置为单线程顺序模式
                              • 设置消费重试次数
                                @Component
                                @RocketMQMessageListener(topic = "bootOrderlyTopic",
                                        consumerGroup = "boot-orderly-consumer-group",
                                        consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程
                                        maxReconsumeTimes = 5 // 消费重试的次数
                                )
                                public class BOrderlyMsgListener implements RocketMQListener {
                                    @Override
                                    public void onMessage(MessageExt message) {
                                        MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class);
                                        System.out.println(msgModel);
                                    }
                                }
                                

                                6、带tag消息

                                • tag带在主题后面用:来携带
                                  rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");
                                  

                                  7、带key消息

                                  Message message = MessageBuilder
                                          .withPayload("我是一个带key的消息")
                                          .setHeader(RocketMQHeaders.KEYS, "10086")
                                          .build();
                                  rocketMQTemplate.syncSend("bootKeyTopic", message);
                                  

                                  获取带key和tag的消费者

                                  • 过滤模式有两种:正则表达式和sql92方式
                                  • keys从MessageExt对象中获取
                                    @Component
                                    @RocketMQMessageListener(topic = "bootTagTopic",
                                            consumerGroup = "boot-tag-consumer-group",
                                            selectorType = SelectorType.TAG,// tag过滤模式
                                            selectorExpression = "tagA || tagB"
                                    //        selectorType = SelectorType.SQL92,// sql92过滤模式
                                    //        selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true
                                    )
                                    public class CTagMsgListener implements RocketMQListener {
                                        @Override
                                        public void onMessage(MessageExt message) {
                                            System.out.println("获取keys: " + message.getKeys());
                                            System.out.println("消息内容: " + new String(message.getBody()));
                                        }
                                    }
                                    

                                    查看源码

                                    • destination目标 = 主题 : 标签
                                    • keys从消息头里面获取

                                      RocketMQ(三):集成SpringBoot

                                      三、消息消费两种模式

                                      • Rocketmq消息消费的模式分为两种:负载均衡模式和广播模式
                                      • 负载均衡模式表示多个消费者交替消费同一个主题里面的消息
                                      • 广播模式表示每个消费者都消费一遍订阅的主题的消息

                                        1、负载均衡模式

                                        • 队列会被消费者分摊
                                        • 队列数量应该>=消费者数量,否则多出来的消费者永远接收不到消息
                                        • mq服务器会记录消息的消费点位(即消息是否被消费)

                                          创建多个消费者监听同一个主题

                                          @Component
                                          @RocketMQMessageListener(topic = "modeTopic",
                                                  consumerGroup = "mode-consumer-group-a",
                                                  messageModel = MessageModel.CLUSTERING, // 集群模式(负载均衡)
                                          )
                                          public class DC1 implements RocketMQListener {
                                              @Override
                                              public void onMessage(String message) {
                                                  System.out.println("我是mode-consumer-group-a组的第一个消费者:" + message);
                                              }
                                          }
                                          @Component
                                          @RocketMQMessageListener(topic = "modeTopic",
                                                  consumerGroup = "mode-consumer-group-a",
                                                  messageModel = MessageModel.CLUSTERING // 集群模式(负载均衡)
                                          )
                                          public class DC2 implements RocketMQListener {
                                              @Override
                                              public void onMessage(String message) {
                                                  System.out.println("我是mode-consumer-group-a组的第二个消费者:" + message);
                                              }
                                          }
                                          @Component
                                          @RocketMQMessageListener(topic = "modeTopic",
                                                  consumerGroup = "mode-consumer-group-a",
                                                  messageModel = MessageModel.CLUSTERING // 集群模式(负载均衡)
                                          )
                                          public class DC3 implements RocketMQListener {
                                              @Override
                                              public void onMessage(String message) {
                                                  System.out.println("我是mode-consumer-group-a组的第三个消费者:" + message);
                                              }
                                          }
                                          

                                          生产者发送多条消息

                                          @Test
                                          public void modeTest() throws Exception {
                                              for (int i = 1; i 
                                                  rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "个消息");
                                              }
                                          }
                                          
                                              @Override
                                              public void onMessage(String message) {
                                                  System.out.println("我是mode-consumer-group-b组的第一个消费者:" + message);
                                              }
                                              @Override
                                              public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
                                                  defaultMQPushConsumer.setInstanceName("第一个消费者");
                                              }
                                          }
                                          @Component
                                          @RocketMQMessageListener(topic = "modeTopic",
                                                  consumerGroup = "mode-consumer-group-b",
                                                  messageModel = MessageModel.BROADCASTING // 广播模式
                                          )
                                          public class DC5 implements RocketMQListener
                                              @Override
                                              public void onMessage(String message) {
                                                  System.out.println("我是mode-consumer-group-b组的第二个消费者:" + message);
                                              }
                                              @Override
                                              public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
                                                  defaultMQPushConsumer.setInstanceName("第二个消费者");
                                              }
                                          }
                                          @Component
                                          @RocketMQMessageListener(topic = "modeTopic",
                                                  consumerGroup = "mode-consumer-group-b",
                                                  messageModel = MessageModel.BROADCASTING // 广播模式
                                          )
                                          public class DC6 implements RocketMQListener
                                              @Override
                                              public void onMessage(String message) {
                                                  System.out.println("我是mode-consumer-group-b组的第三个消费者:" + message);
                                              }
                                              @Override
                                              public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
                                                  defaultMQPushConsumer.setInstanceName("第三个消费者");
                                              }
                                          }
                                          
                                              for (int i = 1; i 
                                                  rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "个消息");
                                              }
                                          }
                                          
VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]