Kafka的ISR收缩机制

article/2025/9/13 19:42:22
  1. ISR什么时候收缩
  2. ISR什么时候扩展
  3. ISR的传播机制
  4. Broker宕机之后怎么ISR的收缩?

Kafka在启动的时候,会启动一个副本管理器ReplicaManager,这个副本管理器会启动几个定时任务

  1. ISR过期定时任务isr-expiration,每隔replica.lag.time.max.ms/2毫秒就执行一次。
  2. ISR变更的传播定时任务isr-change-propagation,每隔2500毫秒就执行一次。

replica.lag.time.max.ms : 如果一个follower在这个时间内没有发送fetch请求,leader将从ISR中移除这个follower。从2.5开始 ,默认值从 10 秒增加到 30 秒。

文章目录

      • 1.ISR收缩 isr-expiration
      • 2.ISR 扩展
      • 3.ISR广播 maybePropagateIsrChanges
      • 4.Controller监听/isr_change_notification/子节点
      • 5.Q/A

1.ISR收缩 isr-expiration

每隔replica.lag.time.max.ms/2毫秒执行一次

ReplicaManager#maybeShrinkIsr

  // 尝试收缩ISR, 遍历所有在线状态的分区,检查是否需要收缩private def maybeShrinkIsr(): Unit = {allPartitions.keys.foreach { topicPartition =>nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr())}}

,先遍历所有的分区,找出本台Broker上所有在线的分区 进行遍历,去尝试收缩ISR。

