KafkaController之leader选举成功

上文说到在kafka集群中如何选择leader,在选完leader后就会调用onControllerFailOver方法。该方法会处理以下几个事情:

1.读取zk中上个controller写入的epoch和version

2.将第一步读取出来的epoch加1并写入zk中,以便让其他broker知晓已经有broker被选为controller了(通过比较zk中的epoch与自身的epoch)

3.向/admin/reassign_partitions注册PartitionsReassignedListener,该listener主要是监测/admin/reassign_partitions数据改变的事件,该路径的数据是由ReassignPartitionsCommand命令行写入,写入数据的具体格式类似为:{version:1,partitions:{topic:xxx,partition:1,replicas:[1,2]}}。当写入数据后,该listener就会触发handleDataChange方法:

def handleDataChange(dataPath: String, data: Object) {
    debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s"
      .format(dataPath, data))
    val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
    //在所要分配的partition中去除正在被分配的那些partition
    val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
      partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
    }
    partitionsToBeReassigned.foreach { partitionToBeReassigned =>
      inLock(controllerContext.controllerLock) {
        //判断该topic是否是待删除的topic
        if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
          error("Skipping reassignment of partition %s for topic %s since it is currently being deleted"
            .format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
          controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
        } else {
          val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
          controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
        }
      }
    }
}

removePartitionFromReassignedPartitions方法是将待删除的partition从/admin/reassign_partitions路径删除并将最新的待分配的partition数据更新到/admin/reassign_partitions中:

def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
    if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
      // stop watching the ISR changes for this partition
      zkClient.unsubscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
        controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
    }
    // read the current list of reassigned partitions from zookeeper
    ///admin/reassign_partitions的数据是正在分配replica的那些partition,当分配完后相应的partition会被删除
    val partitionsBeingReassigned = ZkUtils.getPartitionsBeingReassigned(zkClient)
    // remove this partition from that list
    //更新正在分配的那些partition
    val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
    // write the new list to zookeeper
    //如果没有正在分配的partition了,那么直接删除该path,否则修改该path的数据
    ZkUtils.updatePartitionReassignmentData(zkClient, updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
    // update the cache. NO-OP if the partition's reassignment was never started
    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
}

initiateReassignReplicasForTopicPartition方法判断该partition是否需要重新分配replica,仅仅在所要分配的新的replica与之前向该partition分配的replica不一致并且新的replica都是有效的情况下才会为该partition重新分配replica:

def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
    val reassignedReplicas = reassignedPartitionContext.newReplicas
    //判断所需分配的replica是否都在之前分配的isr中
    areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
      case false =>
        //需要新分配的replicas有些不在以前的isr中,这时候需要修改相应的数据
        info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
          "reassigned not yet caught up with the leader")
        //所需要分配的新的replica
        val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
        //所有replica
        val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
        //1. Update AR in ZK with OAR + RAR.
        updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
        //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
        //向所有replica发送修改leaderAndIsr请求
        updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
          newAndOldReplicas.toSeq)
        //3. replicas in RAR - OAR -> NewReplica
        //将需要添加的新的replica状态设置为NewReplica
        startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
        info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
          "reassigned to catch up with the leader")
      case true =>
        //所要分配的replica都在之前分配的isr中
        //4. Wait until all replicas in RAR are in sync with the leader.
        //不需要再进行分配的那些replica
        val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
        //5. replicas in RAR -> OnlineReplica
        //将需要分配的replica的状态置为Online
        reassignedReplicas.foreach { replica =>
          replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
            replica)), OnlineReplica)
        }
        //6. Set AR to RAR in memory.
        //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
        //   a new AR (using RAR) and same isr to every broker in RAR
        //把最新需要分配的replica放入缓存并且向所有replica发出修改leaderAndIsr的消息
        //当所需分配的replica不包含当前该partition的leader则需要在要重新分配的replicas中重新选取leader
        moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
        //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
        //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
        //将需要删除的replica的状态设置为NonExistentReplica
        stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
        //10. Update AR in ZK with RAR.
        //更新zk中的数据
        updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
        //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
        //分配完成后更新/admin/reassign_partitions的数据,如果/admin/reassign_partitions中没有需要分配的partition则删除该路径
        removePartitionFromReassignedPartitions(topicAndPartition)
        info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
        controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
        //12. After electing leader, the replicas and isr information changes, so resend the update metadata request to every broker
        //向所有broker发送UpdateMetadataRequest
        sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
        // signal delete topic thread if reassignment for some partitions belonging to topics being deleted just completed
        deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
    }
}

