欢迎光临散文网 会员登陆 & 注册

Kafka关键原理

2023-06-09 20:08 作者:程序员-王坚  | 我要投稿

日志分段切分条件

日志分段文件切分包含以下4个条件,满足其一即可:

  1. 当前日志分段文件的大小超过了broker端参数 log.segment.bytes 配置的值。log.segment.bytes参数的默认值为 1073741824,即1GB

  2. 当前日志分段中消息的最小时间戳与当前系统的时间戳的差值大于log.roll.mslog.roll.hours参数配置的值。如果同时配置了log.roll.mslog.roll.hours参数,那么log.roll.ms的优先级高,默认情况下,只配置了log.roll.hours参数,其值为168,即7天。

  3. 偏移量索引文件或时间戳索引文件的大小达到 broker 端参数log.index.size.max.bytes配置的值。log.index.size .max.bytes的默认值为10485760,即10MB

  4. 追加的消息的偏移量与当前日志分段的起始偏移量之间的差值大于Integer.MAX_VALUE, 即要追加的消息的偏移量不能转变为相对偏移量(offset - baseOffset > Integer.MAX_VALUE)。

什么是Controller

Controller作为Kafka集群中的核心组件,它的主要作用是在Apache ZooKeeper的帮助下管理和协调整个Kafka集群。

Controller与Zookeeper进行交互,获取与更新集群中的元数据信息。其他broker并不直接与zookeeper进行通信,而是与Controller进行通信并同步Controller中的元数据信息。

Kafka集群中每个节点都可以充当Controller节点,但集群中同时只能有一个Controller节点。

Controller简单来说,就是kafka集群的状态管理者

controller竞选机制:简单说,先来先上!

Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。

在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责维护整个集群中所有分区和副本的状态及分区leader的选举。

当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。

Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:

{"version":1,"brokerid":0,"timestamp":"1529210278988"}

其中version在目前版本中固定为1,brokerid表示成为控制器的broker的id编号,timestamp表示竞选成为控制器时的时间戳。

在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取zookeeper上的/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。

controller的职责

  • 监听partition相关变化

对Zookeeper中的/admin/reassign_partitions节点注册PartitionReassignmentListener,用来处理分区重分配的动作。 对Zookeeper中的/isr_change_notification节点注册IsrChangeNotificetionListener,用来处理ISR集合变更的动作。 对Zookeeper中的/admin/preferred-replica-election节点添加PreferredReplicaElectionListener,用来处理优先副本选举。

  • 监听topic增减变化

对Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减的变化; 对Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作

  • 监听broker相关的变化

对Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker增减的变化

  • 更新集群的元数据信息

从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。 对各topic所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的分区分配变化。并将最新信息同步给其他所有broker。

  • 启动并管理分区状态机和副本状态机。

  • 如果参数auto.leader.rebalance.enable设置为true,则还会开启一个名为“auto-leader-rebalance-task”的定时任务来负责维护分区的leader副本的均衡。

分区的负载分布

客户端请求创建一个topic时,每一个分区副本在broker上的分配,是由集群controller来决定;

结论:里面会创建出来两个随机数

第一个随机数确定0号分区leader的位置,往后1号分区2号分区的leader依次往后顺延1

第二个随机数确定每个分区的第一个副本的位置 在leader所在机器上往后顺延(随机数+1)台机器,该台机器就是第一个副本的位置,剩余副本依次往后顺延1

// 举例:// broker_id = 0~19 一共20台机器// 分区数20,副本数10// 第一个随机数:19// 第二个随机数:0(0,ArrayBuffer(19, 0, 1, 2, 3, 4, 5, 6, 7, 8)) (1,ArrayBuffer(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) (2,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) (3,ArrayBuffer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)) (4,ArrayBuffer(3, 4, 5, 6, 7, 8, 9, 10, 11, 12)) (5,ArrayBuffer(4, 5, 6, 7, 8, 9, 10, 11, 12, 13)) (6,ArrayBuffer(5, 6, 7, 8, 9, 10, 11, 12, 13, 14)) (7,ArrayBuffer(6, 7, 8, 9, 10, 11, 12, 13, 14, 15)) (8,ArrayBuffer(7, 8, 9, 10, 11, 12, 13, 14, 15, 16)) (9,ArrayBuffer(8, 9, 10, 11, 12, 13, 14, 15, 16, 17)) (10,ArrayBuffer(9, 10, 11, 12, 13, 14, 15, 16, 17, 18)) (11,ArrayBuffer(10, 11, 12, 13, 14, 15, 16, 17, 18, 19)) (12,ArrayBuffer(11, 12, 13, 14, 15, 16, 17, 18, 19, 0)) (13,ArrayBuffer(12, 13, 14, 15, 16, 17, 18, 19, 0, 1)) (14,ArrayBuffer(13, 14, 15, 16, 17, 18, 19, 0, 1, 2)) (15,ArrayBuffer(14, 15, 16, 17, 18, 19, 0, 1, 2, 3)) (16,ArrayBuffer(15, 16, 17, 18, 19, 0, 1, 2, 3, 4)) (17,ArrayBuffer(16, 17, 18, 19, 0, 1, 2, 3, 4, 5)) (18,ArrayBuffer(17, 18, 19, 0, 1, 2, 3, 4, 5, 6)) (19,ArrayBuffer(18, 19, 0, 1, 2, 3, 4, 5, 6, 7))// 其分布策略源码如下:private def assignReplicasToBrokersRackUnaware( nPartitions: Int, //分区的个数   10replicationFactor: Int,  //副本的个数  5 brokerList: Seq[Int],//broker的集合    8   0~7fixedStartIndex: Int//默认值是-1  固定开始的索引位置startPartitionId: Int): Map[Int, Seq[Int]] //默认值是-1 分区开始的位置= {  val ret = mutable.Map[Int, Seq[Int]]()  val brokerArray = brokerList.toArray  val startIndex = if (fixedStartIndex >= 0) {      fixedStartIndex  }else {          rand.nextInt(brokerArray.length)  }  var currentPartitionId = math.max(0, startPartitionId)  var nextReplicaShift = if (fixedStartIndex >= 0) {          fixedStartIndex  }else {          rand.nextInt(brokerArray.length)  }  for (_ <- 0 until nPartitions) {    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0)){      nextReplicaShift += 1        }    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))    for (j <- 0 until replicationFactor - 1) {                                replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))    }    ret.put(currentPartitionId, replicaBuffer)    currentPartitionId += 1  }  ret }                   private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)  (firstReplicaIndex + shift) % nBrokers }

  • 副本因子不能大于 Broker 的个数(报错:Replication factor: 4 larger than available brokers: 3.);

  • partition_0的第1个副本(leader副本)放置位置是随机从 brokerList 选择的;

  • 其他分区的第1个副本(leader)放置位置相对于paritition_0分区依次往后移(也就是如果我们有5个 Broker,5个分区,假设partition0分区放在broker4上,那么partition1将会放在broker5上;patition2将会放在broker1上;partition3在broker2,依次类);

  • 各分区剩余的副本相对于分区前一个副本偏移随机数nextReplicaShift+1,然后后面的副本依次加1


Kafka关键原理的评论 (共 条)

分享到微博请遵守国家法律