一文带你理解Kafka的Header

03-20 1963阅读

Header简介

一文带你理解Kafka的Header

Kafka从 0.11.0.0 版本开始提供了一种在生产者和消费者之间传递元数据的机制,叫做 Kafka header。使用这个机制,你可以在消息中添加一些与数据内容无关的附加信息,如消息的来源、类型、版本、生产时间、过期时间、分区数、用户 ID 等等。

Kafka header 是由一个或多个键值对组成的列表,每个键值对都称为 header。 消息可以包含零个或多个 header。

下面是一些简单的理解 Kafka header 的方式:

  • Kafka header 可以看作是消息的元数据,因为它们不包含实际可用的消息负载。
  • Kafka header 的作用类似于 HTTP 或者 TCP/IP 协议中的 header 头部,在消息中添加一些描述性信息,方便消费者解析和处理消息。
  • Kafka header 的使用并不是强制性的。你完全可以不使用它们,只发送负载数据。
  • Kafka header 不同于消息的 key 和 value,因为它们与数据的生命周期无关。

    Header使用场景

    1. 消息追踪

      通过在Header中添加一个全局唯一的ID,可以跟踪消息在整个应用系统中的传递轨迹。

      ProducerRecord record = new ProducerRecord("mytopic", "myvalue");
      record.headers().add("messageId", UUID.randomUUID().toString().getBytes());
      producer.send(record);
      

      在消费消息的时候,可以获取到消息的KafkaHeader,并从中提取出消息ID进行追踪:

      ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord record : records) {
          Headers headers = record.headers();
          String messageId = new String(headers.lastHeader("messageId").value());
          System.out.println("Received message with ID: " + messageId);
      }
      
    2. 消息路由

      Header可以为消息添加一个路由键,在消息传递的过程中,可以根据这个路由键进行消息的路由。

      ProducerRecord record = new ProducerRecord("mytopic", "myvalue");
      record.headers().add("routeKey", "myroutekey".getBytes());
      producer.send(record);
      

      在消费消息的时候,消费者可以根据路由键过滤出需要消费的消息:

      ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord record : records) {
          Headers headers = record.headers();
          String routeKey = new String(headers.lastHeader("routeKey").value());
          if (routeKey.equals("myroutekey")) {
              //process the message
          }
      }
      
    3. 传递消息元数据

      Header可以在消息传递过程中携带一些重要的元数据,这些元数据可以用于解释消息的内容或者处理方式。

      ProducerRecord record = new ProducerRecord("mytopic", "myvalue");
      record.headers().add("content-type", "text/plain".getBytes());
      producer.send(record);
      

      在消费消息的时候,可以从KafkaHeader中提取出content-type元数据,来解释消息的内容格式:

      ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
      for (ConsumerRecord record : records) {
          Headers headers = record.headers();
          String contentType = new String(headers.lastHeader("content-type").value());
          if (contentType.equals("text/plain")) {
              //process the message
          }
      }
      

    Java使用Header案例

    1. 生产者

      package com.byd.dev.kfk;
      import org.apache.hadoop.security.UserGroupInformation;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.header.Header;
      import org.apache.kafka.common.header.internals.RecordHeader;
      import org.apache.kafka.common.serialization.StringSerializer;
      import java.io.IOException;
      import java.security.PrivilegedAction;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Properties;
      import java.util.Scanner;
      public class HeaderProducer {
          public static void main(String[] args) {
              // kerberos认证
              System.setProperty("java.security.krb5.conf", "D:/demo/krb5.conf");
              System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
              System.setProperty("java.security.auth.login.config", "D:/demo/jaas.conf");
              try {
                  UserGroupInformation.loginUserFromKeytab("c.dev.hdfs", "D:/demo/demo.keytab");
                  UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction() {
                      @Override
                      public Object run() {
                          try {
                              runProducer();
                          } catch (Exception e) {
                              throw new RuntimeException(e);
                          }
                          return null;
                      }
                  });
              } catch (IOException e) {
                  throw new RuntimeException(e);
              }
          }
          /**
           * 启动生产者
           */
          public static void runProducer() throws InterruptedException {
              KafkaProducer producer = new KafkaProducer(getProducerProperties());
              sendHeaderMessage(producer);
          }
          /**
           * 生产带有Header的记录
           *
           * @param producer KafkaProducer
           */
          public static void sendHeaderMessage(KafkaProducer producer) throws InterruptedException {
              List headers = new ArrayList();
              headers.add(new RecordHeader("website", "www.xc.com".getBytes()));
              Scanner sc = new Scanner(System.in);
              for (int i = 0; i  
    2. 消费者

      package com.byd.dev.kfk;
      import org.apache.hadoop.security.UserGroupInformation;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.header.Header;
      import org.apache.kafka.common.header.Headers;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import java.io.IOException;
      import java.security.PrivilegedAction;
      import java.time.Duration;
      import java.util.Arrays;
      import java.util.Properties;
      public class HeaderConsumer {
          public static void main(String[] args) {
              // kerberos认证
              System.setProperty("java.security.krb5.conf", "D:/demo/krb5.conf");
              System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
              System.setProperty("java.security.auth.login.config", "D:/demo/jaas.conf");
              try {
                  UserGroupInformation.loginUserFromKeytab("c.dev.hdfs",
                          "D:/project/lab-project/src/main/resources/kerberos/dev/ic.dev.hdfs.keytab");
                  UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction() {
                      @Override
                      public Object run() {
                          try {
                              runConsumer();
                          } catch (Exception e) {
                              throw new RuntimeException(e);
                          }
                          return null;
                      }
                  });
              } catch (IOException e) {
                  throw new RuntimeException(e);
              }
          }
          /**
           * 启动消费者
           */
          public static void runConsumer(){
              KafkaConsumer consumer = new KafkaConsumer(getConsumerProperties());
              consumerMessageWithHeader(consumer);
          }
          /**
           * 获取连接Kafka的配置
           * @return Properties
           */
          public static Properties getConsumerProperties() {
              Properties consumerProperties = new Properties();
              consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "xxx:9092,xxx:9092,xxx:9092");
              consumerProperties.put("sasl.kerberos.service.name","kafka");
              consumerProperties.put("sasl.mechanism","GSSAPI");
              consumerProperties.put("security.protocol","SASL_PLAINTEXT");
              consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "ConsumerGroup1");
              return consumerProperties;
          }
          /**
           * 消费带有Header的记录
           * @param consumer KafkaConsumer
           */
          public static void consumerMessageWithHeader(KafkaConsumer consumer) {
              consumer.subscribe(Arrays.asList("xc"));
              while (true){
                  ConsumerRecords records = consumer.poll(Duration.ofMinutes(10));
                  for (ConsumerRecord record : records) {
                      System.out.println("消费记录的key:"+record.key());
                      System.out.println("消费记录的value:"+record.value());
                      Headers consumedHeaders = record.headers();
                      for (Header header : consumedHeaders) {
                          System.out.println("header的key:"+header.key());
                          System.out.println("header的value:"+new String(header.value()));
                      }
                  }
              }
          }
      }
      

    结果:

    生产者生产消息:

    hello
    生产第1条数据
    world
    生产第2条数据
    

    消费者消费消息:

    消费记录的key:key0
    消费记录的value:生产的数据为:hello
    header的key:website
    header的value:www.xc.com
    消费记录的key:key1
    消费记录的value:生产的数据为:world
    header的key:website
    header的value:www.xc.com
    

    Flink消费Header案例

    package com.byd.dev;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.hadoop.security.UserGroupInformation;
    import java.io.IOException;
    import java.security.PrivilegedAction;
    import java.util.Properties;
    public class FlinkKafkaHeader {
        public static String topic = "xc";
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            // kerberos认证
            System.setProperty("java.security.krb5.conf", "D:/demo/krb5.conf");
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            System.setProperty("java.security.auth.login.config", "D:/demo/jaas.conf");
            try {
                UserGroupInformation.getCurrentUser().doAs(new PrivilegedAction() {
                    @Override
                    public Object run() {
                        try {
                            runApp(env);
                        } catch (Exception e) {
                            throw new RuntimeException(e);
                        }
                        return null;
                    }
                });
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        
        public static void runApp(StreamExecutionEnvironment env) throws Exception {
            FlinkKafkaConsumer consumer =
                    new FlinkKafkaConsumer(topic, new MyDeserializationSchema(),
                            getKafkaProperties());
            consumer.setStartFromEarliest();
            env.addSource(consumer).print();
            env.execute(FlinkKafkaHeader.class.getSimpleName());
        }
        /**
         * 获取连接Kafka的配置
         * @return Properties
         */
        public static Properties getKafkaProperties() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "xxx:9092,xxx:9092,xxx:9092");
            props.setProperty("group.id", "group1");
            props.setProperty("flink.partition-discovery.interval-millis", "10000");
            props.setProperty("security.protocol", "SASL_PLAINTEXT");
            props.setProperty("sasl.mechanism", "GSSAPI");
            props.setProperty("sasl.kerberos.service.name", "kafka");
            return props;
        }
    }
    
    package com.byd.dev;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.header.Header;
    public class MyDeserializationSchema implements KafkaDeserializationSchema {
        @Override
        public boolean isEndOfStream(String s) {
            return false;
        }
        @Override
        public String deserialize(ConsumerRecord consumerRecord) throws Exception {
            Iterable headers = consumerRecord.headers();
            for (Header header : headers) {
                String key = header.key();
                String value = new String(header.value(), "UTF-8");
                // 处理 header 数据
                System.out.println("kafka的header数据:" + key + ":" + value);
            }
            return new String(consumerRecord.value(), "UTF-8");
        }
        @Override
        public TypeInformation getProducedType() {
            return TypeInformation.of(String.class);
        }
    }
    

    Header设计思路

    KafkaHeader的设计思路是基于Kafka的消息体系结构设计的,消息体系结构包括了消息的消息体、消息头和消息尾。KafkaHeader被设计成一个可扩展的消息头,可以为消息添加一些有用的元数据。KafkaHeader的键使用字符串表示,并且值可以是任何字节序列。

    KafkaHeader还可以使用基于优先级的机制,覆盖或者添加KafkaHeader。当发送相同主题和分区的消息时,新的KafkaHeader将覆盖旧的KafkaHeader。

    KafkaProducer和KafkaConsumer都提供了API,用于访问和修改KafkaHeader。其中KafkaProducer提供的API可以为将要发送到Kafka服务器的消息添加或者删除KafkaHeader。而KafkaConsumer提供的API可以用于从接收到的消息中提取KafkaHeader,并对消息进行分析和处理。

    Header的性能影响和注意事项

    由于KafkaHeader是在消息中添加的元数据,因此在为消息添加KafkaHeader时需要注意以下几点:

    1. 注意KafkaHeader的大小:KafkaHeader的大小会影响网络传输的性能,因此在添加KafkaHeader时需要权衡添加元数据的重要程度和通信性能的需要。

    2. 注意KafkaHeader支持的数据类型:KafkaHeader支持的数据类型包括了任何字节序列,因此在添加KafkaHeader时需要确保添加的元数据可以正确地解析和处理。

    3. 注意KafkaHeader的覆盖机制:当发送相同主题和分区的消息时,新的KafkaHeader会覆盖旧的KafkaHeader,因此需要注意不要在消息发送过程中出现意外丢失KafkaHeader的情况。

    4. 注意KafkaHeader的存在:KafkaHeader虽然可以为消息提供有用的元数据,但是当使用者处理消息时需要确保消息本身是唯一的标识,而不是KafkaHeader。

    本文总结

    本文介绍了 Kafka 中的 header,它可以用来传递一些与数据内容无关的附加信息,方便消费者解析和处理消息。与数据的 key、value 不同,header 不包含实际的消息内容,它们只是元数据,不影响消息的生命周期。在生产者和消费者中都可以使用 Kafka header,可以通过 API 来添加、读取和操作 header。

    参考文献

    1. https://www.baeldung.com/java-kafka-custom-headers
    2. https://www.python100.com/html/55R0B31WXT2C.html
VPS购买请点击我

文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。

目录[+]