4.向/admin/preferred_replica_election注册PreferredReplicaElectionListener,/admin/preferred_replica_election路径的数据是由PreferredReplicaLeaderElectionCommand命令行写入,该命令行的作用是为partition重新选举leader replica的,写入的数据格式为:{partitions:[{topic:foo,partition:1},{topic:foobar,partition:2}]},当/admin/preferred_replica_election路径的数据发生改变时,就会触发PreferredReplicaElectionListener的handleDataChange方法:

def handleDataChange(dataPath: String, data: Object) {
    debug("Preferred replica election listener fired for path %s. Record partitions to undergo preferred replica election %s"
            .format(dataPath, data.toString))
    inLock(controllerContext.controllerLock) {
      val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
      ////正在选举leader replica的那些partition
      if(controllerContext.partitionsUndergoingPreferredReplicaElection.size > 0)
        info("These partitions are already undergoing preferred replica election: %s"
          .format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
      //去除那些正在选举replica的partition
      val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
      //去除那些需要被删除的topic对应的partition
      val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
      if(partitionsForTopicsToBeDeleted.size > 0) {
        error("Skipping preferred replica election for partitions %s since the respective topics are being deleted"
          .format(partitionsForTopicsToBeDeleted))
      }
      controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
    }
}

onPreferredReplicaElection方法的定义为:

def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
    info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
    try {
      //修改正在选举leader replica的数据
      controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
      //将这些topic设置为延迟删除
      deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
      //将这些partition的状态置为Online并为这些partition分别选举一个replica作为leader,选举的方法是直接取分配的所有replica的第一个作为leader
      partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
    } catch {
      case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
    } finally {
      removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
      deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
    }
}

逻辑并不难,只是为指定的partition选举一个leader replica出来,选举的方法是直接取replicas的第一个作为leader。

5.为/brokers/topics注册TopicChangeListener和DeleteTopicsListener,当topic数量发生改变时,就会触发TopicChangeListener的handleChildChange方法:

def handleChildChange(parentPath : String, children : java.util.List[String]) {
  inLock(controllerContext.controllerLock) {
    if (hasStarted.get) {
      try {
        //当前在zk中最新的topic数据
        val currentChildren = {
          import JavaConversions._
          debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
          (children: Buffer[String]).toSet
        }
        //新增的topic
        val newTopics = currentChildren -- controllerContext.allTopics
        //被删除的topic
        val deletedTopics = controllerContext.allTopics -- currentChildren
        controllerContext.allTopics = currentChildren

        //获取新增的topic对应的replica分布情况
        val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.toSeq)
        //更新partitionReplicaAssignment数据
        controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
          !deletedTopics.contains(p._1.topic))
        controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
        info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
          deletedTopics, addedPartitionReplicaAssignment))
        if(newTopics.size > 0)
          //为新增的topic注册partitionChangeListener,并将这些topic对应的partition的状态设置为OnlinePartition
          controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
      } catch {
        case e: Throwable => error("Error while handling new topic", e )
      }
    }
  }
}

当删除一个topic时(通过命令行:TopicCommand完成),比如删除xxx这个topic,这时就会在zk中创建/admin/delete_topics/xxx的路径,创建完成之后就会触发DeleteTopicsListener的handleChildChange方法:

def handleChildChange(parentPath : String, children : java.util.List[String]) {
  inLock(controllerContext.controllerLock) {
    //待删除的topic
    var topicsToBeDeleted = {
      import JavaConversions._
      (children: Buffer[String]).toSet
    }
    debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
    val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
    if(nonExistentTopics.size > 0) {
      warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
      //不存在的topic直接删除路径
      nonExistentTopics.foreach(topic => ZkUtils.deletePathRecursive(zkClient, ZkUtils.getDeleteTopicPath(topic)))
    }
    topicsToBeDeleted --= nonExistentTopics
    if(topicsToBeDeleted.size > 0) {
      info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
      // mark topic ineligible for deletion if other state changes are in progress
      topicsToBeDeleted.foreach { topic =>
        //该topic正在选举leader replica
        val preferredReplicaElectionInProgress =
          controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
        //该topic正在分配partition
        val partitionReassignmentInProgress =
          controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
        if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
          //延迟删除该topic
          controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
      }
      // add topic to deletion list
      //加入删除队列,唤醒TopicDeletionThread
      controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
    }
  }
}

