Spring整合Kafka(十七)----处理异常

06-15 508阅读

  • 一、监听器错误处理程序Listener Error Handlers
  • 二、容器错误处理程序Container Error Handlers
  • 三、重试等待处理程序Back Off Handlers
  • 四、默认错误处理程序DefaultErrorHandler
  • 五、使用batch错误处理程序的转换错误Conversion Errors with Batch Error Handlers
  • 六、重试完整批Retrying Complete Batches
  • 七、容器停止错误处理程序Container Stopping Error Handlers
  • 八、委托错误处理程序Delegating Error Handler
  • 九、日志错误处理程序Logging Error Handler
  • 十、为record和batch监听器使用不同的常见错误处理程序Using Different Common Error Handlers for Record and Batch Listeners
  • 十一、常见错误处理程序摘要Common Error Handler Summary
  • 十二、遗留错误处理程序及其替代Legacy Error Handlers and Their Replacements
    • 12.1 将自定义遗留错误处理程序实现迁移到CommonErrorHandler Migrating Custom Legacy Error Handler Implementations to CommonErrorHandler
    • 十三、rollback之后处理器After-rollback Processor
    • 十四、投递尝试头Delivery Attempts Header
    • 十五、监听信息头Listener Info Header
    • 十六、发布死信记录Publishing Dead-letter Records
    • 十七、管理死信记录头Managing Dead Letter Record Headers
    • 十八、ExponentialBackOffWithMaxRetries Implementation

      本文描述如何处理在使用Spring for Apache Kafka时可能出现的各种异常。

      Spring整合Kafka(十七)----处理异常
      (图片来源网络,侵删)

      一、监听器错误处理程序Listener Error Handlers

      @KafkaListener注解有一个属性:errorHandler。

      你可以使用errorHandler来提供KafkaListenerErrorHandler实现的bean名称。这个函数式接口有一个方法,如下所示:

      @FunctionalInterface
      public interface KafkaListenerErrorHandler {
          Object handleError(Message message, ListenerExecutionFailedException exception) throws Exception;
      }
      

      你可以访问由消息转换器生成的spring-messaging Message对象和监听器引发的异常,该异常被封装在ListenerExecutionFailedException中。错误处理程序可以抛出原始异常或新异常,这些异常被抛出到容器中。错误处理程序返回的任何内容将被忽略。 你可以在MessagingMessageConverter和BatchMessagingMessageConverter上设置rawRecordHeader属性,这会将原始ConsumerRecord添加到“KafkaHeaders.RAW\_DATA” header中的转换后的Message。例如,如果你希望在监听器错误处理程序中使用DeadLetterPublishingRecoverer,这将非常有用。它可能用于request/reply场景,在该场景中,你希望在一定次数的重试后,在dead letter主题中捕获失败记录后,将失败结果发送给发件人。

      @Bean
      KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
          return (msg, ex) -> {
              if (msg.getHeaders().get(KafkaHeaders.DELIVERY\_ATTEMPT, Integer.class) > 9) {
                  recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW\_DATA, ConsumerRecord.class), ex);
                  return "FAILED";
              }
              throw ex;
          };
      }
      

      KafkaListenerErrorHandler有一个子接口(ConsumerAwareListenerErrorHandler),可以通过以下方法访问消费者对象:

      Object handleError(Message message, ListenerExecutionFailedException exception, Consumer consumer);
      

      另一个子接口(ManualAckListenerErrorHandler)在使用手动AckMode时提供对Acknowledgment对象的访问。

      Object handleError(Message message, ListenerExecutionFailedException exception,
      			Consumer consumer, @Nullable Acknowledgment ack);
      

      在任何一种情况下,都不应该对consumer执行任何seek,因为容器不知道它们。

      二、容器错误处理程序Container Error Handlers

      从2.8版本开始,遗留的ErrorHandler和BatchErrorHandler接口已被新的CommonErrorHandler所取代。这些错误处理程序可以处理record 和batch监听器的错误,允许单个监听器容器工厂为这两种类型的监听器创建容器。现在,可以使用CommonErrorHandler的实现来取代大多数遗留框架错误处理程序。遗留接口仍然由监听器容器和监听器容器工厂支持;它们将在将来的版本中被弃用。

      有关将自定义错误处理程序迁移到CommonErrorHandler的信息,请参阅将自定义遗留错误处理程序实现迁移到CommonErrorHandler。

      在使用事务时,默认情况下不配置错误处理程序,因此异常将回滚事务。事务容器的错误处理由AfterRollbackProcessor处理。如果你在使用事务时提供了自定义错误处理程序,那么它必须抛出异常才能回滚事务。

      CommonErrorHandler接口有一个默认的方法isAckAfterHandle(),容器调用它来确定如果错误处理程序返回而没有抛出异常,是否应该提交偏移量;默认情况下返回true。

      通常,当错误未被“处理”时(例如,在执行seek操作之后),框架提供的错误处理程序将抛出异常。默认情况下,容器会以ERROR级别记录此类异常。所有的框架错误处理程序都继承了KafkaExceptionLogLevelAware,它允许你控制这些异常的日志级别。

      /\*\*
       \* Set the level at which the exception thrown by this handler is logged.
       \* @param logLevel the level (default ERROR).
       \*/
      public void setLogLevel(KafkaException.Level logLevel) {
          ...
      }
      

      你可以为容器工厂中的所有监听器指定一个全局错误处理程序。下面的例子展示了如何这样做:

      @Bean
      public KafkaListenerContainerFactory
              kafkaListenerContainerFactory() {
          ConcurrentKafkaListenerContainerFactory factory =
                  new ConcurrentKafkaListenerContainerFactory();
          ...
          factory.setCommonErrorHandler(myErrorHandler);
          ...
          return factory;
      }
      

      默认情况下,如果带注解的监听器方法抛出异常,它会被抛出到容器,消息会根据容器配置被处理。

      容器在调用错误处理程序之前将提交任何pending的偏移量。

      如果你使用的是Spring Boot,只需将错误处理程序添加为@Bean,Boot就会将其添加到自动配置的工厂中。

      三、重试等待处理程序Back Off Handlers

      错误处理程序(如DefaultErrorHandler)使用BackOff来确定重试传递之前等待的时间。你可以配置自定义的BackOffHandler。默认处理程序只是挂起线程,直到回退时间过去(或者容器停止)。框架还提供ContainerPausingBackOffHandler,它暂停监听器容器,直到回退时间过去,然后恢复容器。当延迟长于consumer属性max.poll.interval.ms时,这很有用。注意,实际回退时间将受到pollTimeout容器属性的影响。

      四、默认错误处理程序DefaultErrorHandler

      这个DefaultErrorHandler取代了SeekToCurrentErrorHandler和RecoveringBatchErrorHandler。注意,batch监听器的回退行为(当抛出BatchListenerFailedException以外的异常时)等效于重试完整批。

      DefaultErrorHandler可以被配置为提供与seek未处理的记录偏移量相同的语义,但不需要实际seek。相反,记录由监听器容器保留,并在错误处理程序退出后(以及在执行单个暂停的poll()后)重新提交给监听器,以保持consumer alive;如果正在使用非阻塞重试(Non-Blocking Retries)或ContainerPausingBackOffHandler,则暂停可以扩展到多个polls)。错误处理程序向容器返回一个结果,该结果指示当前失败的记录是否可以重新提交,或者,它是否已恢复,然后它将不会被再次发送到监听器。要启用此模式,请将seekAfterError属性设置为false。

      错误处理程序可以恢复(跳过)一直失败的记录。默认情况下,在十次失败后,会在ERROR级别记录失败的record。你可以使用自定义恢复器(BiConsumer)和BackOff来配置处理程序,BackOff控制delivery尝试和每次尝试之间的延迟。将FixedBackOff与FixedBackOff.UNLIMITED_ATTEMPTS一起使用会导致无限次重试。以下示例配置三次尝试后的恢复:

      DefaultErrorHandler errorHandler =
          new DefaultErrorHandler((record, exception) -> {
              // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
          }, new FixedBackOff(0L, 2L));
      

      要使用此处理程序的自定义实例配置监听器容器,需要将其添加到容器工厂。例如,使用@KafkaListener容器工厂,你可以像下面这样添加DefaultErrorHandler:

      @Bean
      public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
          ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
          factory.setConsumerFactory(consumerFactory());
          factory.getContainerProperties().setAckMode(AckMode.RECORD);
          factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
          return factory;
      }
      

      对于record监听器,它将重试最多2次delivery(共3次delivery),以1秒的时间间隔,而不是默认配置(FixedBackOff(0L, 9))。在重试次数用完后会记录失败。

      作为一个例子;如果poll返回六条记录(在分区0、1、2中各有两条),并且监听器在第四条记录上抛出异常,则容器通过提交前三条的偏移量来acknowledge消息。DefaultErrorHandler为分区1 seek偏移量1,为分区2 seek偏移量0。下一个poll()返回三个未处理的记录。

      如果AckMode是BATCH,则容器在调用错误处理程序之前提交前两个分区的偏移量。

      对于batch监听器,它必须抛出BatchListenerFailedException,提示batch中的哪些记录失败。

      事件顺序为:

      • 在索引之前提交记录的偏移量。
      • 如果重试次数未用完,执行seek,以便重新deliver所有剩余记录(包括失败的记录)。
      • 如果重试次数已用完,尝试恢复失败的记录(默认仅记录日志)并执行seek,以便重新传递剩余的记录(不包括失败的记录)。恢复记录的偏移量会被提交。
      • 如果重试次数已用完且恢复失败,则将执行seek,如同重试次数未用完一样。

        DefaultErrorHandler可以被配置为提供与seek未处理的记录偏移量相同的语义,但不需要实际seek。相反,错误处理程序会创建一个新的ConsumerRecords只包含未处理的记录,然后这些记录将被提交给监听器(在执行单个暂停的poll()之后,以保持consumer alive)。要启用此模式,请将seekAfterError属性设置为false。

        在重试次数用完后,默认的恢复器(recoverer)会记录失败的record。你可以使用自定义恢复器,也可以使用框架提供的恢复器,如DeadLetterPublishingRecoverer。

        当使用POJO batch监听器(例如List)时,并且你没有完整的consumer记录要添加到异常中,你可以只添加失败记录的索引:

        @KafkaListener(id = "recovering", topics = "someTopic")
        public void listen(List things) {
            for (int i = 0; i  
        

        当容器配置为“AckMode.MMANUAL_IMMEDIATE”时,可以配置错误处理程序来提交已恢复记录的偏移量;将commitRecovered属性设置为true。

        另请参见发布死信记录。

        使用事务时,DefaultAfterRollbackProcessor也提供类似的功能。请参阅回滚后处理器。

        DefaultErrorHandler认为某些异常是fatal的,并跳过对此类异常的重试;在第一次失败时调用恢复器。默认情况下,被视为fatal的异常有:

        • DeserializationException
        • MessageConversionException
        • ConversionException
        • MethodArgumentResolutionException
        • NoSuchMethodException
        • ClassCastException

          因为这些异常不太可能在重试delivery时得到解决。

          你可以将更多异常类型添加到不可重试类别,或者完全替换已分类异常的映射。有关更多信息,请参阅DefaultErrorHandler.addNotRetryableException()和DefaultErrorHandler.setClassifications()的Javadocs,以及spring-retry BinaryExceptionClassifier的那些文档。

          以下是一个将IllegalArgumentException添加到不可重试异常中的示例:

          @Bean
          public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
              DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
              handler.addNotRetryableExceptions(IllegalArgumentException.class);
              return handler;
          }
          

          错误处理程序可以配置一个或多个RetryListener,接收重试和恢复进度的通知。

          @FunctionalInterface
          public interface RetryListener {
              void failedDelivery(ConsumerRecord record, Exception ex, int deliveryAttempt);
              default void recovered(ConsumerRecord record, Exception ex) {
              }
              default void recoveryFailed(ConsumerRecord record, Exception original, Exception failure) {
              }
          	default void failedDelivery(ConsumerRecords records, Exception ex, int deliveryAttempt) {
          	}
          	default void recovered(ConsumerRecords records, Exception ex) {
          	}
          	default void recoveryFailed(ConsumerRecords records, Exception original, Exception failure) {
          	}
          }
          

          如果恢复器失败(引发异常),则失败的记录将包含在seek中。如果恢复器失败,默认情况下将重置BackOff,并且在再次尝试恢复之前,重新deliver将再次执行。若要在恢复失败后跳过重试,请将错误处理程序的resetStateOnRecoveryFailure设置为false。

          你可以为错误处理程序提供BiFunction, Exception, BackOff>,根据失败的记录或异常来确定要使用的BackOff:

          handler.setBackOffFunction((record, ex) -> { ... }
          

          如果函数返回null,则将使用processor的默认BackOff。

          将resetStateOnExceptionChange设置为true,如果异常类型在各失败之间发生变化,则重试序列将重新启动(包括选择新的BackOff,如果配置了此选项)。默认情况下,不考虑异常类型。

          从2.3.1版本开始,类似于DefaultErrorHandler,DefaultAfterRollbackProcessor认为某些异常是fatal的,并跳过对此类异常的重试;在第一次失败时调用恢复器。默认情况下,被视为fatal的异常有:

          • DeserializationException
          • MessageConversionException
          • ConversionException
          • MethodArgumentResolutionException
          • NoSuchMethodException
          • ClassCastException

            因为这些异常不太可能在重试delivery时得到解决。

            你可以将更多异常类型添加到不可重试类别,或者完全替换已分类异常的映射。有关更多信息,请参阅DefaultAfterRollbackProcessor.setClassifications()的Javadocs,以及spring-retry BinaryExceptionClassifier的Javadocs。

            以下是一个将IllegalArgumentException添加到不可重试异常中的示例:

            @Bean
            public DefaultAfterRollbackProcessor errorHandler(BiConsumer record, Exception ex, int deliveryAttempt);
                default void recovered(ConsumerRecord record, Exception ex) {
                }
                default void recoveryFailed(ConsumerRecord record, Exception original, Exception failure) {
                }
            }
            

            十四、投递尝试头Delivery Attempts Header

            以下内容仅适用于record监听器,不适用于batch监听器。

            当使用实现DeliveryAttemptAware的ErrorHandler或AfterRollbackProcessor时,可以启用向记录添加“KafkaHeaders.DELIVERY_ATTEMPT” 的header(kafka_deliveryAttempt)。此header的值是一个从1开始的递增整数。收到原始ConsumerRecord时整数在byte[4]中。

            int delivery = ByteBuffer.wrap(record.headers()
                .lastHeader(KafkaHeaders.DELIVERY\_ATTEMPT).value())
                .getInt()
            

            当将@KafkaListener与DefaultKafkaHeaderMapper或SimpleKafkaHeaderMapper一起使用时,可以通过在listener方法中添加@Header(KafkaHeads.DELIVERY_ATTEMPT)int DELIVERY作为参数来获得。

            若要启用此header的填充,请将容器属性deliveryAttemptHeader设置为true。默认情况下,它是禁用的,以避免查找每条记录的状态和添加header的开销。

            DefaultErrorHandler和DefaultAfterRollbackProcessor支持此功能。

            十五、监听信息头Listener Info Header

            在某些情况下,能够知道监听器在哪个容器中运行是很有用的。

            你现在可以在监听器容器上设置listenerInfo属性,或者在@KafkaListener注解上设置info属性。然后,容器会将其添加到所有传入消息的“KafkaListener.LISTENER_INFO”头中;然后,它可以用于record拦截器、过滤器等,也可以用于监听器本身。

            @KafkaListener(id = "something", topic = "topic", filter = "someFilter",
                    info = "this is the something listener")
            public void listen2(@Payload Thing thing,
                    @Header(KafkaHeaders.LISTENER\_INFO) String listenerInfo) {
            ...
            **自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。**
            **深知大多数大数据工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!**
            **因此收集整理了一份《2024年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。**
            ![img](https://img-blog.csdnimg.cn/img_convert/024b9bfddbccb4ea8714bd3e801e2bfd.png)
            ![img](https://img-blog.csdnimg.cn/img_convert/fe929de84131a1bf739b1f5585f65d56.png)
            ![img](https://img-blog.csdnimg.cn/img_convert/918deaa9e7e5afc6930d0073c324f521.png)
            ![img](https://img-blog.csdnimg.cn/img_convert/dac5ce7743bd5992f9875a7255f3e359.png)
            ![img](https://img-blog.csdnimg.cn/img_convert/3fc691d47d691903566665426b4e6db0.png)
            **既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!**
            **由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新**
            **如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)**
            ![img](https://img-blog.csdnimg.cn/img_convert/04c86f38bbf2368444cf5860ca5256d2.png)
            4年大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。**
            [外链图片转存中...(img-I4Kz2RX2-1712862863171)]
            [外链图片转存中...(img-6rucK2Di-1712862863172)]
            [外链图片转存中...(img-fnGE41Jk-1712862863172)]
            [外链图片转存中...(img-u5vmM90n-1712862863173)]
            [外链图片转存中...(img-v1Q0HsGE-1712862863173)]
            **既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上大数据开发知识点,真正体系化!**
            **由于文件比较大,这里只是将部分目录大纲截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且后续会持续更新**
            **如果你觉得这些内容对你有帮助,可以添加VX:vip204888 (备注大数据获取)**
            [外链图片转存中...(img-gdPLfNqQ-1712862863173)]
            
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]