springboot 接双 Kafka

2024-03-13 1309阅读

温馨提示:这篇文章已超过378天没有更新,请注意相关的内容是否还可用!

配置文件

springboot 接双 Kafka
(图片来源网络,侵删)
spring:
  kafka:
    # 第一个kafka的配置
    first:
      bootstrap-servers: *********
      #生产者配置
      producer:
        # Kafka提供的序列化和反序列化类
        key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        retries: 1 # 消息发送重试次数
        #acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果
        #acks= all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低。
        #acks = 1:默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他副本写入该消息。折中方案,只要leader一直活着消息就不会丢失,同时也保证了吞吐量
        acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
        batch-size: 16384 #批量大小
        properties:
          linger:
            ms: 0 #提交延迟
        buffer-memory: 33554432 # 生产端缓冲区大小
      # 消费者配置
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        # 默认分组id
        #group-id: DEMO_${random.uuid}
        #是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
        enable-auto-commit: false
        #自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
        auto-commit-interval: 1s
        #当kafka中没有初始offset或offset超出范围时将自动重置offset
        # earliest:重置为分区中最小的offset;
        # latest:重置为分区中最新的offset(消费分区中新产生的数据);
        # none:只要有一个分区不存在已提交的offset,就抛出异常;
        auto-offset-reset: earliest
        #批量消费每次最多消费多少条消息
        #每次拉取一条,一条条消费,当然是具体业务状况设置
        max-poll-records: 100
      listener:
        # 在侦听器容器中运行的线程数。
        concurrency: 5
        #listner负责ack,每调用一次,就立即commit
        ack-mode: manual
        missing-topics-fatal: false
    # 第二个kafka的配置
    second:
      bootstrap-servers: *********
      producer:
        # Kafka提供的序列化和反序列化类
        key-serializer: org.apache.kafka.common.serialization.StringSerializer #序列化
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        retries: 1 # 消息发送重试次数
        #acks = 0:设置成 表示 producer 完全不理睬 leader broker 端的处理结果。此时producer 发送消息后立即开启下 条消息的发送,根本不等待 leader broker 端返回结果
        #acks= all 或者-1 :表示当发送消息时, leader broker 不仅会将消息写入本地日志,同时还会等待所有其他副本都成功写入它们各自的本地日志后,才发送响应结果给,消息安全但是吞吐量会比较低。
        #acks = 1:默认的参数值。 producer 发送消息后 leader broker 仅将该消息写入本地日志,然后便发送响应结果给producer ,而无须等待其他副本写入该消息。折中方案,只要leader一直活着消息就不会丢失,同时也保证了吞吐量
        acks: 1 #应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
        batch-size: 16384 #批量大小
        properties:
          linger:
            ms: 0 #提交延迟
        buffer-memory: 33554432 # 生产端缓冲区大小
      # 消费者配置
      consumer:
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        # 默认分组id
        #group-id: DEMO_${random.uuid}
        #是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
        enable-auto-commit: false
        #自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
        auto-commit-interval: 1s
        #当kafka中没有初始offset或offset超出范围时将自动重置offset
        # earliest:重置为分区中最小的offset;
        # latest:重置为分区中最新的offset(消费分区中新产生的数据);
        # none:只要有一个分区不存在已提交的offset,就抛出异常;
        auto-offset-reset: earliest
        #批量消费每次最多消费多少条消息
        #每次拉取一条,一条条消费,当然是具体业务状况设置
        max-poll-records: 1
      listener:
        # 在侦听器容器中运行的线程数。
        concurrency: 5
        #listner负责ack,每调用一次,就立即commit
        ack-mode: manual
        missing-topics-fatal: false

firstKafka

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
@Configuration
public class FirstKafkaConfig {
	/**
	 * @return 第一个kafka配置
	 */
	@Primary
	@ConfigurationProperties(prefix = "spring.kafka.first")
	@Bean
	public KafkaProperties firstKafkaProperties() {
		return new KafkaProperties();
	}
	/**
	 * @param firstKafkaProperties 第一个kafka配置
	 * @return 第一个kafka的生产者发送template
	 */
	@Primary
	@Bean(name = "firstKafkaTemplate")
	public KafkaTemplate firstKafkaTemplate(
			@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) {
		return new KafkaTemplate(firstProducerFactory(firstKafkaProperties));
	}
	/**
	 * 构建第一个kafka的消费者监听容器工厂
	 * @param firstKafkaProperties 第一个kafka配置
	 * @return 第一个kafka的消费者监听容器工厂
	 */
	@Bean("firstKafkaListenerContainerFactory")
	public KafkaListenerContainerFactory firstKafkaListenerContainerFactory(
			@Autowired @Qualifier("firstKafkaProperties") KafkaProperties firstKafkaProperties) {
		ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
		factory.setConsumerFactory(firstConsumerFactory(firstKafkaProperties));
		return factory;
	}
	/**
	 * 新建第一个kafka的消费者工厂
	 * @param firstKafkaProperties 第一个kafka配置
	 * @return 第一个kafka的消费者工厂
	 */
	private ConsumerFactory
VPS购买请点击我

免责声明:我们致力于保护作者版权,注重分享,被刊用文章因无法核实真实出处,未能及时与作者取得联系,或有版权异议的,请联系管理员,我们会立即处理! 部分文章是来自自研大数据AI进行生成,内容摘自(百度百科,百度知道,头条百科,中国民法典,刑法,牛津词典,新华词典,汉语词典,国家院校,科普平台)等数据,内容仅供学习参考,不准确地方联系删除处理! 图片声明:本站部分配图来自人工智能系统AI生成,觅知网授权图片,PxHere摄影无版权图库和百度,360,搜狗等多加搜索引擎自动关键词搜索配图,如有侵权的图片,请第一时间联系我们,邮箱:ciyunidc@ciyunshuju.com。本站只作为美观性配图使用,无任何非法侵犯第三方意图,一切解释权归图片著作权方,本站不承担任何责任。如有恶意碰瓷者,必当奉陪到底严惩不贷!

目录[+]