调用KafkaServer的startup方法启动kafka
/** * Start up API for bringing up a single instance of the Kafka server. * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers */ def startup() { try { info("starting") brokerState.newState(Starting) isShuttingDown = new AtomicBoolean(false) shutdownLatch = new CountDownLatch(1) /* start scheduler */ kafkaScheduler.startup() /* setup zookeeper */ zkClient = initZk() /* start log manager */ logManager = createLogManager(zkClient, brokerState) logManager.startup() //socket server接收请求,和处理请求 socketServer = new SocketServer(config.brokerId, config.hostName, config.port, config.numNetworkThreads, config.queuedMaxRequests, config.socketSendBufferBytes, config.socketReceiveBufferBytes, config.socketRequestMaxBytes, config.maxConnectionsPerIp, config.connectionsMaxIdleMs, config.maxConnectionsPerIpOverrides) socketServer.startup() //replicat manager,负责写多个副本 replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) /* start offset manager */ offsetManager = createOffsetManager() //kafka controller,相当于master,用于协调管理,kafka的master和broker合在一起,不用单独部署 kafkaController = new KafkaController(config, zkClient, brokerState) /* start processing requests */ apis = new KafkaApis(socketServer.requestChannel, replicaManager, offsetManager, zkClient, config.brokerId, config, kafkaController) //请求处理线程池 requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads) brokerState.newState(RunningAsBroker) Mx4jLoader.maybeLoad() replicaManager.startup() kafkaController.startup() //用于监控配置信息的变更 topicConfigManager = new TopicConfigManager(zkClient, logManager) topicConfigManager.startup() /* tell everyone we are alive */ kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient) kafkaHealthcheck.startup() registerStats() startupComplete.set(true) info("started") } catch { case e: Throwable => fatal("Fatal error during KafkaServer startup. Prepare to shutdown", e) shutdown() throw e } }
相关推荐
KAFKA和OracleGoldenGate安装过程中出现的问题及其解决方法
kafka-summarykafka学习总结,源码剖析目录一、基础篇开篇说明概念说明配置说明znode分类kafka协议分类Kafka线程日志存储格式kakfa架构设计二、流程篇1、kafka启动过程2、日志初始化和清理过程3、选举controller过程...
已编译 Kafka-Manager-1.3.3.22 linux下直接解压解压kafka-manager-1.3.3.22.zip到/opt/module目录 ...备注:指定端口号看启动过程中 "-Dhttp.port=7456" 端口可以自己设置 http://hadoop102:7456
Kafka_课程介绍.avi 02_尚硅谷_Kafka_消息队列介绍.avi 03_尚硅谷_Kafka_概念.avi 04_尚硅谷_Kafka_架构.avi 05_尚硅谷_Kafka_集群搭建&启动.avi 06_尚硅谷_Kafka_命令行操作.avi 07_尚硅谷_Kafka_工作流程分析.avi ...
最近总结了一下kafka的介绍、启动流程、监控界面、角色、基本命令、为什么选择kafka、核心设计理念、特点、性能测试和参数调优、多租户等等
简单实现了kafka的搭建过程,以及kafka搭建时的启动及配置说明
2.3 Kafka一键启动/关闭脚本 第三章 基础操作 3.1 创建topic 3.2 生产消息到Kafka 3.3 从Kafka消费消息 3.4 使用Kafka Tools操作Kafka 第四章 Kafka基准测试 第五章 Java编程操作Kafka 5.1 同步生产消息到Kafka中 ...
在kafka集群中,每个代理节点(Broker)在启动都会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的leader节点。 (1)第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/...
启动过程时,必须将配置文件作为参数传递。 ## Kafka Exporter Properties # HTTP port used for the exporter exporter.port=12340 # Time in seconds that the metrics, once retrieved, will consider as valid...
它采用MVC(Model-View-Controller,模型-视图-控制器)的架构模式,将应用程序分为模型层、视图层和控制器层,提供了处理请求、渲染视图和管理流程的功能。 3. MyBatis框架:MyBatis是一个持久层框架,用于与数据库...
Flowbro 基于Kafka的分布式系统的实时流程图可视化。安装go get github.com/marianogappa/flowbro或获取适用于您的操作系统的最新二进制文件: : 入门启动flowbro: $ cd $GOPATH/src/github....
在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的。 消息系统在处理过程中间插入了一个隐含的、 基于数据的接口层, 两边的处理过程都要实现这一接口。 这允许你独立的扩展或修改两边的处理过程, 只要...
##Mac安装kafka 并生产消息和消费消息 安装kafka $ brew install kafka 1、 安装过程将依赖安装 zookeeper 2、 软件位置 /usr/local/Cellar/zookeeper /usr/local/Cellar/kafka ...3、启动kafka服务 kafka-serve
在采用Kafka作为启动过类型的Fabric网络中,configtx.yaml 及 cryto-config.yaml配置文件依然有着重要的地位,但是其中的配置样本与先前的内容会有些不同。 本章将进行基于Kafka集群的部署,其中重要的概念是对前三...
奥乔尼特斯概述您需要设置并快速运行一堆代理... 请注意,我包含了一个相当简单的容器,它将使用本地/var/lib/kafka来存储其日志。 随意挂载一些东西,甚至动态分配一个卷。做吧!第 1 步:在运行门户的情况下在 AWS 上
使用Replicator的Kafka Streams迁移示例关键点如果Kafka Streams exactly-once ,则复制器必须配置为isolation.level=read_committed 迁移的应用程序需要使用auto.offset.reset=latest重新启动为了一个接一个地迁移...
debezium-kafka-postgres-nodejs-示例一个简单的示例,说明如何使用Debezium,Kafka,Postgres设置CDC流程,以及如何使用Node.JS使用者使用事件。设置启动一个postgres容器,它将成为我们的数据源。 docker run --rm...
当应用程序启动时,除了 Zookeeper 和 Kafka 之外,它由四个独立的单元组成。 每个单元都是一个单独的过程。 这些进程不必在同一台计算机上。 这些单位是: 原料药 处理器 订户一 订户二 主API监听主机操作系统的 ...
##下载源码git clonekafka-log-appender:将日志内容写到kafka程序log-kafka-storm:docker-compose脚本和storm程序##准备docker环境###启动dockerdocker-compose环境搭建过程请查看我的进入log-kafka-storm目录,...
第14章 KAFKA工作流程 53 14.1 PUB-SUB信息工作流 53 14.2 队列信息工作流/消费者组 53 14.3 Zookeeper在KAFKA中扮演的角色 54 第15章 KAFKA安装 55 第16章 KAFKA基本操作 56 16.1 启动zookeeper服务 56 16.2 单个单...