kafka 快速上手
下载 Apache Kafka
演示window 安装
编写启动脚本,脚本的路径根据自己实际的来
启动说明
先启动zookeeper后启动kafka,关闭是先关kafka,然后关闭zookeeper
巧记: 铲屎官(zookeeper)总是第一个到,最后一个走
启动zookeeper
call bin/windows/zookeeper-server-start.bat config/zookeeper.properties
启动kafka
call bin/windows/kafka-server-start.bat config/server.properties
测试脚本,主要用于创建主题 ‘test-topic’
# 创建主题(窗口1) bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --create # 查看主题 bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --list bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --describe # 修改某主题的分区 bin/window> kafka-topics.bat --bootstrap-server localhost:9092 --topic test-topic --alter --partitions 2 # 生产消息(窗口2)向test-topic主题发送消息 bin/window> kafka-console-producer.bat --bootstrap-server localhost:9092 --topic test-topic >hello kafka # 消费消息(窗口3)消费test-topic主题的消息 bin/window> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test-topic
package com.ldj.kafka.admin; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.CreateTopicsResult; import org.apache.kafka.clients.admin.NewTopic; import java.util.*; /** * User: ldj * Date: 2024/6/13 * Time: 0:00 * Description: 创建主题 */ public class AdminTopic { public static void main(String[] args) { Map adminConfigMap = new HashMap(); adminConfigMap.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); AdminClient adminClient = AdminClient.create(adminConfigMap); /** * 使用kafka默认的分区算法创建分区 */ NewTopic topic1 = new NewTopic("topic-01", 1, (short) 1); NewTopic topic2 = new NewTopic("topic-02", 2, (short) 2); CreateTopicsResult addResult1 = adminClient.createTopics(Arrays.asList(topic1, topic2)); /** * 手动为主题(topic-03)分配分区 * topic-03主题下的0号分区有2个副本,它们中的一个在节点id=1中,一个在节点id=2中; * list里第一个副本就是leader(主写),后面都是follower(主备份) * 例如:0分区,nodeId=1的节点里的副本是主写、2分区,nodeId=3的节点里的副本是主写 */ Map partition = new HashMap(); partition.put(0, Arrays.asList(1, 2)); partition.put(1, Arrays.asList(2, 3)); partition.put(2, Arrays.asList(3, 1)); NewTopic topic3 = new NewTopic("topic-03", partition); CreateTopicsResult addResult2 = adminClient.createTopics(Collections.singletonList(topic3)); //DeleteTopicsResult delResult = adminClient.deleteTopics(Arrays.asList("topic-02")); adminClient.close(); } }
package com.ldj.kafka.producer; import com.alibaba.fastjson.JSON; import com.ldj.kafka.model.UserEntity; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.concurrent.Future; /** * User: ldj * Date: 2024/6/12 * Time: 21:08 * Description: 生产者 */ public class KfkProducer { public static void main(String[] args) throws Exception { //生产者配置 Map producerConfigMap = new HashMap(); producerConfigMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); producerConfigMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); producerConfigMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //批量发送 producerConfigMap.put(ProducerConfig.BATCH_SIZE_CONFIG, 2); //消息传输应答安全级别 0-消息到达broker(效率高,但不安全) 1-消息在leader副本持久化(折中方案) -1/all -消息在leader和flower副本都持久化(安全,但效率低) producerConfigMap.put(ProducerConfig.ACKS_CONFIG, "all"); //ProducerState 缓存5条数据,重试数据会与5条数据做比较,结论只能保证一个分区的数据幂等性,跨会话幂等性需要通过事务操作解决(重启后全局消息id的随机id会发生改变) //消息发送失败重试次数,重试会导致消息重复!!(考虑幂等性),消息乱序(判断偏移量是否连续,错乱消息回到在缓冲区重新排序)!! producerConfigMap.put(ProducerConfig.RETRIES_CONFIG, 3); //kafka有消息幂等性处理(全局唯一消息id/随机id-分区-偏移量),默认false-不开启 producerConfigMap.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); //解决跨会话幂等性,还需结合事务操作,忽略 //producerConfigMap.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id"); //创建生产者 KafkaProducer producer = new KafkaProducer(producerConfigMap); //TODO 事务初始化方法 //producer.initTransactions(); //构建消息 ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable headers) try { //TODO 开启事务 //producer.beginTransaction(); for (int i = 0; i { if (Objects.isNull(var2)) { System.out.printf("[%s]消息发送成功!", userEntity.getUserId()); } else { System.out.printf("[%s]消息发送失败!err:%s", userEntity.getUserId(), var2.getCause()); } }); //TODO 提交事务 //producer.commitTransaction(); //注意没有下面这行代码,是异步线程从缓冲区读取数据异步发送消息,反之是同步发送,必须等待回调消息返回才会往下执行 System.out.printf("发送消息[%s]----", userEntity.getUserId()); RecordMetadata recordMetadata = future.get(); System.out.println(recordMetadata.offset()); } } finally { //TODO 终止事务 //producer.abortTransaction(); //关闭通道 producer.close(); } } }
package com.ldj.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; /** * User: ldj * Date: 2024/6/12 * Time: 21:10 * Description: 消费者 */ public class KfkConsumer { public static void main(String[] args) { //消费者配置 Map consumerConfigMap = new HashMap(); consumerConfigMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); consumerConfigMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); consumerConfigMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //所属消费组 consumerConfigMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test123456"); //创建消费者 KafkaConsumer consumer = new KafkaConsumer(consumerConfigMap); //消费主题的消息 ConsumerRebalanceListener consumer.subscribe(Collections.singletonList("test-topic")); try { while (true) { ConsumerRecords records = consumer.poll(Duration.ofSeconds(5)); //数据存储结构:Map records; for (ConsumerRecord record : records) { System.out.println(record.value()); } } } finally { //关闭消费者 consumer.close(); } } }
文章版权声明:除非注明,否则均为主机测评原创文章,转载或复制请以超链接形式并注明出处。