【Kafka】SpringBoot整合Kafka详细介绍及代码示例

06-20 1581阅读

Kafka介绍

Apache Kafka是一个分布式流处理平台。它最初由LinkedIn开发,后来成为Apache软件基金会的一部分,并在开源社区中得到了广泛应用。Kafka的核心概念包括Producer、Consumer、Broker、Topic、Partition和Offset。

【Kafka】SpringBoot整合Kafka详细介绍及代码示例
(图片来源网络,侵删)
  • Producer:生产者,负责将数据发送到Kafka集群。
  • Consumer:消费者,从Kafka集群中读取数据。
  • Broker:Kafka服务器实例,Kafka集群通常由多个Broker组成。
  • Topic:主题,数据按主题进行分类。
  • Partition:分区,每个主题可以有多个分区,用于实现并行处理和提高吞吐量。
  • Offset:偏移量,每个消息在其分区中的唯一标识。

    使用场景

    Kafka适用于以下场景:

    1. 日志收集:集中收集系统日志和应用日志,通过Kafka传输到大数据处理系统。
    2. 消息队列:作为高吞吐量、低延迟的消息队列系统。
    3. 数据流处理:实时处理数据流,用于实时分析、监控和处理。
    4. 事件源架构:将所有的变更事件存储在Kafka中,实现事件溯源和回放。
    5. 流数据管道:构建数据管道,连接数据源和数据存储系统。

    Spring Boot整合Kafka 

    项目结构

    springboot-kafka
    │
    ├── src
    │   ├── main
    │   │   ├── java
    │   │   │   └── com.example.kafka
    │   │   │       ├── KafkaApplication.java
    │   │   │       ├── config
    │   │   │       │   └── KafkaConfig.java
    │   │   │       ├── producer
    │   │   │       │   └── KafkaProducer.java
    │   │   │       ├── consumer
    │   │   │       │   └── KafkaConsumer.java
    │   │   │       └── controller
    │   │   │           └── KafkaController.java
    │   │   └── resources
    │   │       ├── application.yml
    │   │       └── logback-spring.xml (可选)
    │   └── test
    │       └── java
    │           └── com.example.kafka
    │               └── KafkaApplicationTests.java
    └── pom.xml
    

    1. 创建Spring Boot项目并添加依赖

    pom.xml
        
            org.springframework.boot
            spring-boot-starter
        
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.boot
            spring-boot-starter-kafka
        
        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    
    

    2. 配置Kafka

    application.yml
    spring:
      kafka:
        bootstrap-servers: localhost:9092
        consumer:
          group-id: my-group
          auto-offset-reset: earliest
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
    

    3. 创建Kafka配置类

    KafkaConfig.java
    package com.example.kafka.config;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class KafkaConfig {
        @Bean
        public NewTopic myTopic() {
            return new NewTopic("my-topic", 1, (short) 1);
        }
    }
    

    4. 创建Kafka生产者

    KafkaProducer.java
    package com.example.kafka.producer;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;
    @Service
    public class KafkaProducer {
        private final KafkaTemplate kafkaTemplate;
        public KafkaProducer(KafkaTemplate kafkaTemplate) {
            this.kafkaTemplate = kafkaTemplate;
        }
        public void sendMessage(String topic, String message) {
            kafkaTemplate.send(topic, message);
        }
    }
    

    5. 创建Kafka消费者

    KafkaConsumer.java
    package com.example.kafka.consumer;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;
    @Service
    public class KafkaConsumer {
        @KafkaListener(topics = "my-topic", groupId = "my-group")
        public void listen(String message) {
            System.out.println("Received message: " + message);
        }
    }
    

    6. 创建控制器发送消息

    KafkaController.java
    package com.example.kafka.controller;
    import com.example.kafka.producer.KafkaProducer;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    @RestController
    public class KafkaController {
        private final KafkaProducer kafkaProducer;
        public KafkaController(KafkaProducer kafkaProducer) {
            this.kafkaProducer = kafkaProducer;
        }
        @GetMapping("/send")
        public String sendMessage(@RequestParam String message) {
            kafkaProducer.sendMessage("my-topic", message);
            return "Message sent";
        }
    }
    

    7. 创建Spring Boot主类

    KafkaApplication.java
    package com.example.kafka;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    @SpringBootApplication
    public class KafkaApplication {
        public static void main(String[] args) {
            SpringApplication.run(KafkaApplication.class, args);
        }
    }
    

    8. 测试应用

    通过访问以下URL来发送消息:

    http://localhost:8080/send?message=HelloKafka

    9. 日志配置(可选)

    为了更好地查看Kafka的日志,可以添加logback-spring.xml配置:

    logback-spring.xml
        
            
                
                    %d{yyyy-MM-dd HH:mm:ss} - %msg%n
                
            
            
            
                
            
        
    
    

    10. 测试类(可选)

    KafkaApplicationTests.java
    package com.example.kafka;
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;
    @SpringBootTest
    class KafkaApplicationTests {
        @Test
        void contextLoads() {
        }
    }
    

    至此,你已经完成了Spring Boot整合Kafka的详细配置和代码示例。你可以根据实际需求进一步扩展和修改这个基础代码。

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]