《kafka权威指南》读书笔记2

Posted by BY KiloMeter on March 14, 2019

第五章 深入kafka

kafka集群中,broker的注册都是在zookeeper上进行的,每当有broker加入集群时,都会在zookeeper上注册一个临时节点/brokers/ids,当broker发生宕机,长时间垃圾回收停顿时,会断开和zookeeper的连接,zookeeper会删除该broker的临时节点,移除集群,并通知kafka该节点已退群。

关闭broker时,也会删除临时节点,如果之后有相同id的broker加入集群,将会拥有和旧broker相同的分区和主题。

集群中有一个控制器,除了有一般broker的功能外,还负责分区首领的选举

一般是集群中第一台启动的broker会首先在zookeeper上注册一个/controller,成为首领,其他broker试图创建该节点时将会报错,并且broker会在这里创建一个zookeeper watch对象监听这个节点,如果控制器挂了,则会申请成为新的控制器。同时zookeeper会维护一个递增的controller epoch,controller向其他broker发消息时,会发送这个controller epoch,如果遇到包含旧的controller epoch,会忽略这些消息。

如果宕掉的controller上刚好有分区副本的首领副本,新的controller将会负责选举出新的首领副本。

复制

kafka中的消息是存放在主题中的,每个主题有多个分区,而分区又有若干个副本,这些副本保存在broker上,每个broker可以保存许多属于不同主题的分区和副本。

每个分区都有自己的首领副本,为了保证数据的一致性,生产者和消费者都会和首领副本打交道

跟随者副本唯一的任务是从首领副本那里复制消息,和首领副本保持同步,如果首领副本发生崩溃,则跟随者副本会被提升为首领副本。

跟随者副本在向首领副本请求数据时,是会按顺序请求数据的,首领副本可以根据跟随者请求的偏移量判断该跟随者副本的复制情况,如果跟随者副本没有和首领副本保持一致,那么该跟随者副本是无法提升为首领副本的。

处理请求

broker会在每一个监听的端口上运行一个Accpet线程,遇到客户端发来的请求时,Accpet线程会创建一个连接,并把该连接交给Processor线程(网络线程)处理,网络线程主要负责从客户端获取请求消息(如生产者发送的请求,或者消费者和跟随者副本要从首领副本读取消息时的请求),把请求消息放进请求队列,然后从响应队列获取响应的消息,并发送给客户端。请求消息放进请求队列后,IO线程会负责处理它们。

上面讲到客户端发送请求到broker上,但是前面复制部分讲到,生产者和消费者都必须和首领副本打交道,那客户端是如何把消息发送过去的呢?实际上,客户端在请求的时候,发送的请求是元数据请求,该请求包含了客户端想要请求的主题,然后服务器端会把与之相关的主题分区,副本,首领副本信息发送给客户端。而这些元数据是缓存在所有的broker上的,因此元数据请求可以发往任意一个broker。

一般情况下,客户端会把服务器端响应的消息缓存起来,客户端还会定期刷新元数据。

生产请求

当包含首领副本的broker接收到客户端生产请求时,会验证以下内容:

1、该用户是否有写入权限?

2、acks值是否有效(0,1或者all)

3、如果acks=all,是否有足够多的同步副本保证消息已被安全写入?

消息写入kafka时,并不是立刻写入磁盘,而是写入到文件系统缓存里,不保证何时会被刷新到磁盘上,如果acks被设置为all,那么只有等所有的跟随者副本都复制了消息,才会响应给客户端。

获取请求

获取数据时,会判断请求的偏移量是否存在

还能够设置获取数据的上限和下限,这样可以减少CPU和网络开销,同样也可以定义超时时间。通常情况下,消费者只能够读取已经同步到所有跟随者副本的消息

kafka物理存储

分区分配

假设现在有6台broker,10个分区,副本数为3。kafka的分区和副本的分配遵循以下原则:

1、在broker中平均分布分区副本,每台broker上有5个副本

2、确保每个分区的每个副本在不同的机器上

3、如果为broker指定了机架信息,那么尽可能把每个分区的副本分配到不同的机架上。

为了实现以上三点,先随机选择一个broker(假设编号为4),采用轮询的方式先确认首领分区的位置,分区0的首领副本在编号为4的broker,分区1的首领副本在编号为5的broker上…. 副本同理,分区0的跟随者副本依次分布在编号为5,0的broker上。

文件存储机制

我们都知道kafka中的topic分成若干个partition,但是一个partition底层是由多个segment组成的,想象一下,如果消费者持续往一个partition中写入数据,如果只有一个文件的话,那么该文件将会变得相当庞大,会对文件的清除造成带来严重影响,因此每个partition又分成多个segment,在使用kafka的过程中,数据保存的时候,是以topic名称+partition作为文件目录的,而这个文件目录下面存放的index和log文件就是segment。index文件保存大量的元数据索引,log文件保存真正的数据。

00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log

每个index文件的命名是上一个index文件最后的偏移量+1(第一个文件为000…000)

假如想要读取offset=368776的message,需要通过下面2个步骤查找。

  1. 查找segment file 00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.同样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其他后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就可以快速定位到具体文件。 当offset=368776时定位到00000000000000368769.index|log
  2. 通过segment file查找message 通过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,然后再通过00000000000000368769.log顺序查找直到offset=368776为止。

segment index file并没有为数据文件中的每条message建立索引,而是采取稀疏索引存储方式,每隔一定字节的数据建立一条索引,它减少了索引文件大小,通过map可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。