The Kafka consumer


Example code for using the consumer

    // create the consumer connector
    ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
            createConsumerConfig());
    // map the topic to the number of streams (consumer threads) to create for it
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    // create the KafkaStreams; one topic can map to several KafkaStreams,
    // and the number of streams equals the count configured above
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    // iterate over the messages
    while (it.hasNext()) {
        // process each message
        System.out.println(new String(it.next().message()));
    }
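
The demo calls a createConsumerConfig() helper whose body is not shown here. Per the doc comment on createJavaConsumerConnector below, the config must at minimum carry the consumer's group id and the ZooKeeper connection string zookeeper.connect. A minimal sketch, with placeholder values for the ZooKeeper address, the group name, and the two optional tuning properties:

import java.util.Properties;
import kafka.consumer.ConsumerConfig;

// a minimal sketch of the helper; all values are placeholders
private static ConsumerConfig createConsumerConfig() {
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181"); // required
    props.put("group.id", "demo-group");              // required
    props.put("zookeeper.session.timeout.ms", "6000");
    props.put("auto.commit.interval.ms", "60000");
    return new ConsumerConfig(props);
}

Note that setting the value in topicCountMap to N instead of 1 yields N KafkaStreams for the topic, each normally consumed on its own worker thread.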

Creating the ConsumerConnector

 /**
   *  Create a ConsumerConnector
   *
   *  @param config  at the minimum, need to specify the groupid of the consumer and the zookeeper
   *                 connection string zookeeper.connect.
   */
  def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
    val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
    consumerConnect
  }

ZookeeperConsumerConnector then calls the consume method:

  def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
      : Map[String,List[KafkaStream[K,V]]] = {
    debug("entering consume ")
    if (topicCountMap == null)
      throw new RuntimeException("topicCountMap is null")
    // build the per-topic consumer thread ids
    val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)
    val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

    // make a list of (queue,stream) pairs, one pair for each threadId
    val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
      threadIdSet.map(_ => {
        val queue =  new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
        val stream = new KafkaStream[K,V](
          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
        (queue, stream)
      })
    ).flatten.toList

    val dirs = new ZKGroupDirs(config.groupId)
    // register this consumer's information in ZooKeeper
    registerConsumerInZK(dirs, consumerIdString, topicCount)
    // (re)initialize the consumer
    reinitializeConsumer(topicCount, queuesAndStreams)

    loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
  }

 

 private def reinitializeConsumer[K,V](
      topicCount: TopicCount,
      queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) {

    val dirs = new ZKGroupDirs(config.groupId)

    // listener to consumer and partition changes
    if (loadBalancerListener == null) {
      val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]]
      loadBalancerListener = new ZKRebalancerListener(
        config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]])
    }

    // create a listener for the session-expired event if one does not exist yet
    if (sessionExpirationListener == null)
      sessionExpirationListener = new ZKSessionExpireListener(
        dirs, consumerIdString, topicCount, loadBalancerListener)

    // create a listener for topic partition change events if one does not exist yet
    if (topicPartitionChangeListener == null)
      topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener)

    val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams

    // map of {topic -> Set(thread-1, thread-2, ...)}
    val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] =
      topicCount.getConsumerThreadIdsPerTopic

    val allQueuesAndStreams = topicCount match {
      case wildTopicCount: WildcardTopicCount =>
        /*
         * Wild-card consumption streams share the same queues, so we need to
         * duplicate the list for the subsequent zip operation.
         */
        (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList
      case statTopicCount: StaticTopicCount =>
        queuesAndStreams
    }

    val topicThreadIds = consumerThreadIdsPerTopic.map {
      case(topic, threadIds) =>
        threadIds.map((topic, _))
    }.flatten
    // check that the number of thread ids matches the number of (queue, stream) pairs
    require(topicThreadIds.size == allQueuesAndStreams.size,
      "Mismatch between thread ID count (%d) and queue count (%d)"
      .format(topicThreadIds.size, allQueuesAndStreams.size))
    val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams)

    threadQueueStreamPairs.foreach(e => {
      val topicThreadId = e._1
      val q = e._2._1
      topicThreadIdAndQueues.put(topicThreadId, q)
      debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString))
      newGauge(
        "FetchQueueSize",
        new Gauge[Int] {
          def value = q.size
        },
        Map("clientId" -> config.clientId,
          "topic" -> topicThreadId._1,
          "threadId" -> topicThreadId._2.threadId.toString)
      )
    })

    val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
    groupedByTopic.foreach(e => {
      val topic = e._1
      val streams = e._2.map(_._2._2).toList
      topicStreamsMap += (topic -> streams)
      debug("adding topic %s and %d streams to map.".format(topic, streams.size))
    })

    // listener to consumer and partition changes
    zkClient.subscribeStateChanges(sessionExpirationListener)

    zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener)

    topicStreamsMap.foreach { topicAndStreams =>
      // register on broker partition path changes
      val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1
      zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener)
    }

    // explicitly trigger load balancing for this consumer
    loadBalancerListener.syncedRebalance()
  }

At the end of reinitializeConsumer, loadBalancerListener.syncedRebalance() is invoked to explicitly trigger a rebalance for this consumer:

  def syncedRebalance() {
      rebalanceLock synchronized {
        rebalanceTimer.time {
          if(isShuttingDown.get())  {
            return
          } else {
            for (i <- 0 until config.rebalanceMaxRetries) {
              info("begin rebalancing consumer " + consumerIdString + " try #" + i)
              var done = false
              var cluster: Cluster = null
              try {
                cluster = getCluster(zkClient)
                done = rebalance(cluster)
              } catch {
                case e: Throwable =>
                  /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating.
                    * For example, a ZK node can disappear between the time we get all children and the time we try to get
                    * the value of a child. Just let this go since another rebalance will be triggered.
                    **/
                  info("exception during rebalance ", e)
              }
              info("end rebalancing consumer " + consumerIdString + " try #" + i)
              if (done) {
                return
              } else {
                /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should
                 * clear the cache */
                info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered")
              }
              // stop all fetchers and clear all the queues to avoid data duplication
              closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2))
              Thread.sleep(config.rebalanceBackoffMs)
            }
          }
        }
      }

      throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries")
    }

syncedRebalance retries up to rebalanceMaxRetries times, closing the fetchers and backing off rebalanceBackoffMs between failed attempts; the actual work is done in the rebalance method:
   private def rebalance(cluster: Cluster): Boolean = {
      val myTopicThreadIdsMap = TopicCount.constructTopicCount(
        group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
      val brokers = getAllBrokersInCluster(zkClient)
      if (brokers.size == 0) {
        // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
        // We log a warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers
        // are up.
        warn("no brokers found when trying to rebalance.")
        zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener)
        true
      }
      else {
        /**
         * fetchers must be stopped to avoid data duplication, since if the current
         * rebalancing attempt fails, the partitions that are released could be owned by another consumer.
         * But if we don't stop the fetchers first, this consumer would continue returning data for released
         * partitions in parallel. So, not stopping the fetchers leads to duplicate data.
         */
        closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap)

        releasePartitionOwnership(topicRegistry)

        val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient)
        // assign partitions to consumer threads
        val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext)
        val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]](
          valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo]))

        // fetch current offsets for all topic-partitions
        val topicPartitions = partitionOwnershipDecision.keySet.toSeq

        val offsetFetchResponseOpt = fetchOffsets(topicPartitions)

        if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
          false
        else {
          val offsetFetchResponse = offsetFetchResponseOpt.get
          topicPartitions.foreach(topicAndPartition => {
            val (topic, partition) = topicAndPartition.asTuple
            val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
            val threadId = partitionOwnershipDecision(topicAndPartition)
            addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
          })

          /**
           * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
           * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
           */
          if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) {
            allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size

            partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic }
                                      .foreach { case (topic, partitionThreadPairs) =>
              newGauge("OwnedPartitionsCount",
                new Gauge[Int] {
                  def value() = partitionThreadPairs.size
                },
                ownedPartitionsCountMetricTags(topic))
            }

            topicRegistry = currentTopicRegistry
            updateFetcher(cluster)
            true
          } else {
            false
          }
        }
      }
    }

The RangeAssignor assigns partitions to consumer threads:

def assign(ctx: AssignmentContext) = {
    val partitionOwnershipDecision = collection.mutable.Map[TopicAndPartition, ConsumerThreadId]()

    for ((topic, consumerThreadIdSet) <- ctx.myTopicThreadIds) {
      val curConsumers = ctx.consumersForTopic(topic)
      val curPartitions: Seq[Int] = ctx.partitionsForTopic(topic)

      val nPartsPerConsumer = curPartitions.size / curConsumers.size
      val nConsumersWithExtraPart = curPartitions.size % curConsumers.size

      info("Consumer " + ctx.consumerId + " rebalancing the following partitions: " + curPartitions +
        " for topic " + topic + " with consumers: " + curConsumers)

      for (consumerThreadId <- consumerThreadIdSet) {
        val myConsumerPosition = curConsumers.indexOf(consumerThreadId)
        assert(myConsumerPosition >= 0)
        val startPart = nPartsPerConsumer * myConsumerPosition + myConsumerPosition.min(nConsumersWithExtraPart)
        val nParts = nPartsPerConsumer + (if (myConsumerPosition + 1 > nConsumersWithExtraPart) 0 else 1)

        /**
         *   Range-partition the sorted partitions to consumers for better locality.
         *  The first few consumers pick up an extra partition, if any.
         */
        if (nParts <= 0)
          warn("No broker partitions consumed by consumer thread " + consumerThreadId + " for topic " + topic)
        else {
          for (i <- startPart until startPart + nParts) {
            val partition = curPartitions(i)
            info(consumerThreadId + " attempting to claim partition " + partition)
            // record the partition ownership decision
            partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
          }
        }
      }
    }

    partitionOwnershipDecision
}
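
To make the arithmetic concrete, take a hypothetical case (not from the original post) of 5 partitions and 2 consumer threads: nPartsPerConsumer = 5 / 2 = 2 and nConsumersWithExtraPart = 5 % 2 = 1, so thread 0 owns partitions 0 through 2 and thread 1 owns partitions 3 and 4. A standalone sketch of the same formula:

// standalone sketch of the range-assignment arithmetic above
// (hypothetical values: 5 partitions, 2 consumer threads)
public class RangeAssignDemo {
    public static void main(String[] args) {
        int numPartitions = 5, numConsumers = 2;
        int nPartsPerConsumer = numPartitions / numConsumers;       // 2
        int nConsumersWithExtraPart = numPartitions % numConsumers; // 1
        for (int pos = 0; pos < numConsumers; pos++) {
            int startPart = nPartsPerConsumer * pos + Math.min(pos, nConsumersWithExtraPart);
            int nParts = nPartsPerConsumer + (pos + 1 > nConsumersWithExtraPart ? 0 : 1);
            // prints: thread 0 -> partitions [0, 3), then thread 1 -> partitions [3, 5)
            System.out.printf("thread %d -> partitions [%d, %d)%n", pos, startPart, startPart + nParts);
        }
    }
}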

Once the partitions the current consumer should handle have been computed, the updateFetcher method is called to update the fetcher threads:

   private def updateFetcher(cluster: Cluster) {
      // update partitions for fetcher
      var allPartitionInfos : List[PartitionTopicInfo] = Nil
      for (partitionInfos <- topicRegistry.values)
        for (partition <- partitionInfos.values)
          allPartitionInfos ::= partition
      info("Consumer " + consumerIdString + " selected partitions : " +
        allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(","))

      fetcher match {
        case Some(f) =>
          f.startConnections(allPartitionInfos, cluster)
        case None =>
      }
    }

In the startConnections method, addFetcherForPartitions is called to start the fetcher threads:

 

 def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) {
    mapLock synchronized {
      val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) =>
        BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))}
      for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
        var fetcherThread: AbstractFetcherThread = null
        fetcherThreadMap.get(brokerAndFetcherId) match {
          case Some(f) => fetcherThread = f
          case None =>
            fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
            fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
            fetcherThread.start
        }

        fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) =>
          topicAndPartition -> brokerAndInitOffset.initOffset
        })
      }
    }

    info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) =>
      "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "}))
  }

Finally, look at the processPartitionData method in ConsumerFetcherThread. It inserts the fetched data into the queue, from which the stream can then process the corresponding data:

  // process fetched data
  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
    val pti = partitionMap(topicAndPartition)
    if (pti.getFetchOffset != fetchOffset)
      throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d"
                                .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset))
    pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
  }
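
The hand-off between the fetcher thread and the stream is a bounded blocking queue: consume creates one LinkedBlockingQueue per stream (capacity queuedMaxMessages), processPartitionData fills it via pti.enqueue, and the stream's ConsumerIterator drains it. A minimal, self-contained sketch of that pattern, with plain strings standing in for FetchedDataChunk and placeholder values for the capacity and timeout:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueHandoffDemo {
    public static void main(String[] args) throws InterruptedException {
        // the bound plays the role of queuedMaxMessages: a full queue
        // back-pressures the fetcher
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);

        // fetcher side: the equivalent of pti.enqueue(...) in processPartitionData
        Thread fetcher = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < 5; i++)
                        queue.put("chunk-" + i); // blocks while the queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        fetcher.start();

        // stream side: the ConsumerIterator blocks on the same queue,
        // honoring consumer.timeout.ms (placeholder: 1000 ms)
        String chunk;
        while ((chunk = queue.poll(1000, TimeUnit.MILLISECONDS)) != null)
            System.out.println("processed " + chunk);

        fetcher.join();
    }
}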

 