DeleteTopicsThread线程主要做三件事:

(1).向所有的broker发送UpdateMetadata请求,以使broker不再接受待删除的topic的请求

(2).设置topic的replica的状态为OffLine,这时会发送StopReplicaRequest到相应的replica并向leader replica发送LeaderAndIsrRequest,如果leader replica也被设置为OffLine,那么leader会被设置为-1

(3).设置topic的replica的状态为ReplicaDeletionStarted,这时会向broker发送StopReplicaRequest,进而删除replica的所有临时数据

主要代码如下:

private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
    replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
      //该topic的有效replica
      var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
      //无效的replica
      val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
      //已删除的replica
      val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
      //待删除的replica
      val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
      // move dead replicas directly to failed state
      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
      // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
      //将待删除的replica的状态设置为Offline
      replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
      debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
        new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
      if(deadReplicasForTopic.size > 0) {
        debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
        markTopicIneligibleForDeletion(Set(topic))
      }
    }
}

6.为/brokers/ids注册BrokerChangeListener,/brokers/ids路径的数据由KafkaHealthcheck类写入,该类的startup方法会在KafkaServer中被调用,将当前的brokerid写入该路径中。当有broker增减时即/brokers/ids路径会有子路径发生变化就会触发该listener的handleChildChange方法:

def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
  info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.mkString(",")))
  inLock(controllerContext.controllerLock) {
    if (hasStarted.get) {
      ControllerStats.leaderElectionTimer.time {
        try {
          val curBrokerIds = currentBrokerList.map(_.toInt).toSet
          //新增的broker
          val newBrokerIds = curBrokerIds -- controllerContext.liveOrShuttingDownBrokerIds
          val newBrokerInfo = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _))
          val newBrokers = newBrokerInfo.filter(_.isDefined).map(_.get)
          //要删除的broker
          val deadBrokerIds = controllerContext.liveOrShuttingDownBrokerIds -- curBrokerIds
          //更新内存中broker数据
          controllerContext.liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
          info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
            .format(newBrokerIds.mkString(","), deadBrokerIds.mkString(","), controllerContext.liveBrokerIds.mkString(",")))
          //创建新增broker的channel,用于发送和接收数据
          newBrokers.foreach(controllerContext.controllerChannelManager.addBroker(_))
          //删除待删broker对应的channel
          deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker(_))
          if(newBrokerIds.size > 0)
            controller.onBrokerStartup(newBrokerIds.toSeq)
          if(deadBrokerIds.size > 0)
            controller.onBrokerFailure(deadBrokerIds.toSeq)
        } catch {
          case e: Throwable => error("Error while handling broker changes", e)
        }
      }
    }
  }
}

该方法中的onBrokerStartup和onBrokerFailure方法比较重要,这两个方法确保partition和replica在集群中动态发生变化。

onBrokerStartup方法主要做下面几件事情:

(1)向新增的broker发送UpdateMetadata请求,该请求会修改broker的一些内存数据,比如partition及replica的分配情况。

(2)将新增broker对应的replica状态置为Online

(3)将之前该broker被置为New或Offline的partition的状态重新置为Online

(4)将新增broker中正在分配partition的topic重新分配partition(onPartitionReassignment)。

(5)删除新增broker中需要删除的topic。

onBrokerFailure方法主要做一下几件事情:

(1)将partition的leader在将要去除的broker中的那些partition的状态置为Offline。

(2)将那些在需要去除的broker中但不需要删除的topic对应的broker的状态置为Offline。

(3)删除那些需要删除的topic。

7.初始化ControllerContext,包括当前可用broker,所有topic,partition及其replica分配情况,partition的leaderIsr信息等数据:

