Producing Data to a Kafka Cluster

Producing data in Kafka is done mainly through the Producer class, which can operate in one of two modes: sync and async. As the names suggest, in sync mode each message is processed as soon as it is produced, while in async mode produced messages are placed into a queue and a dedicated thread (ProducerSendThread) processes them later in batches. In both modes, the data ultimately ends up in the handle method of DefaultEventHandler.
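For reference, here is a minimal usage sketch of the old Scala producer API (Kafka 0.8.x); the broker addresses and topic name are placeholders, and producer.type selects between the two modes:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

val props = new Properties()
props.put("metadata.broker.list", "broker1:9092,broker2:9092") // placeholder broker list
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("producer.type", "async")          // "sync" or "async"
props.put("queue.enqueue.timeout.ms", "-1")  // async only: block when the queue is full
props.put("batch.num.messages", "200")       // async only: batch size handed to handle()

val producer = new Producer[String, String](new ProducerConfig(props))
producer.send(new KeyedMessage[String, String]("my-topic", "some-key", "some-value"))
producer.close()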

In async mode, there are three different behaviours when placing produced data into the queue, depending on queue.enqueue.timeout.ms (a sketch of this decision follows the list):

1) When queue.enqueue.timeout.ms = 0, the message is enqueued immediately and true is returned; if the queue is full, false is returned immediately.

2) When queue.enqueue.timeout.ms < 0, the message is enqueued immediately; if the queue is full, the call blocks until the queue frees up space.

3) When queue.enqueue.timeout.ms > 0, the message is enqueued immediately and true is returned; if the queue is full, the call waits up to queue.enqueue.timeout.ms for space to free up, and if there is still not enough space when the time is up, false is returned.
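A minimal sketch of how these three cases could map onto BlockingQueue calls (illustrative names, not the exact Producer code):

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// returns true if the event was enqueued, false if it was dropped because the queue was full
def tryEnqueue[T](queue: LinkedBlockingQueue[T], event: T, enqueueTimeoutMs: Long): Boolean =
  if (enqueueTimeoutMs == 0)
    queue.offer(event)                                           // non-blocking attempt
  else if (enqueueTimeoutMs < 0) {
    queue.put(event)                                             // block until space frees up
    true
  } else
    queue.offer(event, enqueueTimeoutMs, TimeUnit.MILLISECONDS)  // wait at most enqueueTimeoutMs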

The ProducerSendThread keeps taking data from the queue until the item it takes out is the shutdownCommand. After each take, if the total number of accumulated messages reaches the configured batch.num.messages, or the item taken is null (the poll timed out), it immediately processes everything accumulated so far.
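A simplified sketch of that drain-and-batch loop (illustrative, not the exact ProducerSendThread source; queueTimeMs stands in for queue.buffering.max.ms):

import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

def drainLoop[T <: AnyRef](queue: LinkedBlockingQueue[T],
                           shutdownCommand: T,
                           batchSize: Int,
                           queueTimeMs: Long)(handle: Seq[T] => Unit): Unit = {
  val batch = new ArrayBuffer[T]
  var running = true
  while (running) {
    val item = queue.poll(queueTimeMs, TimeUnit.MILLISECONDS)   // null when the poll times out
    if (item eq shutdownCommand) running = false
    else if (item != null) batch += item
    // flush when the batch is full, the poll timed out, or we are shutting down
    if (!running || item == null || batch.size >= batchSize) {
      if (batch.nonEmpty) { handle(batch.toList); batch.clear() }
    }
  }
}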

The actual processing of the data is done by DefaultEventHandler.handle():

def handle(events: Seq[KeyedMessage[K,V]]) {
    // serialize the data: keys and values are serialized by keyEncoder and valueEncoder respectively
    val serializedData = serialize(events)
    serializedData.foreach {
      keyed =>
        val dataSize = keyed.message.payloadSize
        producerTopicStats.getProducerTopicStats(keyed.topic).byteRate.mark(dataSize)
        producerTopicStats.getProducerAllTopicsStats.byteRate.mark(dataSize)
    }
    var outstandingProduceRequests = serializedData
    var remainingRetries = config.messageSendMaxRetries + 1
    val correlationIdStart = correlationId.get()
    debug("Handling %d events".format(events.size))
    // stop sending once the retry count reaches the limit
    while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
      topicMetadataToRefresh ++= outstandingProduceRequests.map(_.topic)
      // the metadata refresh interval has been exceeded
      if (topicMetadataRefreshInterval >= 0 &&
          SystemTime.milliseconds - lastTopicMetadataRefreshTime > topicMetadataRefreshInterval) {
        // refresh the topic metadata
        CoreUtils.swallowError(brokerPartitionInfo.updateInfo(topicMetadataToRefresh.toSet, correlationId.getAndIncrement))
        // sendPartitionPerTopicCache stores which partition each topic was last sent to, so that for a while data keeps going to the same partition instead of recomputing (via modulo) the target partition for every message; clearing it after a while lets the data spread evenly across partitions
        sendPartitionPerTopicCache.clear()
        topicMetadataToRefresh.clear
        lastTopicMetadataRefreshTime = SystemTime.milliseconds
      }
      outstandingProduceRequests = dispatchSerializedData(outstandingProduceRequests)
      // if some data is still unsent after this round, sendPartitionPerTopicCache.clear() is called so the retry may pick a different partition than last time, keeping the distribution balanced
      if (outstandingProduceRequests.size > 0) {
        // back off, then retry the send
        info("Back off for %d ms before retrying send. Remaining retries = %d".format(config.retryBackoffMs, remainingRetries-1))
        // back off and update the topic metadata cache before attempting another send operation
        Thread.sleep(config.retryBackoffMs)
        // get topics of the outstanding produce requests and refresh metadata for those
        CoreUtils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.topic).toSet, correlationId.getAndIncrement))
        sendPartitionPerTopicCache.clear()
        remainingRetries -= 1
        producerStats.resendRate.mark()
      }
    }
    if(outstandingProduceRequests.size > 0) {
      producerStats.failedSendRate.mark()
      val correlationIdEnd = correlationId.get()
      error("Failed to send requests for topics %s with correlation ids in [%d,%d]"
        .format(outstandingProduceRequests.map(_.topic).toSet.mkString(","),
        correlationIdStart, correlationIdEnd-1))
      throw new FailedToSendMessageException("Failed to send messages after " + config.messageSendMaxRetries + " tries.", null)
    }
}
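The retry behaviour in handle() is controlled by these producer properties (the names correspond to the config fields used above; the values shown are only illustrative):

import java.util.Properties

val retryProps = new Properties()
retryProps.put("message.send.max.retries", "3")                // config.messageSendMaxRetries
retryProps.put("retry.backoff.ms", "100")                      // config.retryBackoffMs
retryProps.put("topic.metadata.refresh.interval.ms", "600000") // topicMetadataRefreshInterval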

The method that actually sends the data is dispatchSerializedData:

private def dispatchSerializedData(messages: Seq[KeyedMessage[K,Message]]): Seq[KeyedMessage[K, Message]] = {
    val partitionedDataOpt = partitionAndCollate(messages)
    partitionedDataOpt match {
      case Some(partitionedData) =>
        val failedProduceRequests = new ArrayBuffer[KeyedMessage[K, Message]]
        for ((brokerid, messagesPerBrokerMap) <- partitionedData) {
          if (logger.isTraceEnabled) {
            messagesPerBrokerMap.foreach(partitionAndEvent =>
              trace("Handling event for Topic: %s, Broker: %d, Partitions: %s".format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
          }
          val messageSetPerBrokerOpt = groupMessagesToSet(messagesPerBrokerMap)
          messageSetPerBrokerOpt match {
            case Some(messageSetPerBroker) =>
              // send to the partitions led by this broker
              val failedTopicPartitions = send(brokerid, messageSetPerBroker)
              failedTopicPartitions.foreach(topicPartition => {
                messagesPerBrokerMap.get(topicPartition) match {
                    // put the data for topic partitions that failed to send back into failedProduceRequests
                  case Some(data) => failedProduceRequests.appendAll(data)
                  case None => // nothing
                }
              })
            case None => // failed to group messages
              messagesPerBrokerMap.values.foreach(m => failedProduceRequests.appendAll(m))
          }
        }
        failedProduceRequests
      case None => // failed to collate messages
        messages
    }
}

The partitionAndCollate method decides which partition of each topic the data should go to:

// group the messages into the ArrayBuffer for the chosen partition of each topic
def partitionAndCollate(messages: Seq[KeyedMessage[K,Message]]): Option[Map[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]] = {
    val ret = new HashMap[Int, collection.mutable.Map[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
    try {
      for (message <- messages) {
        // get all partitions of the topic
        val topicPartitionsList = getPartitionListForTopic(message)
        // decide which partition the data goes to: if sendPartitionPerTopicCache has an entry (meaning the refresh interval has not yet elapsed), that partition is returned directly; otherwise a partition is computed (modulo) and the result is stored in sendPartitionPerTopicCache
        val partitionIndex = getPartition(message.topic, message.partitionKey, topicPartitionsList)
        val brokerPartition = topicPartitionsList(partitionIndex)

        // postpone the failure until the send operation, so that requests for other brokers are handled correctly
        val leaderBrokerId = brokerPartition.leaderBrokerIdOpt.getOrElse(-1)

        var dataPerBroker: HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]] = null
        ret.get(leaderBrokerId) match {
          case Some(element) =>
            dataPerBroker = element.asInstanceOf[HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]]
          case None =>
            dataPerBroker = new HashMap[TopicAndPartition, Seq[KeyedMessage[K,Message]]]
            ret.put(leaderBrokerId, dataPerBroker)
        }

        // file the message under the leader's topic-partition entry
        val topicAndPartition = TopicAndPartition(message.topic, brokerPartition.partitionId)
        var dataPerTopicPartition: ArrayBuffer[KeyedMessage[K,Message]] = null
        dataPerBroker.get(topicAndPartition) match {
          case Some(element) =>
            dataPerTopicPartition = element.asInstanceOf[ArrayBuffer[KeyedMessage[K,Message]]]
          case None =>
            dataPerTopicPartition = new ArrayBuffer[KeyedMessage[K,Message]]
            dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
        }
        dataPerTopicPartition.append(message)
      }
      Some(ret)
    } catch {    // Swallow recoverable exceptions and return None so that they can be retried.
      case ute: UnknownTopicOrPartitionException => warn("Failed to collate messages by topic,partition due to: " + ute.getMessage); None
      case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to: " + lnae.getMessage); None
      case oe: Throwable => error("Failed to collate messages by topic, partition due to: " + oe.getMessage); None
    }
}
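A hedged sketch of the partition choice described in the comments above (not the exact getPartition/DefaultPartitioner code): a keyed message is hashed modulo the number of partitions, while an unkeyed message reuses the cached partition until sendPartitionPerTopicCache is cleared.

import scala.util.Random

def choosePartition(partitionKey: Any, numPartitions: Int, cachedPartition: Option[Int]): Int =
  if (partitionKey != null)
    math.abs(partitionKey.hashCode) % numPartitions             // modulo on the key's hash
  else
    cachedPartition.getOrElse(Random.nextInt(numPartitions))    // random pick, then cached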

When sending to a given broker, the data whose send failed must be collected so that it can be resent:

// returns the topic partitions whose send failed
private def send(brokerId: Int, messagesPerTopic: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
    if(brokerId < 0) {
      warn("Failed to send data since partitions %s don't have a leader".format(messagesPerTopic.map(_._1).mkString(",")))
      messagesPerTopic.keys.toSeq
    } else if(messagesPerTopic.size > 0) {
      val currentCorrelationId = correlationId.getAndIncrement
      val producerRequest = new ProducerRequest(currentCorrelationId, config.clientId, config.requestRequiredAcks,
        config.requestTimeoutMs, messagesPerTopic)
      var failedTopicPartitions = Seq.empty[TopicAndPartition]
      try {
        // get the SyncProducer for this broker from the producer pool (created when the pool was populated with broker metadata)
        val syncProducer = producerPool.getProducer(brokerId)
        debug("Producer sending messages with correlation id %d for topics %s to broker %d on %s:%d"
          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
        val response = syncProducer.send(producerRequest)
        debug("Producer sent messages with correlation id %d for topics %s to broker %d on %s:%d"
          .format(currentCorrelationId, messagesPerTopic.keySet.mkString(","), brokerId, syncProducer.config.host, syncProducer.config.port))
        if(response != null) {
          // some partitions are missing from the response, i.e. some data was not sent
          if (response.status.size != producerRequest.data.size)
            throw new KafkaException("Incomplete response (%s) for producer request (%s)".format(response, producerRequest))
          if (logger.isTraceEnabled) {
            val successfullySentData = response.status.filter(_._2.error == ErrorMapping.NoError)
            successfullySentData.foreach(m => messagesPerTopic(m._1).foreach(message =>
              trace("Successfully sent message: %s".format(if(message.message.isNull) null else message.message.toString()))))
          }
          // data whose send or processing returned an error
          val failedPartitionsAndStatus = response.status.filter(_._2.error != ErrorMapping.NoError).toSeq
          failedTopicPartitions = failedPartitionsAndStatus.map(partitionStatus => partitionStatus._1)
          if(failedTopicPartitions.size > 0) {
            val errorString = failedPartitionsAndStatus
              .sortWith((p1, p2) => p1._1.topic.compareTo(p2._1.topic) < 0 ||
                                    (p1._1.topic.compareTo(p2._1.topic) == 0 && p1._1.partition < p2._1.partition))
              .map{
                case(topicAndPartition, status) =>
                  topicAndPartition.toString + ": " + ErrorMapping.exceptionFor(status.error).getClass.getName
              }.mkString(",")
            warn("Produce request with correlation id %d failed due to %s".format(currentCorrelationId, errorString))
          }
          failedTopicPartitions
        } else {
          Seq.empty[TopicAndPartition]
        }
      } catch {
        case t: Throwable =>
          warn("Failed to send producer request with correlation id %d to broker %d with data for partitions %s"
            .format(currentCorrelationId, brokerId, messagesPerTopic.map(_._1).mkString(",")), t)
          messagesPerTopic.keys.toSeq
      }
    } else {
      List.empty
    }
}
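The acks and timeout fields of the ProducerRequest built above come from these producer properties (values illustrative):

import java.util.Properties

val ackProps = new Properties()
ackProps.put("request.required.acks", "1")  // config.requestRequiredAcks: 0, 1, or -1 (wait for all ISR replicas)
ackProps.put("request.timeout.ms", "10000") // config.requestTimeoutMs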

At this point, producing the data is complete. Note that this process does not involve any disk writes; it only sends the batched requests out over the socket (BlockingChannel). The actual persistence happens after the network layer receives the request and is carried out by the corresponding classes (the details will be covered when discussing KafkaApi).


