RocketMQ源码学习笔记:消费者启动流程
这是本人学习的总结,主要学习资料如下
- 马士兵教育
- rocketMq官方文档
目录
- 1、前置知识
- 1.1、pull和push型消费者
- 1.2、消息CommitLog到ConsumeQueue
- 1.3、自动创建的重试主题
- 1.4、广播型消费和集群型消费中offset的存储位置
- 2、消费中的启动流程
- 2.1、Preview
- 2.2、校验,配置,启动
1、前置知识
1.1、pull和push型消费者
消费者可分为两种,一种是pull另一种是push。
pull是消费者主动到broker上拿消息,push则是设置监听等broker发信息过来。
当然,底层实现其实都是pull,push模式下broker只是发送一个通知,然后消费者到broker上拉取消息。
1.2、消息CommitLog到ConsumeQueue
消费者读取数据时不会去读取CommitLog,而是CunsumeQueue。
生成者发送信息只会到CommitLog中发送消息,不管ConsumeQueue。
Broker专门启动了一个后台线程从CommitLog中将消息同步到ConsumeQueue,就是ReputMessageService。
这种与消息相关的服务基本上都在在DefaultMessageStore中进行构建和启动。
ReputMessageService这个类源码没什么特点或者技术亮点,代码一层套一层比较繁琐,没有必要太过深究,只需要知道它的功能即可。
1.3、自动创建的重试主题
消费消息失败时我们可以返回重试枚举return ConsumeConcurrentlyStatus.RECONSUME_LATER;。
这样隔一段时间后消费者会再次尝试消费消息。
重试机制会为原来的topic创建一个信息重试topic。
在dashborad界面NORMAL就是我们平时创建的一般的topic,RETRY就是系统自动创建的重试topic,一般是在原topic前加上%RETRY%。
1.4、广播型消费和集群型消费中offset的存储位置
offsetStore表示消息消费的偏移量的存储位置。
从上图可以看到,广播型和集群型的消费模式有不同的offsetStore的值。
查看源码会发现,集群型消费的偏移量需要从远程的broker中获取;而广播型的offsetStore只在本地叫offsets.json的文件中维护偏移量,下图是广播型消费创建offsetStore的源码。
这是因为广播型的消息是给每个messageQueue都塞了消息,不像集群型一条消息只塞给其中一个messageQueue,所以广播型消费的特点是不保证每个consumer都能消费到消息,而集群型需要严格保证这一点。
所以偏移量放在本地可以降低通讯成本,正好广播型不需要保证消息一定被消费,那即使消费者宕机offset丢失也是可以接受的。
而集群型要保证消息一定被消费,就需要将offset保存在远程的broker,这样即使消费者宕机消息在broker中也是未消费的状态。
2、消费中的启动流程
2.1、Preview
这是一个简单的Consumer的代码。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("topic1", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List list , ConsumeConcurrentlyContext consumeConcurrentlyContext) { // todo } }); consumer.start();
前面的设置结束后就会通过start()启动服务拉取消息,接下来会进入start()查看源码了解消费者的启动过程。
2.2、校验,配置,启动
刚启动时,状态是CREATE_JUST,我们进入这个分支查看源码。
服务启动前都要做一些校验,然后配置初始化必要的bean,其中就包括为以后可能的消费失败创建一个重试topic。
这些配置和检验不细说。不过有一个配置offsetStore比较有意思。
广播型消费的offset存储在本地的offsets.json中,集群型消费的offset存储在远程。下面是相关代码。
各种配置之后开始启动消费者服务。