private def initializeControllerContext() {
    // update controller cache with delete topic information
    controllerContext.liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
    controllerContext.allTopics = ZkUtils.getAllTopics(zkClient).toSet
    controllerContext.partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, controllerContext.allTopics.toSeq)
    controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
    controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
    // update the leader and isr cache for all existing partitions from Zookeeper
    //将zk中的leaderIsr信息放入partitionLeadershipInfo中
    updateLeaderAndIsrCache()
    // start the channel manager
    startChannelManager()
    //更新partitionsUndergoingPreferredReplicaElection的数据,保留可以选举leader的partition
    initializePreferredReplicaElection()
    //更新partitionsBeingReassigned的数据,保留可以分配replica的partition
    initializePartitionReassignment()
    //初始化TopicDeletionManager
    initializeTopicDeletion()
    info("Currently active brokers in the cluster: %s".format(controllerContext.liveBrokerIds))
    info("Currently shutting brokers in the cluster: %s".format(controllerContext.shuttingDownBrokerIds))
    info("Current list of topics in the cluster: %s".format(controllerContext.allTopics))
}

8.为所有topic(路径为/brokers/topics/xxx)注册AddPartitionsListener,当某个topic的数据发生变化时就会触发handleDataChange方法:

def handleDataChange(dataPath : String, data: Object) {
  inLock(controllerContext.controllerLock) {
    try {
      info("Add Partition triggered " + data.toString + " for path " + dataPath)
      val partitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, List(topic))
      //新增的partition
      val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
        !controllerContext.partitionReplicaAssignment.contains(p._1))
      if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
        error("Skipping adding partitions %s for topic %s since it is currently being deleted"
              .format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
      else {
        if (partitionsToBeAdded.size > 0) {
          info("New partitions to be added %s".format(partitionsToBeAdded))
          //将新增partition的状态置为Online
          controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
        }
      }
    } catch {
      case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
    }
  }
}

9.启动checkAndTriggerPartitionRebalance线程,该线程主要任务是检查当前集群中topic的leader replica与分配的replicas的第一个不同的topic数量是否占到总topic的数量的指定比例,如果占到了就为这些topic重新选取leader replica:

private def checkAndTriggerPartitionRebalance(): Unit = {
    if (isActive()) {
      trace("checking need to trigger partition rebalance")
      // get all the active brokers
      var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
      inLock(controllerContext.controllerLock) {
        preferredReplicasForTopicsByBrokers =
          //topic-0:[1,2] topic-1:[1,2,3] topic-2:[2,3]
          //1:{topic-0:[1,2],topic-1:[1,2,3]} 2:{topic-2:[2,3]}
          //按分配的replica的第一个元素进行groupBy
          controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
            case(topicAndPartition, assignedReplicas) => assignedReplicas.head
          }
      }
      debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
      // for each broker, check if a preferred replica election needs to be triggered
      preferredReplicasForTopicsByBrokers.foreach {
        //这里的leaderBroker指的是assignedReplicas.head,即第一个replica作为leader
        case(leaderBroker, topicAndPartitionsForBroker) => {
          var imbalanceRatio: Double = 0
          var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
          inLock(controllerContext.controllerLock) {
            //获取topic当前leader与分配的replica第一个元素不同的topic
            topicsNotInPreferredReplica =
              topicAndPartitionsForBroker.filter {
                case(topicPartition, replicas) => {
                  controllerContext.partitionLeadershipInfo.contains(topicPartition) &&
                  controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker
                }
              }
            debug("topics not in preferred replica " + topicsNotInPreferredReplica)
            //broker下所有的topic数量
            val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
            val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
            imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
            trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
          }
          // check ratio and if greater than desired ratio, trigger a rebalance for the topic partitions
          // that need to be on this broker
          if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
            topicsNotInPreferredReplica.foreach {
              case(topicPartition, replicas) => {
                inLock(controllerContext.controllerLock) {
                  // do this check only if the broker is live and there are no partitions being reassigned currently
                  // and preferred replica election is not in progress
                  if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                      controllerContext.partitionsBeingReassigned.size == 0 &&
                      controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
                      !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                      controllerContext.allTopics.contains(topicPartition.topic)) {
                    //重新选择leader
                    onPreferredReplicaElection(Set(topicPartition), true)
                  }
                }
              }
            }
          }
        }
      }
    }
}

至此,broker被选举为controller(leader)之后的操作就全部完成了。下文会讲broker在某些情况下重新被选举为leader之后的一些操作。



Previous     Next
uohzoaix /
Published under (CC) BY-NC-SA in categories kafka  tagged with