2024-02-27(Kafka)
1.Kafka中所有的消息都是保存在主题中的,要生产消息到Kafka,首先必须要创建一个主题。
2.Kafka的生产者/消费者
安装kafka集群,可以测试如下:
创建一个topic主题(消息都是存放在topic中,类似mysql建表的过程)
基于kafka的内置测试生产者脚本来读取标准输入(键盘输入)的数据,并放入到topic中
基于kafka的内置测试消费者脚本来消费topic中的数据
推荐在开发中使用kafka tool
浏览kafka集群节点,多少个topic,多少个分区
创建topic/删除topic
浏览Zookeeper中的数据
3.Kafka的基准测试工具
Kafka中提供了内置的性能测试工具
生产者:测试出来每秒传输的数据量(多少条数据,多少M的数据)
消费者:测试消费者每秒拉取的数据量
对比生产者和消费者:消费者的速度更快
4.最简单的Kafka集群图
broker:
一个Kafka集群通常是由多个broker组成,这样才能实现f负载均衡,以及容错机制。
broker是无状态(Stateless)的,它们是通过Zookeeper来维护集群状态。
一个Kafka的broker每秒可以处理数十万次读写,每个broker都可以处理TB消息而不互相影响性能。
zookeeper:
ZK用来管理协调broker的,并且存储了Kafka的元数据(例如:有多少个topic,partition,consumer)
ZK服务主要用于通知生产者和消费者Kafka集群中有新的broker加入,或者Kafka集群中出现故障的broker。
题外话:Kafka正在逐步想办法将ZK剥离,维护两套集群成本较高,社区提出KIP-500就是要替换掉Zookeeper的依赖。“Kafka on Kafka”-----Kafka自己来管理自己的元数据。
生产者(producer):
生产者负责将数据推送给broker的topic。
消费者(consumer):
消费者负责从broker的topic中拉取数据,并自己处理。
消费者组(consumer group):
consumer group 是kafka提供的可扩展且具有容错机制的消费者机制。
一个消费者组可以包含多个消费者。
一个消费者组有一个唯一的ID(group id)
组内的消费者一起消费主题的所有分区数据。
分区(partition):
在Kafka集群中,主题被分为多个分区。
Kafka集群的分布式就是由这个分区来实现的。一个Topic中的数据(消息)可以分布在Topic中的不同partition中。
副本(Replicas):
副本用来实现Kafka集群的容错,其实就是分区partition的容错,一个topic应该至少包含大于1个的副本。
副本可以确保某个服务器出现故障时,确保数据依然可以用。
在Kafka中,一般都会设计副本的数量 > 1。
主题(Topic):
一个topic可以包含多个分区(注意:这里是大数据里面的分区的概念),如下图所示:
主题是一个逻辑概念,用于生产者发布数据,消费者消费数据。
Kafka中的主题必须要有标识符,而且是唯一的,Kafka中可以有任意数量的主题,没有数量上的限制。
在主题中的消息是有结构的,一般一个主题包含某一类消息。
一旦生产者发送消息到主题中,这些消息就不能被更新(更改)。
偏移量(offset):
offset记录着下一条要发送给Consumer的消息序号。
默认Kafka将offset存储在zookeeper中。
在一个分区中,消息是有顺序的方式存储则着,每个在分区的消费都是一个递增的id,这个就是偏移量offset。
偏移量在分区中才是最有意义的。在分区之间,offset是没有任何意义的。
5.Kafka生产者的幂等性
如上图所示:在生产者生产消息的时候,如果出现retry,有可能会一条消息被发送了多次,如果Kafka不具备幂等性,就有可能会在partition中保存多条一模一样的消息。
代码中配置幂等性:props.put("enable.idempotence",true);
幂等性原理:
为了实现幂等性,Kafka引入了Producer ID(PID)和Sequence Number的概念:
a.PID:每个Producer在初始化时,都会分配一个唯一的PID,这个PID对用户来说,是透明的。
b.Sequence Number:针对每个生产者(对于PID)发送到指定主题分区的消息都对应一个从0开始递增的Sequence Number;
具体看下图:
综述:
生产者消息重复问题:
Kafka生产者生产消息到partition,如果直接发送消息,kafka会将消息保存到分区中,但是kafka会返回一个ack给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果ack响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,Kafka又会保存一条一模一样的消息。
在Kafka中可以开启幂等性:
a.当Kafka的生产者生产消息时,会增加一个pid(生产者的唯一编号)和sequence number(针对消息的一个递增序列)
b.发送消息,会连着pid和sequence number一并发送
c.kafka收到消息,会将消息和pid,sequence number一并保存下来
d.如果ack响应失败,生产者重试,再次发送消息时,kafka会根据pid,sequence number来判断是否需要在保存这条消息。
e.判断条件:生产者发送过来的sequence number是否小于等于partition中消息对应的sequence number。
6.生产者分区写入策略
生产者写入消息到topic,Kafka将根据不同的策略将数据分配到不同分区中去。策略:
a.轮询分区策略(默认策略,key为null,就用这个策略)
b.随机分区策略(不用了)
c.按key分区分配策略(可能出现数据倾斜,key.hash()%分区数量)
d.自定义分区策略
7.消费者组的Rebalance机制
Kafka中的Rebalance称之为再均衡,是Kafka中确保消费者组中所有的消费者如何达成一致,分配订阅topic中每个分区的机制
Rebalance触发的时机:
1.消费者组中消费者的个数发生了变化,比如有新的消费者加入或者某个消费者停止了。
2.订阅的topic数量发生变化
3.订阅的topic分区数发生了变化
Rebalance的不良影响:
1.发生再分配(rebalance)时,消费者组下所有的消费者都会协调在一起共同参与,Kafka使用分配策略尽可能达到最公平的分配。
2.再分配过程会对消费者组产生非常严重的影响,再分配的过程中所有消费者都将停止工作,直到再分配的完成。
8.消费者的分区分配策略
目的是保证每个消费者尽量能够均衡的消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费分区的数量特别少。
1.Range范围分配策略
range范围分配策略是Kafka默认的分配策略,它可以确保每个消费者消费的分区数量是均衡的。
注意:Range范围分配策略是针对每个Topic的。
算法公式:
m = 分区数量 / 消费者数量
n = 分区数量 % 消费者数量
前m个消费者消费n + 1 个
剩余消费者消费n个
2.RoundRobin轮询策略
RoundRobin轮询策略是将消费者组内所有消费者以及消费者所订阅的所有Topic的partition按照字典序排序(topic和分区的hashcode进行排序),然后通过轮询的方式逐个将分区以此分配给每个消费者。
3.Stricky粘性分配策略
Kafka 0.11x引入次策略。目的:
1)分区分配尽可能均匀
2)在发生rebalance的时候,分区的分配尽可能与上一次分配保持相同
没有发生rebalance时,Stricky粘性分配策略和RoundRobin分配策略类似。
9.副本机制
副本的目的就是冗余备份,当某个Broker上的分区数据丢失时,依然可以保障数据可用。因为在其他的Broker上的副本是可用的。
生产者会不断的往Kafka中写入数据,写入数据会有一个返回结果表示是否写入成功。这里对应有一个ACKs的配置。
producer的ACKs参数:
1)acks配置为0:
不等待broker确认,直接发送下一条数据,性能最高,但可能会存在数据丢失的情况
2)acks配置为1:
等待leader副本确认接收后,才会发送下一条数据,性能中等。
3)acks配置为-1或者all:
等待所有的副本已经将数据同步后,才会发送下一条数据,性能最慢。
根据业务情况来选择ack机制,是要求高性能,一部分数据丢失影响不大,可以选择0/1,如果要求数据一定不能丢失,就配置为-1/all。
分区中有leader和follower概念。为了确保消费者消费的数据是一致的,只能从分区leader去读写消息,follower做的事情就是同步数据。
10.Kafka-Eagle:kafka监控工具
11.分区的leader和follower
(注意:leader和follower这两个概念是针对分区来的,而不是broker)
在Kafka中,每个topic都可以配置多个分区以及多个副本。每个分区都有一个leader以及0个或者多个follower,在创建topic时,Kafka会将每个分区的leader均匀的分配在每个broker上。我们正常使用kafka是感受不到leader,follower的存在的。但其实,所有的读写操作都是由leader处理,而所有的follower都复制leader的日志数据文件,如果leader出现故障,follower就会被选举为leader。所以可以这样说:
Kafka中的leader负责处理读写操作,而follower只负责副本数据的同步。
如果leader出现故障,其他follower会被重新选举为leader。
follower像一个consumer一样,拉取leader对应分区的数据,并保存到日志数据文件中。
12.AR,ISR,OSR
AR(Assigned Replicas------已分配的副本):表示一个Topic下的所有副本。
ISR(In-sync replicas------在同步中的副本):正在同步的副本(可以理解为当前有几个follower是存活的)。
OSR(Out-of-Sync Replicas):不在同步的副本。
大白话总结:AR所有的,ISR正常的,OSR异常的。
正常情况下,所有的follower副本都应该与leader副本保持同步,即AR = ISR,OSR集合为空。
13.Leader选举
kafka的吞吐量很高,延迟很低,所以要选举leader的话,必须要快。
Controller介绍:
Kafka启动时,会在所有的broker中选择一个controller
前面leader和follower是针对partition分区,而controller是针对broker的
创建topic、或者添加分区、修改副本数量之类的管理任务都是由controller完成的
Kafka分区leader的选举,也是由Controller决定的:
1)所有的分区的leader选举都由controller决定
2)controller会将leader的改变直接通过RPC的方式通知需为此做出响应的Broker
3)controller读取到当前分区的ISR,只要有一个Replica还存活,就选择其中一个作为leader,否则,则任意选择一个Replica作为leader。
4)如果该分区的所有Replica都已经宕机,则新的leader为-1。
Controller:controller是kafka集群的老大,是针对broker的一个角色
controller是高可用的,是通过ZK来选举的
Leader:是针对partition分区的一个角色
Leader是通过ISR来快速选举的
14.Kafka中生产者的数据写入流程
生产者先从ZK的 "/brokers/topics/主题名/partitions/分区名/state"节点找到该分区的leader。
生产者在ZK中找到该ID对应的broker
broker进程上的leader将消息写入到本地log中
follower从leader上拉取消息,写入本地log,并向leader发送ACK
leader接收到所有ISR中的Replica的ACK后,并向生产者返回ACK
15.Kafka的读写流程
写流程:
通过ZK找到分区对应的leader,leader是负责读写的
生产者开始写入数据
ISR里面的follower开始同步数据,并返回给leader ACK确认
最后返回给生产者ACK
读流程:
通过ZK找到分区对应的leader,leader是负责读写的
通过ZK找到消费者对应的offset
从offset往后顺序拉取数据
提交offset