【Kafka】SpringBoot整合Kafka详细介绍及代码示例
Kafka介绍
Apache Kafka是一个分布式流处理平台。它最初由LinkedIn开发,后来成为Apache软件基金会的一部分,并在开源社区中得到了广泛应用。Kafka的核心概念包括Producer、Consumer、Broker、Topic、Partition和Offset。
(图片来源网络,侵删)
- Producer:生产者,负责将数据发送到Kafka集群。
- Consumer:消费者,从Kafka集群中读取数据。
- Broker:Kafka服务器实例,Kafka集群通常由多个Broker组成。
- Topic:主题,数据按主题进行分类。
- Partition:分区,每个主题可以有多个分区,用于实现并行处理和提高吞吐量。
- Offset:偏移量,每个消息在其分区中的唯一标识。
使用场景
Kafka适用于以下场景:
- 日志收集:集中收集系统日志和应用日志,通过Kafka传输到大数据处理系统。
- 消息队列:作为高吞吐量、低延迟的消息队列系统。
- 数据流处理:实时处理数据流,用于实时分析、监控和处理。
- 事件源架构:将所有的变更事件存储在Kafka中,实现事件溯源和回放。
- 流数据管道:构建数据管道,连接数据源和数据存储系统。
Spring Boot整合Kafka
项目结构
springboot-kafka │ ├── src │ ├── main │ │ ├── java │ │ │ └── com.example.kafka │ │ │ ├── KafkaApplication.java │ │ │ ├── config │ │ │ │ └── KafkaConfig.java │ │ │ ├── producer │ │ │ │ └── KafkaProducer.java │ │ │ ├── consumer │ │ │ │ └── KafkaConsumer.java │ │ │ └── controller │ │ │ └── KafkaController.java │ │ └── resources │ │ ├── application.yml │ │ └── logback-spring.xml (可选) │ └── test │ └── java │ └── com.example.kafka │ └── KafkaApplicationTests.java └── pom.xml
1. 创建Spring Boot项目并添加依赖
pom.xml
org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-kafka org.springframework.boot spring-boot-starter-test test
2. 配置Kafka
application.yml
spring: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer
3. 创建Kafka配置类
KafkaConfig.java
package com.example.kafka.config; import org.apache.kafka.clients.admin.NewTopic; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class KafkaConfig { @Bean public NewTopic myTopic() { return new NewTopic("my-topic", 1, (short) 1); } }
4. 创建Kafka生产者
KafkaProducer.java
package com.example.kafka.producer; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; @Service public class KafkaProducer { private final KafkaTemplate kafkaTemplate; public KafkaProducer(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String topic, String message) { kafkaTemplate.send(topic, message); } }
5. 创建Kafka消费者
KafkaConsumer.java
package com.example.kafka.consumer; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Service; @Service public class KafkaConsumer { @KafkaListener(topics = "my-topic", groupId = "my-group") public void listen(String message) { System.out.println("Received message: " + message); } }
6. 创建控制器发送消息
KafkaController.java
package com.example.kafka.controller; import com.example.kafka.producer.KafkaProducer; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class KafkaController { private final KafkaProducer kafkaProducer; public KafkaController(KafkaProducer kafkaProducer) { this.kafkaProducer = kafkaProducer; } @GetMapping("/send") public String sendMessage(@RequestParam String message) { kafkaProducer.sendMessage("my-topic", message); return "Message sent"; } }
7. 创建Spring Boot主类
KafkaApplication.java
package com.example.kafka; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class KafkaApplication { public static void main(String[] args) { SpringApplication.run(KafkaApplication.class, args); } }
8. 测试应用
通过访问以下URL来发送消息:
http://localhost:8080/send?message=HelloKafka
9. 日志配置(可选)
为了更好地查看Kafka的日志,可以添加logback-spring.xml配置:
logback-spring.xml
%d{yyyy-MM-dd HH:mm:ss} - %msg%n
10. 测试类(可选)
KafkaApplicationTests.java
package com.example.kafka; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest class KafkaApplicationTests { @Test void contextLoads() { } }
至此,你已经完成了Spring Boot整合Kafka的详细配置和代码示例。你可以根据实际需求进一步扩展和修改这个基础代码。
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。