Kafka接收消息

07-16 1110阅读

文章目录

  • Acknowledgment
  • 读消息指定分区
  • 批量消费
  • 消息拦截
    // 采用监听得方式接收 @Payload标记消息体内容.
    @KafkaListener(topics = {"test"},groupId = "hello")
    public void onEvent(@Payload String event,
                       @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                       @Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partition){
       System.out.println("读取到了时间消息: " + event);
    }
    

    Acknowledgment

    开启手动确认模式;

    listener:
    	ack-mode: manual
    
    // 采用监听得方式接收 @Payload标记消息体内容.
    @KafkaListener(topics = {"test"},groupId = "hello")
    public void onEvent(@Payload String event,
                       @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                       @Header(value = KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
                       ConsumerRecord record,
                       Acknowledgment ack){
       ack.acknowledge(); // 手动确认,告诉kafka服务器该消息我已经收到了. 
       System.out.println("读取到了时间消息: " + event);
    }
    

    读消息指定分区

    @KafkaListener(groupId = "hello",
               topicPartitions = {
                   @TopicPartition(
                           topic = "${kafka.topic.test}",
                           partitions = {"0","1","2"}, // 0 1 2分区不限制偏移量
                           partitionOffsets = { // 3 分区只读 3偏移量之后的; 4分区只读 4偏移量之后的
                                   @PartitionOffset(partition = "3",initialOffset = "3"),
                                   @PartitionOffset(partition = "4",initialOffset = "3")
                           })
               }
    )
    

    批量消费

    修改配置

    kafka:
        bootstrap-servers: 192.168.225.128:9092
        listener:
          type: batch
        # 每次读取20条
        consumer:
          max-poll-records: 20
    

    消费者端接收一个List即可

    @KafkaListener(topics = {"hi"},groupId = "batchGroup2")
    public void onEvent3(List records){
        System.out.println(records.size());
    }
    

    消息拦截

    Kafka接收消息

VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]