【SpringBoot系列】SpringBoot整合Kafka(含源码)

03-07 1221阅读

文章目录

      • 前言
      • 什么是Kafka?
      • Kafka的应用场景?
        • 日志收集:
        • 消息系统:
        • 用户活动跟踪:
        • 指标和日志聚合:
        • 事件源:
        • 示例
          • 版本依赖
          • 代码
            • KafkaConfig
            • KafkaSender
            • KafkaReceiver
            • KafkaController
            • 测试
            • 遇见问题
              • Error connecting to node xxxxxx:9092 (id: 0 rack: null)
              • 解决方案
              • 总结
              • 源码获取
              • 写在最后

                【SpringBoot系列】SpringBoot整合Kafka(含源码)

                前言

                在现代的微服务架构中,消息队列已经成为了一个不可或缺的组件。

                它能够帮助我们在不同的服务之间传递消息,并且能够确保这些消息不会丢失。

                在众多的消息队列中,Kafka 是一个非常出色的选择。

                它能够处理大量的实时数据,并且提供了强大的持久化能力。

                在本文中,我们将会探讨如何在 SpringBoot 中整合 Kafka。


                什么是Kafka?

                Apache Kafka 是一个开源的流处理平台,由 LinkedIn 团队开发并于 2011 年贡献给 Apache 基金会。Kafka 以其高吞吐量、可扩展性和容错性而闻名。它是一个基于发布/订阅模式的消息系统,通常用于大型实时数据流处理应用。

                Kafka 的主要组件包括:

                • Producer:负责发布消息到 Kafka 服务器。
                • Broker:是 Kafka 服务器实例,负责消息的存储、接收和发送。
                • Consumer:从 Kafka 服务器读取消息。
                • Topic:消息的类别或者说是消息的标签,Producer 将消息发布到特定的 Topic,Consumer 从特定的 Topic 读取消息。

                  Kafka 可以在分布式系统中用于构建实时流数据管道,它可以在系统或应用之间可靠地获取数据。此外,Kafka 可以和 Apache Storm、Apache Hadoop、Apache Spark 等进行集成,用于大数据处理和分析。


                  Kafka的应用场景?

                  日志收集:

                  一个公司可能有很多服务器,每个服务器上运行着很多服务,Kafka 可以用来实现这些服务的日志收集功能。各服务的日志分别发送到 Kafka 的不同 Topic 中。

                  消息系统:

                  Kafka 能够作为一个大规模的消息处理系统,各生产者将消息发送到 Kafka,消费者从 Kafka 中读取消息进行处理。

                  用户活动跟踪:

                  Kafka 也常用于用户活动跟踪和实时分析。例如,用户的点击、搜索等行为可以实时写入到 Kafka,然后进行实时或者离线分析。

                  在 Kafka 上可以进行实时的流处理。例如,使用 Apache Storm 集成 Kafka 来进行实时的数据处理。

                  指标和日志聚合:

                  统计数据和监控数据也是 Kafka 的一个重要应用场景。例如,通过 Kafka 可以收集各种分布式应用的数据,然后进行统一的处理和分析。

                  事件源:

                  Kafka 可以作为大规模事件处理的源头,例如,用户的行为、系统的状态等都可以作为事件,通过 Kafka 进行分发处理。


                  示例

                  版本依赖
                  模块版本
                  SpringBoot3.1.0
                  JDK17
                  代码
                  KafkaConfig
                  @Configuration
                  @EnableKafka
                  public class KafkaConfig {
                      @Bean
                      public KafkaReceiver listener() {
                          return new KafkaReceiver();
                      }
                  }
                  
                  KafkaSender
                  @Component
                  @Slf4j
                  public class KafkaSender {
                      @Resource
                      private KafkaTemplate kafkaTemplate;
                      public void send(String topic, String key, String data) {
                          //发送消息
                          CompletableFuture completable = kafkaTemplate.send(topic, key, data);
                          completable.whenCompleteAsync((result, ex) -> {
                              if (null == ex) {
                                  log.info(topic + "生产者发送消息成功:" + result.toString());
                              } else {
                                  log.info(topic + "生产者发送消息失败:" + ex.getMessage());
                              }
                          });
                      }
                  }
                  
                  KafkaReceiver
                  @Component
                  @Slf4j
                  public class KafkaReceiver {
                      /**
                       * 下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用","隔开
                       */
                      @KafkaListener(topics = {"testTopic"})
                      public void receive(ConsumerRecord record){
                          log.info("消费者收到的消息key: " + record.key());
                          log.info("消费者收到的消息value: " + record.value().toString());
                      }
                  }
                  
                  KafkaController
                  /**
                   * kafka 测试接口
                   */
                  @RestController
                  public class KafkaController {
                      @Autowired
                      private KafkaSender kafkaSender;
                      @GetMapping("/sendMessageToKafka")
                      public String sendMessageToKafka() {
                          Map messageMap = new HashMap();
                          messageMap.put("message", "hello world!");
                          ObjectMapper objectMapper = new ObjectMapper();
                          String data = null;
                          try {
                              data = objectMapper.writeValueAsString(messageMap);
                          } catch (JsonProcessingException e) {
                              throw new RuntimeException(e);
                          }
                          String key = String.valueOf(UUID.randomUUID());
                          //kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)
                          kafkaSender.send("testTopic", key, data);
                          return "ok";
                      }
                  }
                  
                  测试

                  http://127.0.0.1:8080/sendMessageToKafka

                  【SpringBoot系列】SpringBoot整合Kafka(含源码)


                  【SpringBoot系列】SpringBoot整合Kafka(含源码)


                  遇见问题

                  Error connecting to node xxxxxx:9092 (id: 0 rack: null)

                  Error connecting to node iZbp127a9vpra4v3kmkkmzZ:9092 (id: 0 rack: null)

                  解决方案

                  修改本地物理机hosts文件。文件目录:C:\Windows\System32\drivers\etc

                  【SpringBoot系列】SpringBoot整合Kafka(含源码)

                  新增 xx.xx.xx.xx iZbp127a9vpra4v3kmkkmzZ

                  如果没生效,则需要重启系统


                  总结

                  通过上述的步骤,我们已经成功地在 SpringBoot 中整合了 Kafka。

                  这使得我们的应用程序能够在不同的服务之间传递消息,而不需要担心消息的丢失。

                  我们也看到,通过使用 SpringBoot,我们可以非常轻松地完成这个过程。

                  希望这篇文章能够帮助你在自己的项目中更好地使用 Kafka。


                  源码获取

                  如果需要完整源码请关注公众号"架构殿堂" ,回复 "SpringBoot+Kafka"即可获得


                  写在最后

                  感谢您的支持和鼓励! 😊🙏

                  如果大家对相关文章感兴趣,可以关注公众号"架构殿堂",会持续更新AIGC,java基础面试题, netty, spring boot, spring cloud等系列文章,一系列干货随时送达!

                  【SpringBoot系列】SpringBoot整合Kafka(含源码)

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]