RocketMQ(三):集成SpringBoot
RocketMQ系列文章
RocketMQ(一):基本概念和环境搭建
RocketMQ(二):原生API快速入门
RocketMQ(三):集成SpringBoot
目录
- 一、搭建环境
- 二、不同类型消息
- 1、同步消息
- 2、异步消息
- 3、单向消息
- 4、延迟消息
- 5、顺序消息
- 6、带tag消息
- 7、带key消息
- 三、消息消费两种模式
- 1、负载均衡模式
- 2、广播模式
一、搭建环境
- 需要创建两个服务,消息生产服务和消息消费者服务
- 生产消息存在多个服务,消费则统一由一个服务处理
- 这样可以做到解耦
pom.xml
- 生产者和消费者都需要
org.apache.rocketmq rocketmq-spring-boot-starter 2.2.2生产者配置文件
- 设置统一的生产者组,这样发送消息时就不用指定了
rocketmq: name-server: 127.0.0.1:9876 # rocketMq的nameServer地址 producer: group: boot-producer-group # 生产者组别 send-message-timeout: 3000 # 消息发送的超时时间 retry-times-when-send-async-failed: 2 # 异步消息发送失败重试次数 max-message-size: 4194304 # 消息的最大长度生产者配置文件
- 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
rocketmq: name-server: localhost:9876二、不同类型消息
直接引入即可
@Autowired private RocketMQTemplate rocketMQTemplate;
1、同步消息
生产消息
- 消息由消费者发送到broker后,会得到一个确认,是具有可靠性的
- 比如:重要的消息通知,短信通知等
rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息");消费消息
- RocketMQListener的泛型类型即消息类型
- MessageExt类型是消息的所有内容
- 其他类型则就只是消息体内容,没有消息头内容(keys、msgId、延迟时间、重试次数、主题名称...)
- onMessage方法内没有报错就是签收了,报错就是拒收会重试
@Component @RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group") public class ABootSimpleMsgListener implements RocketMQListener { @Override public void onMessage(MessageExt message) { System.out.println(new String(message.getBody())); } }2、异步消息
- 发送异步消息,发送完以后会有一个异步通知
- 不影响程序往下执行
rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("成功"); } @Override public void onException(Throwable throwable) { System.out.println("失败" + throwable.getMessage()); } });3、单向消息
- 不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险
- 例如日志信息的发送
rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息");4、延迟消息
- RocketMQ不支持任意时间的延时
- 只支持以下18个固定的延时等级,等级1就对应1s,以此类推,最高支持2h延迟
- private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;
- 发送一个延时消息,延迟等级为4级,也就是30s后被监听消费
Message msg = MessageBuilder.withPayload("我是一个延迟消息").build(); rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 4);5、顺序消息
生产消息
- 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
// 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去 消费者 需要单线程消费 List msgModels = Arrays.asList( new MsgModel("qwer", 1, "下单"), new MsgModel("qwer", 1, "短信"), new MsgModel("qwer", 1, "物流"), new MsgModel("zxcv", 2, "下单"), new MsgModel("zxcv", 2, "短信"), new MsgModel("zxcv", 2, "物流") ); msgModels.forEach(msgModel -> { // 发送 一般都是以json的方式进行处理 // 根据第三个参数计算hash值决定消息放入哪个队列 rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn()); });消费消息
- 默认是并发消费模式,可以设置为单线程顺序模式
- 设置消费重试次数
@Component @RocketMQMessageListener(topic = "bootOrderlyTopic", consumerGroup = "boot-orderly-consumer-group", consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程 maxReconsumeTimes = 5 // 消费重试的次数 ) public class BOrderlyMsgListener implements RocketMQListener { @Override public void onMessage(MessageExt message) { MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class); System.out.println(msgModel); } }6、带tag消息
- tag带在主题后面用:来携带
rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息");7、带key消息
Message message = MessageBuilder .withPayload("我是一个带key的消息") .setHeader(RocketMQHeaders.KEYS, "10086") .build(); rocketMQTemplate.syncSend("bootKeyTopic", message);获取带key和tag的消费者
- 过滤模式有两种:正则表达式和sql92方式
- keys从MessageExt对象中获取
@Component @RocketMQMessageListener(topic = "bootTagTopic", consumerGroup = "boot-tag-consumer-group", selectorType = SelectorType.TAG,// tag过滤模式 selectorExpression = "tagA || tagB" // selectorType = SelectorType.SQL92,// sql92过滤模式 // selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true ) public class CTagMsgListener implements RocketMQListener { @Override public void onMessage(MessageExt message) { System.out.println("获取keys: " + message.getKeys()); System.out.println("消息内容: " + new String(message.getBody())); } }查看源码
- destination目标 = 主题 : 标签
- keys从消息头里面获取
三、消息消费两种模式
- Rocketmq消息消费的模式分为两种:负载均衡模式和广播模式
- 负载均衡模式表示多个消费者交替消费同一个主题里面的消息
- 广播模式表示每个消费者都消费一遍订阅的主题的消息
1、负载均衡模式
- 队列会被消费者分摊
- 队列数量应该>=消费者数量,否则多出来的消费者永远接收不到消息
- mq服务器会记录消息的消费点位(即消息是否被消费)
创建多个消费者监听同一个主题
@Component @RocketMQMessageListener(topic = "modeTopic", consumerGroup = "mode-consumer-group-a", messageModel = MessageModel.CLUSTERING, // 集群模式(负载均衡) ) public class DC1 implements RocketMQListener { @Override public void onMessage(String message) { System.out.println("我是mode-consumer-group-a组的第一个消费者:" + message); } } @Component @RocketMQMessageListener(topic = "modeTopic", consumerGroup = "mode-consumer-group-a", messageModel = MessageModel.CLUSTERING // 集群模式(负载均衡) ) public class DC2 implements RocketMQListener { @Override public void onMessage(String message) { System.out.println("我是mode-consumer-group-a组的第二个消费者:" + message); } } @Component @RocketMQMessageListener(topic = "modeTopic", consumerGroup = "mode-consumer-group-a", messageModel = MessageModel.CLUSTERING // 集群模式(负载均衡) ) public class DC3 implements RocketMQListener { @Override public void onMessage(String message) { System.out.println("我是mode-consumer-group-a组的第三个消费者:" + message); } }生产者发送多条消息
@Test public void modeTest() throws Exception { for (int i = 1; i rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "个消息"); } } @Override public void onMessage(String message) { System.out.println("我是mode-consumer-group-b组的第一个消费者:" + message); } @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { defaultMQPushConsumer.setInstanceName("第一个消费者"); } } @Component @RocketMQMessageListener(topic = "modeTopic", consumerGroup = "mode-consumer-group-b", messageModel = MessageModel.BROADCASTING // 广播模式 ) public class DC5 implements RocketMQListener @Override public void onMessage(String message) { System.out.println("我是mode-consumer-group-b组的第二个消费者:" + message); } @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { defaultMQPushConsumer.setInstanceName("第二个消费者"); } } @Component @RocketMQMessageListener(topic = "modeTopic", consumerGroup = "mode-consumer-group-b", messageModel = MessageModel.BROADCASTING // 广播模式 ) public class DC6 implements RocketMQListener @Override public void onMessage(String message) { System.out.println("我是mode-consumer-group-b组的第三个消费者:" + message); } @Override public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) { defaultMQPushConsumer.setInstanceName("第三个消费者"); } } for (int i = 1; i rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "个消息"); } }
- tag带在主题后面用:来携带
- 根据syncSendOrderly方法的第三个参数计算hash值决定消息放入哪个队列
- RocketMQListener的泛型类型即消息类型
- 不能设置统一的消费者组,因为不同的消费者订阅关系不一致,需要设置不同的消费者组
- 设置统一的生产者组,这样发送消息时就不用指定了
- 生产者和消费者都需要
免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