ReplicaManager#maybeShrinkIsr

 def maybeShrinkIsr(): Unit = {val needsIsrUpdate = inReadLock(leaderIsrUpdateLock) {// 判断是否需要伸缩:当前分区是Leader&&(follower副本LEO!=Leader副本LEO && ( (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > replica.lag.time.max.ms))needsShrinkIsr()}val leaderHWIncremented = needsIsrUpdate && inWriteLock(leaderIsrUpdateLock) {leaderLogIfLocal match {case Some(leaderLog) =>// 再获取一次 OSR, 有点双重检查锁的意思。val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)if (outOfSyncReplicaIds.nonEmpty) {val newInSyncReplicaIds = inSyncReplicaIds -- outOfSyncReplicaIds// 更新zk中的isr信息和cache中的isr信息shrinkIsr(newInSyncReplicaIds)// we may need to increment high watermark since ISR could be down to 1maybeIncrementLeaderHW(leaderLog)} else {false}case None => false // do nothing if no longer leader}}// some delayed operations may be unblocked after HW changedif (leaderHWIncremented)tryCompleteDelayedRequests()}
  1. 找到所有需要收缩的副本OSR,判断条件:
    ①.当前分区必须是Leader
    ②.follower副本LEO!=Leader副本LEO(如果相等的话,那表示跟Leader保持最高同步了,也就没必要收缩)
    ③.follower副本中,当前时间 - 上一次去leader获取数据的时间戳 > replica.lag.time.max.ms(2.5版本开始默认30000ms)

  2. 计算新的 newISR = 当前ISR - 1中获取到的OSR .
    ①. 将newISR组装一下成newLeaderData对象(还包含leader和epoche等信息),然后将信息写入到zk持久节点/brokers/topics/{Topic名称}/partitions/{分区号}/state中.
    ②.如果写入成功,则更新一下2个对象内存, isrChangeSet对象保存着ISR变更记录,lastIsrChangeMs记录这最新一次ISR变更的时间戳。一会这两个两个对象,在ISR传播的时候需要用到。
    ③.如果写入成功,则更新一下2个对象内存,inSyncReplicaIds=newISR, zkVersion = newVersion。

  3. 尝试增加HW(高水位), maybeIncrementLeaderHW 这个方法可能会在 ①.ISR变更 ②.任何副本的LEO更改 这两种情况下触发调用。当然我们这种场景触发是因为ISR变更了。如果HW有更新,则返回true,否则返回false,具体逻辑,请看下面。

  4. 如果3中更新成功,则触发一下待处理的延迟操作。这里包含一些fetch、produce、deleteRecords等延迟请求。

增加HW(高水位)的逻辑:

Partition#maybeIncrementLeaderHW

  private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {inReadLock(leaderIsrUpdateLock) {// 先将leader的LEO 设置为 newHWvar newHighWatermark = leaderLog.logEndOffsetMetadata//遍历所有副本,找到最新的HW, 计算逻辑就是,在同步副本内的 最小LEO.remoteReplicasMap.values.foreach { replica =>if (replica.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&// 要么在ISR里面,要么上一次Fetche数据距离现在<= replica.lag.time.max.ms(curTime - replica.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || inSyncReplicaIds.contains(replica.brokerId))) {newHighWatermark = replica.logEndOffsetMetadata}}leaderLog.maybeIncrementHighWatermark(newHighWatermark) match {//打印一些日志,并返回是否更新成功。}}}
  1. 遍历所有的副本,找到 所有在ISR中的副本和 上一次Fetche数据距离现在<=replica.lag.time.max.ms时间但是还没有来得及进入ISR列表的副本, 然后从这些副本中找到最小的LEO newHW.

  2. 如果newHW > 当前Leader的LEO,抛异常,这种情况有问题。

  3. 将newHW 和 oldHW做个对比,如果满足下面任意一个条件,则更新 HW的值,否则不更新。
    ①.oldHW.messageOffset < newHW.messageOffset(新的HW>旧的HW)
    ②.oldHW.messageOffset==newHW.messageOffset&&oldHW.onOlderSegment(newHW)。这里解释一下,当LogSegment滚动到新的Segment的时候,就会出现这种情况,更新一下HW(因为日志段变成新的了)

2.ISR 扩展

ISR的缩小,是有一个定时任务定时检查,而ISR扩展可不一样,它是在Follower副本向Leader副本发起 Fetch请求请求的时候会尝试检查是否需要重新加入到ISR中。

当发现Follower副本不在ISR列表的时候,就会执行下面的方法。

Partition#maybeExpandIsr

  private def maybeExpandIsr(followerReplica: Replica, followerFetchTimeMs: Long): Unit = {//检查一下是否满足 扩展的条件val needsIsrUpdate = inReadLock(leaderIsrUpdateLock) {needsExpandIsr(followerReplica)}if (needsIsrUpdate) {inWriteLock(leaderIsrUpdateLock) {// 再坚持一遍是否需要伸展,双重检查。if (needsExpandIsr(followerReplica)) {val newInSyncReplicaIds = inSyncReplicaIds + followerReplica.brokerIdinfo(s"Expanding ISR from ${inSyncReplicaIds.mkString(",")} to ${newInSyncReplicaIds.mkString(",")}")// update ISR in ZK and cacheexpandIsr(newInSyncReplicaIds)}}}}//判断Follower副本是否有资格进入isr列表 followLEO>=HWprivate def isFollowerInSync(followerReplica: Replica, highWatermark: Long): Boolean = {val followerEndOffset = followerReplica.logEndOffsetfollowerEndOffset >= highWatermark && leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)}
  1. 检查当前发起 Fetch请求 请求的Follower副本是否满足加入ISR的条件, 条件如下(与运算):
    ①. 当前副本不在ISR列表中
    ②. Follower的LEO>=HW(高水位) && Follower的LEO>= 当前Leader的LogStartOffset

  2. 如果满足条件,则开始执行 ISR扩展的流程,这里的流程跟上面 ISR缩小 的时候差不多。
    ①. 将newISR组装一下成newLeaderData对象(还包含leader和epoche等信息),然后将信息写入到zk持久节点/brokers/topics/{Topic名称}/partitions/{分区号}/state中.
    ②.如果写入成功,则更新一下2个对象内存, isrChangeSet对象保存着ISR变更记录,lastIsrChangeMs记录这最新一次ISR变更的时间戳。一会这两个两个对象,在ISR传播的时候需要用到。
    ③.如果写入成功,则更新一下2个对象内存,inSyncReplicaIds=newISR, zkVersion = newVersion。

在这里插入图片描述

上面的ISR伸缩,只是去zk上修改了ISR的数据和Controller里面的内存数据。啥时候通知对应的Broker ISR已经变更了呢?

3.ISR广播 maybePropagateIsrChanges

每隔2500毫秒就执行一次。

上面的ISR的收缩和扩展,最终呈现的结果是 修改ISR和内存,但是并没有通知到每个Broker。

只修改zk中的/brokers/topics/{Topic名称}/partitions/{分区号}/state节点,并不会通知集群,ISR已经变更了,因为正常情况下,Broker是没有去监听每一个state节点的。

因为在整个集群中,state节点太多了,一个分区一个,每个节点都去监听的话成本有点高。

除了在分区副本重分配的时候,会去监听迁移的state节点,其他情况都没有监听。

定时任务定时去传播ISR的变更。

ReplicaManager#maybePropagateIsrChanges

  def maybePropagateIsrChanges(): Unit = {val now = System.currentTimeMillis()isrChangeSet synchronized {if (isrChangeSet.nonEmpty &&(lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {zkClient.propagateIsrChanges(isrChangeSet)isrChangeSet.clear()lastIsrPropagationMs.set(now)}}}
  1. 判断是否满足传播条件,条件为下(同时满足)
    ①. 判断isrChangeSet不为空值,这里的isrChangeSet就是我们上面ISR收缩成功之后装填的值。
    ②. (lastIsrChangeMs(上次ISR变更时间) + 5000 < 当前时间)或者
    (lastIsrPropagationMs(上次传播时间) + 60000< 当前时间)
    总结: 有ISR变更过了&&(上一次ISR变更时间距离现在超过了5秒 || 上次传播时间距离现在超过了60秒)。
    这避免了短时间内多次ISR变更发起多次传播。当超过60秒都没有发起过传播,则立马发起传播。

  2. 开始传播!
    传播的方式就是,创建一个顺序的持久节点/isr_change_notification/isr_change_序号,节点内容就是 isrChangeSet。

  3. 清空isrChangeSet,更新 lastIsrPropagationMs(上次传播时间)

4.Controller监听/isr_change_notification/子节点

上面我们说因为正常情况下,Broker是没有去监听每一个state节点的(除了分区副本重分配),那么为了避免监听多个节点,只要有ISR变更就创建了/isr_change_notification/isr_change_序号节点,Controller只需要监听这个节点就可以指定哪个ISR变更了。

KafkaController#processIsrChangeNotification

  private def processIsrChangeNotification(): Unit = {def processUpdateNotifications(partitions: Seq[TopicPartition]): Unit = {val liveBrokers: Seq[Int] = controllerContext.liveOrShuttingDownBrokerIds.toSeqdebug(s"Sending MetadataRequest to Brokers: $liveBrokers for TopicPartitions: $partitions")sendUpdateMetadataRequest(liveBrokers, partitions.toSet)}if (!isActive) return// 去zk顺序节点/isr_change_notification 获取所有子节点的序号val sequenceNumbers = zkClient.getAllIsrChangeNotificationstry {// 拿到了子节点路径之后,就去zk查询所有子节点的数据。val partitions = zkClient.getPartitionsFromIsrChangeNotifications(sequenceNumbers)// 如果有的话,则做一些更新if (partitions.nonEmpty) {// 这里是去zk把变更过的Partitions 读取state节点的数据,然后重新加载到内存中updateLeaderAndIsrCache(partitions)//向所有Broker发送更新元数据的请求processUpdateNotifications(partitions)}} finally {// 处理完之后 把刚刚获取到的/isr_change_notification 子节点删除掉。zkClient.deleteIsrChangeNotifications(sequenceNumbers, controllerContext.epochZkVersion)}}
  1. 去zk获取/isr_change_notification节点的所有zk节点
  2. 根据获取到的子节点路径,然后再去zk读取这些子节点的数据
  3. 第2步骤拿到的是分区号,这时候根据分区号去对应的/brokers/topics/{Topic名称}/partitions/{分区号}/state节点读取新的数据, 然后将新的数据重载到当前Controller的内存中。
  4. 向所有Broker发 UpdateMetadata 请求
  5. 删除/isr_change_notification节点下面的数据。

节点数据结构/isr_change_notification/isr_change_0000000001

在这里插入图片描述

5.Q/A

1.ISR的收缩,是由Controller来控制的,还是每台Broker都可以收缩?

每个Broker都会启动一个收缩定时任务,去监测 当前Broker里面所有在线的Leader分区, 是否有满足收缩条件, 然后进行收缩(写入zk节点信息),Controller角色的作用是在ISR传播的时候监测zk节点,去广播ISR变更了。

2.伸缩过程,会触发Leader选举吗

不会,伸缩只是ISR的变更。不涉及到Leader选举


http://chatgpt.dhexx.cn/article/IkrHALJU.shtml

相关文章

Kafka ISR

ISR&#xff08;in-sync replica&#xff09; 就是 Kafka 为某个分区维护的一组同步集合&#xff0c;即每个分区都有自己的一个 ISR 集合&#xff0c;处于 ISR 集合中的副本&#xff0c;意味着 follower 副本与 leader 副本保持同步状态&#xff0c;只有处于 ISR 集合中的副本才…

kafka中的ISR、AR又代表什么?ISR伸缩又是什么?

kafka中的ISR、AR又代表什么&#xff1f;ISR伸缩又是什么&#xff1f; ​ 分区中的所有副本统称为AR&#xff08;Assigned Repllicas&#xff09;。所有与leader副本保持一定程度同步的副本&#xff08;包括Leader&#xff09;组成ISR&#xff08;In-Sync Replicas&#xff09…

Kafka之ISR机制的理解

Kafka对于producer发来的消息怎么保证可靠性&#xff1f; 每个partition都给配上副本&#xff0c;做数据同步&#xff0c;保证数据不丢失。 副本数据同步策略 和zookeeper不同的是&#xff0c;Kafka选择的是全部完成同步&#xff0c;才发送ack。但是又有所区别。 所以&…

中断ISR技术架构

架构一 ISR采用立即响应思路&#xff0c;技术架构如下图&#xff1a; 优点&#xff1a;简单。 缺点&#xff1a;处理性能不高&#xff0c;中断优先级规划性不高(仅仅区分CPU的32个优先级别&#xff0c;针对不同类型中断优先级不支持)。 选型&#xff1a;对于硬件支持多级中断…

【Java面试】什么是 ISR,为什么需要引入 ISR

Hi&#xff0c;大家好&#xff0c;我是Mic。 一个工作5年的粉丝&#xff0c;在简历上写精通Kafka。 结果在面试的时候直接打脸。 面试官问他&#xff1a;“什么是ISR&#xff0c;为什么需要设计ISR” 然后他一脸懵逼的看着面试官。 下面看看普通人和高手的回答。 需要高手面试文…

2022年正式赛题网络系统管理Linix模块 NFS部分

共享/webdata/目录;用于存储AppSrv主机的WEB数据;仅允许AppSrv主机访问该共享;考虑安全,不论登入NFS 的使用者身份为何,都将其设置为匿名用户访问。

NXP迅为i.MX8Mmini开发板Linix固件编译下

4 编译 Ubuntu20 桌面版本 1 如果大家想要编译 Ubuntu20 桌面版本&#xff0c;首先要将光盘资料“iTOP-i.MX8MM 开发板\01-i.MX8MM 开发板光盘资料\20210830\07-Ubuntu20 系统源码\Ubuntu20 桌面版本” 下的压缩包拷贝到 Linux 源码的根目录下&#xff0c;解压压缩包得到 ubunt…

Linix(CentOS6.5)详细安装

CentOS6.5的安装&#xff08;Minimal&#xff09; 点击CentOS6镜像文件下载 点击CentOS7镜像文件下载 1.点击创建新的虚拟机 2.选择自定义模式 3.选择VMware Workstation的版本 4.选择稍后安装 5.选择合适操作系统&#xff08;64位OR32位&#xff09; 6.更改默认的安装…

Linix环境搭建及概述

linux环境搭建及概述 前言 Linux 的安装&#xff0c;安装步骤比较繁琐&#xff0c;现在其实云服务器挺普遍的&#xff0c;价格也便宜&#xff0c;如果直接不想搭建&#xff0c;也可以直接买一台学习用用&#xff01;废话不多说直接开整 一、安装CentOS&#xff08;虚拟机安装…

Linux系统常用命令--LInix系统随笔(四)

前言&#xff1a;虽然一直在用linux但是一直没有系统的学习过&#xff0c;趁着暑假花了几天看着鸟哥的书学习了一下。下面是我记录的一些笔记&#xff0c;本人属于入门的小白所以难免有不足之处&#xff0c;还望发现的表哥们多多指正。 ①命令格式与目录处理命令ls ls--list…

util-linix 实用程序包中包含了许多系统管理员常用的其它命令

util-linix 实用程序包中包含了许多系统管理员常用的其它命令。这些实用程序是由 Linux 内核组织发布的&#xff0c;这 107 条命令中几乎每一个都来自原本是三个单独的集合 —— fileutils、shellutils 和 textutils&#xff0c;2003 年它们被合并成一个包&#xff1a;util-lin…

LINIX 通过进程号查端口、通过端口查进程号

可以通过 netstat -nlp|grep pid或port&#xff0c;来查询端口、进程号 1.通过进程查PORT 2.通过PORT查进程 netstat命令参数说明 n 直接使用ip地址&#xff0c;而不通过域名服务器 -l 显示监控中的服务器的 Socket -p 显示正在使用 Socket 的程序识别码和程序名称

Linu X

LINU X 基本命令 / 根目录 几个盘几个根目录 dev 设备目录 boot 启动文件 etc 配置目录 home 家目录 用户1.管理员 2.一般用户 proc硬件信息 【benlocalhost desktop]#管理员 管理员 主机名 当前目录 $普通用户 cd 修改&#xff0c;进入当前目录 ls显示当前目录下的项目 ll显示…

linx

linx ls -l 命令详解 上图用ls -l命令查看某一个目录会得到一个7个字段的列表 1. 文件类型     “-”表示普通文件&#xff1b; “d”表示目录&#xff1b; “l”表示链接文件&#xff1b; “p”表示管理文件&#xff1b; “b”表示块设备文件&#xff1b; “c”表示…

Linux-

文章目录 基础知识Linux使用命令整理Linux系统下文件类型颜色表示含义 基础知识 Linux使用命令整理 zip 文件名——&#xff08;压缩命令&#xff09;将所有.jpg的文件压缩成一个zip包 &#xff0c;案例&#xff1a;zip all.zip &#xff1b;zip *.jpg unzip 文件名——&…

Linix

Linix 一、Linix的基本使用1.1 、Linux目录结构1.2、文件目录属性1.3、目文件展示1.4、用户及权限管理1.4.1、概述1.4.2、用户管理1.4.3、组管理1.4.4、权限管理 1.5、查看用户信息 查看用户组信息 二、命令相关2.1、系统相关1.4.4、权限管理 2.2、进程相关2.2.1、端口占用情况…

Linux目录结构与路径

目录 一、Linux目录结构 二、绝对路径与相对路径 一、Linux目录结构 Linux 系统中没有盘符的概念&#xff0c;所有的文件和目录都被组织成以一个根节点开始的倒置的树状结构&#xff0c; 文件系统的最顶层是由根目录开始的&#xff0c;系统使用 / 来表示根目录&#xff0c;呈…

Linux基础

Linix概述 unix 是多用户、多任务的操作系统&#xff0c;Linux是基于Unix的&#xff0c;Linux的版本分为两种&#xff1a;内核版本和发行版本&#xff1b;内核版本是指在 Linus领导下的内核小组开发维护的系统内核的版本号 Linux的远程访问&#xff1a;远程访问的软件:CRT lin…

0.1 什么是计算机

Redhat linux 红帽认证管理员(RHCSA,全称为Red Hat Certified System Administrator)属于红帽Linux的初级入门认证&#xff0c;要求学生能够熟练的执行linux命令&#xff0c;主要考察学生对红帽Linux系统基础管理与维护的能力&#xff0c;如添加用户、修改密码、添加硬盘分区、…

STM32中断向量表的位置,重定向

http://blog.csdn.net/u012722571/article/details/47295245 lanmanck原创】 这篇文章已经说了STM32的启动过程&#xff1a; http://blog.csdn.net/lanmanck/article/details/8252560 我们也知道怎么跳到main函数了&#xff0c;那么&#xff0c;中断发生后&#xff0c;又是…