某某会员小程序后端性能优化
背景
某某会员小程序后台提供开放平台能力,为三方油站提供会员积分、优惠劵等api。当用户在油站加油,油站收银会调用我们系统为用户发放积分、优惠劵等。用户反馈慢,三方调用发放积分接口性能极低,耗时30s+;
接口情况
发放积分接口业务负责,且用存储过程写的业务,改动风险极大
数据库情况
优惠卷等表,数据量800w+,甚至存在单表3000w+
优化方案
数据库数据归档
归档交易、用户优惠劵等表历史数据,比如归档三个月前的数据(根据实际情况补充归档条件,如用户优惠劵没使用或没过期的数据不能归档)
优化效果:存储过程耗时从30s降低到7s,但是作为Toc用途接口性能远远不达标,优化数据库sql或许能进一步降低响应时间,但是存储过程复杂优化费时费力风险大
方案 | 描述 | 风险 | 工作量 | 难度 | 是否能解决性能问题 | 是否解决并发冲突 | 影响 | 使用技术 |
---|---|---|---|---|---|---|---|---|
方案1 | java重写存储过程业务 | 大 | 大 | 大 | 一定程度能解决 | yes | 改动点多,业务影响大 | java + orm |
方案2 | 保证存储过程全局串行执行 | 小 | 小 | 大 | no | yes | 接口性能会降低 | 分布式锁 |
方案3 | 异步下存储过程全局串行执行 | 中 | 中 | 中 | yes | yes | rabbitmq+分布式锁+自旋锁 |
线程池异步化分析
接口中存储串行调用改为异步调用,
使用线程池异步化存在问题
开始简单使用线程池异步化,但是出现锁表的情况(原因存储过程没有保证原子性,并且其中大量使用临时表,并发下出现竞争锁表),而SqlServer自带的死锁检查机制杀死事务导致发放积分失败
线程池+分布式锁
异步线程【不能保证分布式环境的全局顺序执行】,使用分布式锁能保证同一个时间只有一个存储过程执行
问题:但是并发情况会将存储过程执行堆积在线程池,并发过大存在OOM风险,或者处理丢失风险
rabbitmq异步改造
可行性验证报告结论
验证通过点如下:
- 测试rabbitmq发送/接收消息【通过】
- 测试并发下分布式锁+自旋锁保证业务串行执行【通过】
- 测试并发下分布式锁+自旋锁+mq保证业务串行执行【通过】
- 测试业务幂等性保证不重复消费【通过】
- 测试手动ack兼容原来配置保证可靠性【通过】
当前项目rabbitmq使用方式问题分析
配置发下
spring.rabbitmq.host=172.18.229.23 spring.rabbitmq.port=5672 spring.rabbitmq.username=totaltest spring.rabbitmq.password=totaltest spring.rabbitmq.virtual-host=/totaltest/ spring.rabbitmq.publisher-confirms=false
该配置没有
spring.rabbitmq.listener.direct.acknowledge-mode=manual spring.rabbitmq.listener.simple.acknowledge-mode=manual
若是不配默认为
spring.rabbitmq.listener.direct.acknowledge-mode=auto spring.rabbitmq.listener.simple.acknowledge-mode=auto
rabbitmq消费者ack机制问题分析
spring.rabbitmq.listener.direct.acknowledge-mode是用于配置Spring Boot应用中RabbitMQ消息监听器的确认模式。确认模式决定了在消费者处理消息后如何通知RabbitMQ服务器来确认消息的接收情况。
该配置有以下几种可选的值:
- AUTO: 在这种模式下,消费者处理消息后,RabbitMQ会自动确认消息。这意味着消息一旦被消费者接收,就会立即从队列中删除。这是默认的确认模式。
- MANUAL: 在这种模式下,消费者需要显式地发送确认消息来告知RabbitMQ服务器消息已经被成功处理。这意味着消费者可以在处理消息后决定是否要确认消息。通常在需要进行消息处理的事务性操作时使用这种模式。
- NONE: 在这种模式下,消费者不会发送任何确认消息,也不会被要求发送确认消息。这意味着消息会在被传递给消费者之后立即被视为已经被确认。
问题:项目中该配置使用的模式配置,以为着没有手动ack,即消费者接收到消息,消息就会从mq中删除,若是消费者消费异常,则消息丢失不可追溯复原
rabbitmq生产者ack机制问题分析
项目中配置如下
spring.rabbitmq.publisher-confirms=false
spring.rabbitmq.publisher-confirms是Spring Boot中用于配置RabbitMQ生产者消息确认的属性。它用于控制是否启用RabbitMQ的发布确认机制,以确保消息成功发送到Exchange。
当spring.rabbitmq.publisher-confirms属性设置为true时,表示启用了RabbitMQ的发布确认机制。在这种情况下,当生产者发送消息到Exchange后,RabbitMQ会发送一个确认消息给生产者,告知消息是否成功到达Exchange。生产者可以根据收到的确认消息来判断消息是否成功发送,从而进行相应的处理。
当spring.rabbitmq.publisher-confirms属性设置为false时,表示禁用了RabbitMQ的发布确认机制。在这种情况下,生产者发送消息到Exchange后,不会收到确认消息,也无法得知消息是否成功到达Exchange。
通常情况下,建议将spring.rabbitmq.publisher-confirms属性设置为true,以确保消息的可靠发送。当然,具体是否启用发布确认机制,还取决于业务场景和对消息可靠性的要求。
rabbitmq消息可靠性问题分析
通过上诉【rabbitmq生产者ack机制问题分析】和【rabbitmq消费者ack机制问题分析】
可知当前项目中消息没有保证消息可靠性,rabbitmq宕机恢复、消费者消费异常都会导致消息丢失,导致业务完整性缺失
rabbitmq配置最小改动方案
上诉问题若想得到解决需项目中rabbitmq配置,会影响到原来所有使用mq的地方,避免影响范围较大
解决方案:新增消费者类似,通过设置不同的消费者来实现接收指定的消息需要手动 ack
测试rabbitmq配置发送接收消息【通过】
rabbitmq和springboot对应版本:3. Reference
创建虚拟host
创建测试 交换机和queues
Exchange:exchange-1 Queue:queue-1 key:springboot.*
spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-2 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=springboot.*
发送消息
package com.bfxy.springboot.producer; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback; import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import com.bfxy.springboot.entity.Order; @Component public class RabbitSender { private static final Logger LOGGER = LoggerFactory.getLogger(RabbitSender.class); //自动注入RabbitTemplate模板类 @Autowired private RabbitTemplate rabbitTemplate; //回调函数: confirm确认 final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.err.println("correlationData: " + correlationData); System.err.println("ack: " + ack); if(!ack){ System.err.println("异常处理...."); } } }; //回调函数: return返回 final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText, String exchange, String routingKey) { System.err.println("return exchange: " + exchange + ", routingKey: " + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText); } }; //发送消息方法调用: 构建Message消息 public void send(Object message, Map properties) { LOGGER.info("消息内容:{}",message); LOGGER.info("properties:{}",properties); try { MessageHeaders mhs = new MessageHeaders(properties); Message msg = MessageBuilder.createMessage(message, mhs); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一 CorrelationData correlationData = new CorrelationData("1234567890"); rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData); }catch (Exception e){ LOGGER.error("发送消息异常,message:{}",message); } } //发送消息方法调用: 构建自定义对象消息 public void sendOrder(Order order) { LOGGER.info("订单消息内容:{}",order); try { rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //id + 时间戳 全局唯一 CorrelationData correlationData = new CorrelationData("0987654321"); rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData); }catch (Exception e){ LOGGER.error("订单发送消息异常,message:{}",order); } } }
测试代码
package com.bfxy.springboot; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import com.bfxy.springboot.entity.Order; import com.bfxy.springboot.producer.RabbitSender; @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { @Test public void contextLoads() { } @Autowired private RabbitSender rabbitSender; private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @Test public void testSender1() throws Exception { Map properties = new HashMap(); properties.put("number", "12345"); properties.put("send_time", simpleDateFormat.format(new Date())); rabbitSender.send("Hello RabbitMQ For Spring Boot!", properties); } @Test public void testSender2() throws Exception { Order order = new Order("001", "第一个订单"); rabbitSender.sendOrder(order); } }
接收消息
package com.bfxy.springboot.conusmer; import java.util.Map; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component public class RabbitReceiver { /** * 用于标识方法是一个RabbitMQ消息的监听方法,用于监听指定的队列,并在接收到消息时调用该方法进行处理。 * 可以指定队列、交换机、路由键等属性,用于配置消息监听的相关信息。 * 通常与@RabbitHandler一起使用,将消息监听和消息处理方法关联起来。 */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "queue-1", durable="true"), exchange = @Exchange(value = "exchange-1", durable="true", type= "topic", ignoreDeclarationExceptions = "true"), key = "springboot.*" ) ) /** * 用于标识方法是一个RabbitMQ消息的处理方法。 * 通常与@RabbitListener一起使用,用于指定具体的消息处理方法。 * 通过@RabbitHandler注解标识的方法可以处理多个不同类型的消息,通过方法参数的类型来区分不同的消息类型。 */ @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { System.err.println("--------------------------------------"); System.err.println("消费端Payload: " + message.getPayload()); Long deliveryTag = (Long)message.getHeaders().get(AmqpHeaders.DELIVERY_TAG); //手工ACK channel.basicAck(deliveryTag, false); } /** * * spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-1 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=springboot.* * @param order * @param channel * @param headers * @throws Exception */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable="${spring.rabbitmq.listener.order.queue.durable}"), exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"), key = "${spring.rabbitmq.listener.order.key}" ) ) @RabbitHandler public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, Channel channel, @Headers Map headers) throws Exception { System.err.println("--------------------------------------"); System.err.println("消费端order: " + order.getId()); Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); //手工ACK channel.basicAck(deliveryTag, false); } }
断点测试
测试分布式锁+自旋锁测试串行执行【通过】
测试并发分布式锁顺序执行业务代码
package com.bfxy.springboot; import org.junit.Test; import org.junit.runner.RunWith; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @RunWith(SpringRunner.class) @SpringBootTest public class ApplicationTests { private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationTests.class); @Autowired RedissonClient redissonClient; @Test public void contextLoads() { long startTime = System.currentTimeMillis(); for (int i = 1;i int finalI = i; CompletableFuture.runAsync(()-{ bizLock(String.valueOf(finalI)); }); } while (true){} } private void bizLock(String taskName) { RLock lock = redissonClient.getLock("my-lock"); boolean locked = false; try { while (!locked) { locked = lock.tryLock(); if (locked) { try { biz(3000, taskName); System.out.println("----------------"); } finally { lock.unlock(); } } else { // 未获取到锁,可以进行一些等待操作,比如休眠一段时间后再尝试获取锁 Thread.sleep(100); } } } catch (Exception e) { e.printStackTrace(); } } private void biz(Integer time,String taskName) throws Exception{ long startTime = System.currentTimeMillis(); LOGGER.info("任务序号={},任务执行开始时间={}",taskName,startTime); Thread.sleep(time); long endtime = System.currentTimeMillis(); LOGGER.info("任务序号={},任务执行结束时间={}",taskName,startTime); LOGGER.info("任务序号={},任务执行消耗时间={}",taskName,(endtime-startTime)); } }
执行日志如下:【测试结果并发串行执行(同一时间只有一个任务执行)】
2024-07-10 13:46:49.587 INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests : 任务序号=1,任务执行开始时间=1720590409587 2024-07-10 13:46:52.601 INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests : 任务序号=1,任务执行结束时间=1720590409587 2024-07-10 13:46:52.601 INFO 36076 --- [onPool-worker-9] com.bfxy.springboot.ApplicationTests : 任务序号=1,任务执行消耗时间=3014 ---------------- 2024-07-10 13:46:52.665 INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests : 任务序号=4,任务执行开始时间=1720590412665 2024-07-10 13:46:55.678 INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests : 任务序号=4,任务执行结束时间=1720590412665 2024-07-10 13:46:55.678 INFO 36076 --- [onPool-worker-4] com.bfxy.springboot.ApplicationTests : 任务序号=4,任务执行消耗时间=3013 ---------------- 2024-07-10 13:46:55.759 INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests : 任务序号=2,任务执行开始时间=1720590415759 2024-07-10 13:46:58.761 INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests : 任务序号=2,任务执行结束时间=1720590415759 2024-07-10 13:46:58.761 INFO 36076 --- [onPool-worker-2] com.bfxy.springboot.ApplicationTests : 任务序号=2,任务执行消耗时间=3002
压力测试对比资源使用情况
结论使用线程池比较消耗资源,特别是内存,一点并发上来可能oom
压测前
200并发
1000并发
2000并发
测试分布式锁+自旋锁+mq全局串行执行【通过】
使用线程池控制会导致请求积压到线程池消耗cpu和内存资源,使用mq能有效削峰限流(减小服务器资源消耗),线上部署了两个节点即并发为2
消费者代码
/** * * spring.rabbitmq.listener.order.queue.name=queue-2 spring.rabbitmq.listener.order.queue.durable=true spring.rabbitmq.listener.order.exchange.name=exchange-1 spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true spring.rabbitmq.listener.order.key=springboot.* * @param order * @param channel * @param headers * @throws Exception */ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable="${spring.rabbitmq.listener.order.queue.durable}"), exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"), key = "${spring.rabbitmq.listener.order.key}" ) ) @RabbitHandler public void onOrderMessage(@Payload com.bfxy.springboot.entity.Order order, Channel channel, @Headers Map headers) throws Exception { //System.err.println("--------------------------------------"); //System.err.println("消费端order: " + order.getId()); Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG); onLockOrderMessage(order); //手工ACK channel.basicAck(deliveryTag, false); } @Autowired RedissonClient redissonClient; private void onLockOrderMessage(com.bfxy.springboot.entity.Order order) { RLock lock = redissonClient.getLock("my-lock"); boolean locked = false; try { while (!locked) { locked = lock.tryLock(); if (locked) { try { long startTime = System.currentTimeMillis(); String id = order.getId(); LOGGER.info("订单序号={},订单执行开始时间={}",id,startTime); Thread.sleep(7000); long endtime = System.currentTimeMillis(); LOGGER.info("订单序号={},订单执行结束时间={}",id,startTime); LOGGER.info("订单序号={},订单执行消耗时间={}",id,(endtime-startTime)); System.out.println("----------------"); } finally { lock.unlock(); } } else { // 未获取到锁,可以进行一些等待操作,比如休眠一段时间后再尝试获取锁 Thread.sleep(100); } } } catch (Exception e) { e.printStackTrace(); } }
生产者代码
@Test public void testSender3() throws Exception { for (int i = 1;i int finalI = i; CompletableFuture.runAsync(()-{ Order order = new Order(String.valueOf(finalI), "第"+finalI+"个订单"); rabbitSender.sendOrder(order); }); System.err.println("发送消息订单:"+finalI); if (i%5==0){ Thread.sleep(1000); } } }
启动两个消费者【验证全局串行:同一时间只有一个业务执行】
记录消费日志验证是否串行
- 通过日志可知:单个消费者消费顺序执行
- 验证消费者1和2直接业务串行
消费者2:15:18:22 到 15:18:50 之间没有接收到消息【串行执行】
验证消费1:15:18:22 到 15:18:50时间段消息情况【串行执行】
业务幂等性保障测试【通过】
mq接收到消息会将消息中的uid放入redis,当重复消费时会进行判断,保障业务幂等性
@Test public void time() { // 获取字符串对象 String key = "myKey"; String value = "Hello, Redis!"; RBucket bucket = redissonClient.getBucket(key); bucket.set(value, 30, TimeUnit.SECONDS); // 设置失效时间为10秒 }
幂等逻辑
// 判断key是否存在 if(bucket.isExists()){ LOGGER.error("重复消费,id={}",id); // 重复消息不执行业务逻辑跳出直接ack break; }else { marker(id); }
rabbitmq配置生效测试
原项目配置【自动ack测试】-【通过】
测试自动ack是否生效
package com.bfxy.springboot.config; import org.springframework.amqp.core.AcknowledgeMode; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; @Configuration public class RabbitMQConfig { @Value("${spring.rabbitmq.host}") private String addresses; @Value("${spring.rabbitmq.port}") private String port; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Value("${spring.rabbitmq.virtual-host}") private String virtualHost; @Value("${spring.rabbitmq.publisher-confirms}") private boolean publisherConfirms; @Bean /** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */ // @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory()); return rabbitTemplate; } @Bean public RabbitTemplate manualAckRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 配置手动ACK rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setChannelTransacted(true); rabbitTemplate.setAcknowledgeMode(AcknowledgeMode.MANUAL); return rabbitTemplate; } @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(addresses + ":" + port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(virtualHost); /** 如果要进行消息回调,则这里必须要设置为true */ connectionFactory.setPublisherConfirms(publisherConfirms); return connectionFactory; } @Bean("mqContainerFactory") @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory, MessageConverter messageConverter) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(messageConverter); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } }
spring.rabbitmq.addresses=127.0.0.1:5672 spring.rabbitmq.username=admin spring.rabbitmq.password=admin spring.rabbitmq.virtual-host=total-api spring.rabbitmq.connection-timeout=15000
发送消息,没ack前控制台信息
等待一会,自动ack的消息从rabbitmq中删除了
新增配置【手动ack测试】-【通过】
rabbitmq如何实现接受指定的消息要手动ack,其他消息自动ack
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean public RabbitListenerContainerFactory manualAckListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean public RabbitListenerContainerFactory autoAckListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setAcknowledgeMode(AcknowledgeMode.AUTO); return factory; } }
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { @Bean public RabbitListenerContainerFactory manualAckListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); return factory; } @Bean public RabbitListenerContainerFactory autoAckListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setAcknowledgeMode(AcknowledgeMode.AUTO); return factory; } }
消费者代码指定手动ack,并注释手动ack
查看rabbitmq中消息是否被删除,预期消息不会删除
放开手动ack注释,再次测试
兜底保证方案
消息处理可能失败,处理失败的消息记录到broker_message_log表中
-- 表 broker_message_log 消息记录结构 CREATE TABLE IF NOT EXISTS `broker_message_log` ( `message_id` varchar(128) NOT NULL, -- 消息唯一ID `message` varchar(4000) DEFAULT NULL, -- 消息内容 `try_count` int(4) DEFAULT '0', -- 重试次数 `status` varchar(10) DEFAULT '', -- 消息投递状态 0 投递中 1 投递成功 2 投递失败 `next_retry` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', --下一次重试时间 或 超时时间 `create_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', --创建时间 `update_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00', --更新时间 PRIMARY KEY (`message_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
通过定时任务重新执行失败的消息
执行点设计
- 定时任务重目标业务方法(该方式要将业务封装某个class的某个方法中,失败时会录入表中)
- 发送mq在消费中执行