`

Kafka启动的流程

 
阅读更多

调用KafkaServer的startup方法启动kafka

 /**
   * Start up API for bringing up a single instance of the Kafka server.
   * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
   */
  def startup() {
    try {
      info("starting")
      brokerState.newState(Starting)
      isShuttingDown = new AtomicBoolean(false)
      shutdownLatch = new CountDownLatch(1)

      /* start scheduler */
      kafkaScheduler.startup()
    
      /* setup zookeeper */
      zkClient = initZk()

      /* start log manager */
      logManager = createLogManager(zkClient, brokerState)
      logManager.startup()
      //socket server接收请求,和处理请求
      socketServer = new SocketServer(config.brokerId,
                                      config.hostName,
                                      config.port,
                                      config.numNetworkThreads,
                                      config.queuedMaxRequests,
                                      config.socketSendBufferBytes,
                                      config.socketReceiveBufferBytes,
                                      config.socketRequestMaxBytes,
                                      config.maxConnectionsPerIp,
                                      config.connectionsMaxIdleMs,
                                      config.maxConnectionsPerIpOverrides)
      socketServer.startup()
      //replicat manager,负责写多个副本
      replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown)

      /* start offset manager */
      offsetManager = createOffsetManager()
      //kafka controller,相当于master,用于协调管理,kafka的master和broker合在一起,不用单独部署
      kafkaController = new KafkaController(config, zkClient, brokerState)
    
      /* start processing requests */
      apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController)
     //请求处理线程池 
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
      brokerState.newState(RunningAsBroker)
   
      Mx4jLoader.maybeLoad()

      replicaManager.startup()

      kafkaController.startup()
      
      //用于监控配置信息的变更
      topicConfigManager = new TopicConfigManager(zkClient, logManager)
      topicConfigManager.startup()
    
      /* tell everyone we are alive */
      kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
      kafkaHealthcheck.startup()

    
      registerStats()
      startupComplete.set(true)
      info("started")
    }
    catch {
      case e: Throwable =>
        fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e)
        shutdown()
        throw e
    }
  }

 

分享到:
评论

相关推荐

    OGG_KAFKA问题及解决方法

    KAFKA和OracleGoldenGate安装过程中出现的问题及其解决方法

    kafka--summary:kafka学习总结,源码剖析

    kafka-summarykafka学习总结,源码剖析目录一、基础篇开篇说明概念说明配置说明znode分类kafka协议分类Kafka线程日志存储格式kakfa架构设计二、流程篇1、kafka启动过程2、日志初始化和清理过程3、选举controller过程...

    kafka-manager-1.3.3.22.zip

    已编译 Kafka-Manager-1.3.3.22 linux下直接解压解压kafka-manager-1.3.3.22.zip到/opt/module目录 ...备注:指定端口号看启动过程中 "-Dhttp.port=7456" 端口可以自己设置 http://hadoop102:7456

    尚硅谷大数据视频_Kafka视频教程

    Kafka_课程介绍.avi 02_尚硅谷_Kafka_消息队列介绍.avi 03_尚硅谷_Kafka_概念.avi 04_尚硅谷_Kafka_架构.avi 05_尚硅谷_Kafka_集群搭建&启动.avi 06_尚硅谷_Kafka_命令行操作.avi 07_尚硅谷_Kafka_工作流程分析.avi ...

    kafka思维导图.png

    最近总结了一下kafka的介绍、启动流程、监控界面、角色、基本命令、为什么选择kafka、核心设计理念、特点、性能测试和参数调优、多租户等等

    kafka集群搭建

    简单实现了kafka的搭建过程,以及kafka搭建时的启动及配置说明

    企业级消息队列Kafka视频教程

    2.3 Kafka一键启动/关闭脚本 第三章 基础操作 3.1 创建topic 3.2 生产消息到Kafka 3.3 从Kafka消费消息 3.4 使用Kafka Tools操作Kafka 第四章 Kafka基准测试 第五章 Java编程操作Kafka 5.1 同步生产消息到Kafka中 ...

    kafka-lead 的选举过程

    在kafka集群中,每个代理节点(Broker)在启动都会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的leader节点。 (1)第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/...

    kafka-topic-exporter:消费Kafka主题并导出到Prometheus

    启动过程时,必须将配置文件作为参数传递。 ## Kafka Exporter Properties # HTTP port used for the exporter exporter.port=12340 # Time in seconds that the metrics, once retrieved, will consider as valid...

    带弹簧启动的Kafka、Kafka和Kafka工具图形用户界面设置(高分毕设).zip

    它采用MVC(Model-View-Controller,模型-视图-控制器)的架构模式,将应用程序分为模型层、视图层和控制器层,提供了处理请求、渲染视图和管理流程的功能。 3. MyBatis框架:MyBatis是一个持久层框架,用于与数据库...

    flowbro:基于Kafka的分布式系统的实时流程图可视化

    Flowbro 基于Kafka的分布式系统的实时流程图可视化。安装go get github.com/marianogappa/flowbro或获取适用于您的操作系统的最新二进制文件: : 入门启动flowbro: $ cd $GOPATH/src/github....

    kafka.pdf 介绍 为何使用消息系统

    在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的。 消息系统在处理过程中间插入了一个隐含的、 基于数据的接口层, 两边的处理过程都要实现这一接口。 这允许你独立的扩展或修改两边的处理过程, 只要...

    kafka:kafka生产者消费者

    ##Mac安装kafka 并生产消息和消费消息 安装kafka $ brew install kafka 1、 安装过程将依赖安装 zookeeper 2、 软件位置 /usr/local/Cellar/zookeeper /usr/local/Cellar/kafka ...3、启动kafka服务 kafka-serve

    HyperLedger Fabric开发实战 -Kafka集群部署

    在采用Kafka作为启动过类型的Fabric网络中,configtx.yaml 及 cryto-config.yaml配置文件依然有着重要的地位,但是其中的配置样本与先前的内容会有些不同。 本章将进行基于Kafka集群的部署,其中重要的概念是对前三...

    ochonetes-kafka:通过 K8SOchopod 即时部署 Kafka!

    奥乔尼特斯概述您需要设置并快速运行一堆代理... 请注意,我包含了一个相当简单的容器,它将使用本地/var/lib/kafka来存储其日志。 随意挂载一些东西,甚至动态分配一个卷。做吧!第 1 步:在运行门户的情况下在 AWS 上

    kafka-streams-migration

    使用Replicator的Kafka Streams迁移示例关键点如果Kafka Streams exactly-once ,则复制器必须配置为isolation.level=read_committed 迁移的应用程序需要使用auto.offset.reset=latest重新启动为了一个接一个地迁移...

    debezium-kafka-postgres-nodejs-example

    debezium-kafka-postgres-nodejs-示例一个简单的示例,说明如何使用Debezium,Kafka,Postgres设置CDC流程,以及如何使用Node.JS使用者使用事件。设置启动一个postgres容器,它将成为我们的数据源。 docker run --rm...

    kafka-ruby-docker:适用于Kafka和Ruby的演示应用程序(在Docker中)

    当应用程序启动时,除了 Zookeeper 和 Kafka 之外,它由四个独立的单元组成。 每个单元都是一个单独的过程。 这些进程不必在同一台计算机上。 这些单位是: 原料药 处理器 订户一 订户二 主API监听主机操作系统的 ...

    log-analysis-kafka-storm-docker:在Docker中运行的Kafka,Storm,Zookeeper和Openfire

    ##下载源码git clonekafka-log-appender:将日志内容写到kafka程序log-kafka-storm:docker-compose脚本和storm程序##准备docker环境###启动dockerdocker-compose环境搭建过程请查看我的进入log-kafka-storm目录,...

    大数据学习笔记

    第14章 KAFKA工作流程 53 14.1 PUB-SUB信息工作流 53 14.2 队列信息工作流/消费者组 53 14.3 Zookeeper在KAFKA中扮演的角色 54 第15章 KAFKA安装 55 第16章 KAFKA基本操作 56 16.1 启动zookeeper服务 56 16.2 单个单...

Global site tag (gtag.js) - Google Analytics