RocketMQ源码学习笔记:消费者启动流程

07-17 919阅读

这是本人学习的总结,主要学习资料如下

  • 马士兵教育
  • rocketMq官方文档

    目录

    • 1、前置知识
      • 1.1、pull和push型消费者
      • 1.2、消息CommitLog到ConsumeQueue
      • 1.3、自动创建的重试主题
      • 1.4、广播型消费和集群型消费中offset的存储位置
      • 2、消费中的启动流程
        • 2.1、Preview
        • 2.2、校验,配置,启动

          1、前置知识

          1.1、pull和push型消费者

          消费者可分为两种,一种是pull另一种是push。

          pull是消费者主动到broker上拿消息,push则是设置监听等broker发信息过来。

          当然,底层实现其实都是pull,push模式下broker只是发送一个通知,然后消费者到broker上拉取消息。

          1.2、消息CommitLog到ConsumeQueue

          消费者读取数据时不会去读取CommitLog,而是CunsumeQueue。

          生成者发送信息只会到CommitLog中发送消息,不管ConsumeQueue。

          Broker专门启动了一个后台线程从CommitLog中将消息同步到ConsumeQueue,就是ReputMessageService。

          这种与消息相关的服务基本上都在在DefaultMessageStore中进行构建和启动。

          RocketMQ源码学习笔记:消费者启动流程

          ReputMessageService这个类源码没什么特点或者技术亮点,代码一层套一层比较繁琐,没有必要太过深究,只需要知道它的功能即可。

          1.3、自动创建的重试主题

          消费消息失败时我们可以返回重试枚举return ConsumeConcurrentlyStatus.RECONSUME_LATER;。

          这样隔一段时间后消费者会再次尝试消费消息。

          重试机制会为原来的topic创建一个信息重试topic。

          在dashborad界面NORMAL就是我们平时创建的一般的topic,RETRY就是系统自动创建的重试topic,一般是在原topic前加上%RETRY%。

          RocketMQ源码学习笔记:消费者启动流程

          1.4、广播型消费和集群型消费中offset的存储位置

          offsetStore表示消息消费的偏移量的存储位置。

          RocketMQ源码学习笔记:消费者启动流程

          从上图可以看到,广播型和集群型的消费模式有不同的offsetStore的值。

          查看源码会发现,集群型消费的偏移量需要从远程的broker中获取;而广播型的offsetStore只在本地叫offsets.json的文件中维护偏移量,下图是广播型消费创建offsetStore的源码。

          RocketMQ源码学习笔记:消费者启动流程

          这是因为广播型的消息是给每个messageQueue都塞了消息,不像集群型一条消息只塞给其中一个messageQueue,所以广播型消费的特点是不保证每个consumer都能消费到消息,而集群型需要严格保证这一点。

          所以偏移量放在本地可以降低通讯成本,正好广播型不需要保证消息一定被消费,那即使消费者宕机offset丢失也是可以接受的。

          而集群型要保证消息一定被消费,就需要将offset保存在远程的broker,这样即使消费者宕机消息在broker中也是未消费的状态。

          2、消费中的启动流程

          2.1、Preview

          RocketMQ源码学习笔记:消费者启动流程

          这是一个简单的Consumer的代码。

          DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
          consumer.setNamesrvAddr("localhost:9876");
          consumer.subscribe("topic1", "*");
          consumer.registerMessageListener(new MessageListenerConcurrently() {
              @Override
              public ConsumeConcurrentlyStatus consumeMessage(List list
                      , ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                 // todo
              }
          });
          consumer.start();
          

          前面的设置结束后就会通过start()启动服务拉取消息,接下来会进入start()查看源码了解消费者的启动过程。

          2.2、校验,配置,启动

          刚启动时,状态是CREATE_JUST,我们进入这个分支查看源码。

          RocketMQ源码学习笔记:消费者启动流程

          服务启动前都要做一些校验,然后配置初始化必要的bean,其中就包括为以后可能的消费失败创建一个重试topic。

          这些配置和检验不细说。不过有一个配置offsetStore比较有意思。

          广播型消费的offset存储在本地的offsets.json中,集群型消费的offset存储在远程。下面是相关代码。

          RocketMQ源码学习笔记:消费者启动流程

          各种配置之后开始启动消费者服务。

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]