该文件中还定义了五种leader选举器,也会执行

作者: 编程  发布:2019-10-03

1、借使央求中controllerEpoch小于当前最新的controllerEpoch,则一贯回到ErrorMapping.StaleControllerEpochCode。

获得controller中保存的加以别本的景观,如若无缓存记录,开首化状态为NonExistentReplica,然后拿走controller中缓存的该分区的别本分配记录

在初叶介绍类方法在此以前大家先看一下在这几个类内部定义的嵌套类: DeleteTopicsThread——看名就能猜到其意义,正是背负删除topic的专有线程。该类承接了ShutdownableThread,首要的逻辑如下:

9. kafka 的费用格局

RequestKeys.LeaderAndIsr详细解析在地点的代码中我们见到ReequestKeys.LeaderAndlst对应的方法其实是KeyhandleLeaderAndIsrRequest。

ReplicaDeletionIneligible —— 假若别本删除败北,将被置于ReplicaDeletionIneligible状态,可由ReplicaDeletionStarted状态转变而来

四、TopicDeletionManager.scala

2. kafka 在 zookeeper 上创造的目录结构

9159.com 1

kafka 在 zookeeper 上的积存结构

详尽内容仿照效法链接:
kafka笔记-卡夫卡在zookeeper中的存储结构【转】

登记的节点如下:

==consumers、admin、config、controller、brokers、controller_epoch==

  1. topic 注册音讯

    /brokers/topics/[topic]:存款和储蓄有个别 topic 的 partitions 全部分配新闻

  2. partition状态音讯

    /brokers/topics/[topic]/partitions/[0...N] 其中[0..N]表示partition索引号

    /brokers/topics/[topic]/partitions/[partitionId]/state

  3. Broker注册新闻

    /brokers/ids/[0...N]

    各种broker的安排文件中都亟需钦命叁个数字类型的id(全局不可重复),此节点为一时znode(EPHEMERAL)

  4. Controller epoch

    /controller_epoch -> int (epoch)

    此值为一个数字,kafka集群中首先个broker第一遍运维时为1,以往假使集群中center controller中央调整器所在broker更改或挂掉,就能够另行公投新的center controller,每回center controller改动controller_epoch值就会 + 1

  5. Controller注册音信

    /controller -> int (broker id of the controller) 存款和储蓄center controller中央调节器所在kafka broker的新闻。那个值暗许是 1,当 controller 节点挂掉后再度大选 controller 后,值会 +1

  6. Consumer注册音信

    各种consumer都有二个独一的ID(consumerId能够经过布署文件钦命,也得以由系统生成),此id用来标志成本者新闻

    /consumers/[groupId]/ids/[consumerIdString]

  7. Consumer owner

    /consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引编号

  8. kafka 中的调整器 controller 的职能


卡夫卡集群中的在那之中一个 Broker 会被大选为Controller,首要担当 Partition 管理和别本状态管理(当 partition leader 挂掉时,会从 follower 中选出一个leader),也会实施类似于重分配 Partition 之类的管制职分。要是当前的 Controller 失利,会从其余日常的 Broker 中另行公投 Controller。

  1. 第一将率先个可用的别本broker作为leader broker并把富有可用的别本对象都装入IS奥迪Q5,然后写leader和IS奥德赛消息到zookeeper中保留
  2. 对此这么些分区来说,发送LeaderAndIsr央浼到种种可用的别本broker,以及UpdateMetadata央浼到每一个可用的broker上

1. 为该分区选拔新的leader和IS安德拉以及接收LeaderAndIsr乞请的一组别本,然后写入leader和IS凯雷德音信到zookeeper中保留。

包裹了二个LeaderAndIsr对象和二个int值表示controllere的epoch值,正是保存leader、IS哈弗以及controller_epoch音讯的三个数据结构

Spark shuffle 进度源码

进去卡夫卡Controller.scala文件看见如下代码:

  1. controllerContext —— 便是卡夫卡Controller类的三个实例,封装了数不尽controller的措施

  2. controllerId —— controller ID,由布署文件中的broker.id属性内定

  3. zkClient —— 二个ZooKeeper的客户端,用于与zookeeper服务器交互

  4. partitionState —— 保存的分区的状态音信

  5. brokerRequestBatch —— 首要用来批量发送央浼给Broker

  6. hasStarted —— 首要用来标记该状态机是还是不是张开

  7. noOpPartitionLeaderSelector —— 三个私下认可的leader选举类,首要被看作暗许值使用,本质上实际什么都不做

  8. topicChangeListener/deleteTopicListener/addPartitionListener —— 五个监听类实例,用于监听状态更改、删除topic和充实新分区事变

  9. stateChangeLogger —— 二个日志类

1. handleDataChange:命令行命令发起的分区重分配时会调用这么些方法,具体逻辑如下:

斯Parker 铺排的三种形式介绍

standalone模式,即单独格局,自带完整的劳务,可独立布署到贰个集群中,不要求依赖任何其余财富管理连串。近来斯Parker 在 standalone 格局下是从未有过其他单点故障难点的,那是借助zookeeper完成的,思想周边于Hbase master单点故障技术方案。

Spark On Mesos模式。那是看不尽供销合作社接纳的方式,官方推荐这种格局(当然,原因之一是血缘关系)。就是由于斯Parker开辟之初就驰念到支撑Mesos,由此,如今来说,斯Parker运转在Mesos上会比运维在YAENVISIONN上更为灵活,尤其自然。如今在SparkOn Mesos景况中,顾客可挑选两种调整方式之一运维本身的应用程序(可参谋AndrewXia的“Mesos Scheduling Mode on Spark”):

  1. 粗粒度模式(Coarse-grained Mode):种种应用程序的运转境况由三个Dirver和多少个Executor组成,在那之中,每一个Executor占用若干资源,内部可运转八个Task(对应多少个“slot”)。应用程序的次第职务正式运行从前,必要将运营碰到中的能源总体申请好,且运行进程中要直接据有那个能源,即使并非,最后程序运维停止后,回收这个财富。

  2. 细粒度情势(Fine-grained Mode):鉴于粗粒度格局会导致大量能源浪费,斯Parker On Mesos还提供了别的一种调整情势:细粒度情势,这种形式类似于明天的云总计,观念是按需分配。与粗粒度方式同样,应用程序运行时,先会运营executor,但每一种executor占用能源仅仅是友善运转所需的资源,无需思考现在要运营的职责,之后,mesos会为各种executor动态分配财富,每分配一些,便得以运作叁个新职务,单个Task运营完事后能够即时释放对应的能源。种种Task会叙述情状给Mesos slave和Mesos Master,便于更细粒度管理和容错,这种调治格局类似于MapReduce调解形式,各类Task完全部独用立,优点是方便资源支配和隔开分离,但短处也很醒目,短作业运转延迟大。

Spark On YARN模式。那是一种最有前景的安顿形式。但限于YA中华VN自己的发展,前段时间仅支持粗粒度方式(Coarse-grained Mode)。那是出于YA奥德赛N上的Container能源是不得以动态伸缩的,一旦Container运行之后,可应用的财富不能够再爆发变化,但是那一个早就在YA奇骏N安插(具体参照他事他说加以考察连接)中了。

spark on yarn 的支撑二种格局:

1. yarn-cluster:适用于生产蒙受;

2. yarn-client:适用于交互、调节和测量试验,希望即刻见到app的出口

9159.com ,yarn-cluster 和 yarn-client 的分别在于 yarn appMaster,每一种 yarn app 实例有三个 appMaster 进程,是为 app 运维的率先个 container;担负从 ResourceManager 诉求能源,获取到财富后,告诉 NodeManager 为其运行container。yarn-cluster 和yarn-client 形式里面贯彻照旧有十分的大的界别。

将request.requestObj转变成LeaderAndIstRequest对象类型。

以此监听器担当监听topic的删除,首要回顾1. 将要删除的topic插手到待删除topic缓存中——前提是那一个topic必需存在;2. 只要存在要刨除的topic,那么通告删除topic线程。它也实现了handleChildChange方法,那么些方法会在topic被剔除的时候被调用。具体逻辑如下:

ReassignedPartitionsIsrChangeListener类

8. 卡夫卡 Producer是哪些动态感知Topic分区数变化

主题素材是,即使在 卡夫卡 Producer 往 卡夫卡 的 Broker 发送消息的时候客商通过命令修改了改宗旨的分区数,卡夫卡 Producer 能动态感知吗?答案是可以的。这是那时就感知吗?不是,是过一定的岁月(topic.metadata.refresh.interval.ms参数决定)才精通分区数退换的。

原稿链接

2.1、借使当前brokerid(可能说replica id)在partitionStateInfo中,则将该partition及partitionStateInfo存入叁个名称为partitionState的HashMap中。不然表达该Broker不在该Partition分配的Replica list中,将该消息记录于log中

ReassignedPartitionLeaderSelector类 —— 从可用的IS雷克萨斯LC中甄选第多少个作为leader,把当前的IS宝马X3作为新的IS汉兰达,将重分配的别本集协作为接收LeaderAndIsr央求的副本集结。

第一的类成员字段如下:

cache 和 pesist 的区别

在 spark 中,cache 和 persist 都以用来将叁个 RDD 举行缓存的,这样在其后的选用进度中就无需再度开展总结了,能够大大节省程序运转的时光。

双方的区分在于: cache 其实是调用了 persist 方法,缓存战术为 MEMO智跑Y_ONLY。而 persist 能够透过安装参数有多种缓存战术。
五头都能经过 unpersisit 来开展放飞。

OfflinePartition -> NonExistentPartition

  • NonExistentReplica -> NewReplica:将这段日子leader和ISCRUISER封装到三个LeaderAndIsr必要中发送给新的replica broker,并发送UpdateMetadata乞请给各个当前可用的broker
  • NewReplica -> OnlineReplica:将新添别本参与到A迈凯伦540C中
  • OnlineReplica, OfflineReplica -> OnlineReplica:将近来leader和IS途胜封装到二个LeaderAndIsr乞请中发送给新的replica broker,并发送UpdateMetadata诉求给各样当前可用的broker
  • NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible -> OfflineReplica:首头阵送StopReplicaRequest央浼给别本(但不删除别本),然后将该副本从IS福睿斯中移除,之后发送LeaderAndIsr诉求给leader别本更新IS翼虎并发送UpdateMetadata央浼给持有可用的broker
  • OfflineReplica -> ReplicaDeletionStarted: 发送StopReplicaRequest央求给别本(相同的时间删除该别本)
  • ReplicaDeletionStarted -> ReplicaDeletionSuccessful:在气象机中标志别本状态
  • ReplicaDeletionStarted -> ReplicaDeletionIneligble:在情景机中标志副本状态
  • ReplicaDeletionSuccessful -> NonExistentReplica:从controller的分区副本分配缓存中移除有个别别本

2. handleDataDeleted:假设zookeeper中的leader消息被删去时会调用该方法,但是貌似那个措施未有现实的兑当代码

4. kafka consumer 均衡算法

当三个group中,有consumer参预恐怕离开时,会触发partitions均衡。均衡的最终目标,是提高topic的出现花费才干。

  1. 设若 topic1 具备如下 partitions: P0,P1,P2,P3
  2. 假使 group 中有如下 consumer: C0,C1
  3. 第一遵照 partition 索引号对 partitions 排序: P0,P1,P2,P3
  4. 根据(consumer.id + '-'+ thread序号)排序: C0,C1
  5. 测算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值 M = 2 (向上取整)
  6. 然后所有人家分配 partitions: C0 = [P0,P1],C1=[P2,P3],即 Ci = [P(i * M),P((i + 1) * M -1)]

  7. kafka 数据高可用的原理是如何


一致性定义:若某条消息对Consumer可知,那么就是Leader宕机了,在新Leader上多少还可以被读到

  1. HighWater马克简称HW: Partition的高水位,取八个partition对应的ISEvoque中幽微的LEO作为HW,开支者最三只好源消费用到HW所在的岗位,别的各种replica皆有high沃特ermark,leader和follower各自担负更新本人的highWatermark状态,high沃特ermark <= leader. LogEndOffset

  2. 对于Leader新写入的msg,Consumer无法即时费用,Leader会等待该音讯被抱有ISENVISION中的replica同步后,更新HW,此时该消息技能被Consumer开支,即Consumer最多只好源消开支到HW地点

诸有此类就保险了一旦Leader Broker失效,该音讯还能够从新公投的Leader中获得。对于来自内部Broker的读取必要,未有HW的限定。同有的时候间,Follower也会爱惜一份协和的HW,Folloer.HW = min(Leader.HW, Follower.offset)

咱俩在分解那多个选择器此前,先明白一下在卡夫卡中Partition的多样情景:

现实逻辑如下:

RAXC60 —— 重分配过的别本列表

Spark 集群运营力源码

Master 启动

源码没有供给特意详细,只供给把大约流程说领悟就能够。

NonExistentPartition -> NewPartition

  • 收获zookeeper中新添分区topic对应的分区副本分配记录,与controller中保存的分区别本记录相比较,寻找新添的分区记录
  • 万一那一个新扩张分区所属的topic当前正在施行删除操作,那么直接记录二个日志错误再次回到,即跳过扩张分区的操作,不然调用controller的onNewPartitionCreation方法来创设这几个分区
  • 将给定分区设置为NewPartition状态
  • 将分区下有所别本都设定为NewReplica状态
  • 再度设定全数给定分区状态为OnlinePartition
  • 重复设定给定分区下所有别本状态设定为OnlineReplica

Spark 常用的 陆风X8DD 算子,哪些会时有产生 shuffle

1. 算子分类

大方平昔说,斯Parker 算子大致能够分成以下两类:Transformation 算子和 Action 算子。

从小方一直说,斯Parker 算子大致能够分为以下三类:(1) Value 数据类型的 Transformation 算子、(2) Key-Value 数据类型的 Transfromation 算子、(3) Action 算子。

2. 列举

详尽请看:Spark算子分类

会产生 shuffle 的算子

combineByKey、reduceByKey、groupByKey、cogroup、join、leftOutJoin、rightOutJoin

def handleLeaderAndIsrRequest(request: RequestChannel.Request) { // ensureTopicExists is only for client facing requests // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they // stop serving data to clients for the topic being deleted val leaderAndIsrRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest] try { val (response, error) = replicaManager.becomeLeaderOrFollower(leaderAndIsrRequest, offsetManager) val leaderAndIsrResponse = new LeaderAndIsrResponse(leaderAndIsrRequest.correlationId, response, error) requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(leaderAndIsrResponse))) } catch { case e: KafkaStorageException => fatal("Disk error during leadership change.", e) Runtime.getRuntime.halt } }

其一便是分区的状态机。首先定义了二个trait封装了概念好的各类分区状态:NewPartition、OnlinePartition、OfflinePartition和NonExistentPartition。而PartitionStateMachine类就是分区的情事机类。在现实进展该类的字段方法在此以前,先说一下它个中定义的八个嵌套类,用于动态监听不一致的分区操作:

PartitionAndReplica case类

Spark Streaming 和 Storm 的区别

管理模型,延迟

纵然如此这八个框架都提供可扩张性和容错性,它们根本的不相同在于他们的拍卖模型。而 Storm 处理的是每一次传入的一个平地风波,而 斯Parker Streaming 是拍卖有些时间段窗口内的风云流。由此,Storm 管理三个事变可以达到秒内的延期,而 斯Parker Streaming 则有几分钟的推移。

容错、数据保障

在容错数据保险方面包车型大巴度量是,斯Parker Streaming 提供了更加好的辅助容错状态总计。在 Storm 中,每一个独立的记录当它通过系统时必需被盯梢,所以 Storm 可乃最少确认保障每一种记录将被拍卖一次,可是在从破绽相当多中恢复生机过来时候允许出现重复记录。这表示可变状态大概不得法地被更新四次。

一方面,斯Parker Streaming 只须求在批等级进行跟踪管理,因而能够使得地保险种种 mini-batch 将完全被拍卖叁遍,即使叁个节点发生故障。(实际上 Storm 的 Trident library 库也提供了完全二遍拍卖。然则,它借助于事情更新情形,这非常的慢,经常必需由客户实现)。

总结

简轻易单,假设您必要秒内的推移,Storm 是三个科学的选拔,并且尚未数量错过。假设您须要有动静的乘除,并且要完全保险每一个事件只被处理二回,斯ParkerStreaming 则更加好。斯Parker Streaming 编制程序逻辑也也许更便于,因为它相仿于批管理程序(Hadoop),极其是在你选用批次(尽管是比相当小的)时。

卡夫卡集群中的个中贰个Broker会被大选为Controller,主要担任Partition管理和别本状态管理,也会推行类似于重分配Partition之类的管住职责。纵然当前的Controller战败,会从别的常规的Broker中再一次公投Controller。

用以监听增添分区事件,也落到实处了handleDataChange方法,具体逻辑如下:

39. onPartitionReassignment——controller中最要害的情势!当命令行发起三个分区重分配操作时,它会在zookeeper的/admin/reassign_partitions路线下开创节点以触发zookeeper的监听器。对三个分区重新分配其别本涉及多少个步骤。首先先来鲜明一下中间的多少个名词:

6. kafka 的多少可信赖性保险

当Producer向Leader发送数据时,能够通过acks参数设置数据可信赖性的等第

  • 0: 不论写入是或不是成功,server没有须要给Producer发送Response,借使发生特别,server会终止连接,触发Producer更新meta数据;
  • 1: Leader写入成功后即发送Response,此种境况假使Leader fail,会扬弃数据
  • -1: 等待全部ISLacrosse接收到消息后再给Producer发送Response,那是最强保障

仅设置acks=-1也无法保障数据不遗弃,当Isr列表中独有Leader时,同样有非常大可能率导致数据错过。要保障数据不丢除了安装acks=-1, 还要确定保证IS安德拉的大大小小大于等于2,具体参数设置:

1.request.required.acks:设置为-1 等待全部IS大切诺基列表中的Replica接收到新闻后采算写成功;

2.min.insync.replicas: 设置为当先等于2,保险IS智跑中起码有三个Replica

稳重:Producer要在吞吐率和数量可信性之间做二个度量

持有的leader选取成功后,都要透过央浼把实际的request路由到对应的handler管理。近些日子kafka并从未把handler抽象出来,而是各样handler都是四个函数,混在卡夫卡Api类中。其实也等于之类的代码:

独有是标识分区状态为NonExistentPartition就可以

26. initalizePartitionReassignment:主要正是找寻如何分区须求开展副本的重分配。具体逻辑如下:

spark 职分创制进度

  1. 客商端实施 spark-submit 脚本,脚本内部会调用 spark-class 脚本,启动SparkSubmit 类。
  2. SparkSubmit 类的内部会反射调用大家温馨写的类的 main 方法。
  3. 那儿开班创办 SparkContext。

为该分区选择新的leader和ISSportage以及接收LeaderAndIsr诉求的一组别本,然后写入leader和IS奇骏音讯到zookeeper中保存。

  1. 先是将率先个可用的别本broker作为leader broker并把富有可用的副本对象都装入IS奥迪Q5,然后写leader和ISENCORE新闻到zookeeper中保存

controller会监听该zk目录下别的节点的更改并为相应的topic开启删除操作

spark-submit的时候什么引进外界jar包

在KafkaController类中定义了比比较多品质,大家先入眼精通下边包车型客车PartitionLeaderSelector对象,重假使为分区选举出leader broker,该trait只定义了贰个办法selectLeader,接收一个TopicAndPartition对象和四个LeaderAndIsr对象。TopicAndPartition表示要选leader的分区,而第贰个参数表示zookeeper中保存的该分区的此时此刻leader和IS陆风X8记录。该方法会重回贰个元组饱含了推荐出来的leader和ISLX570以及须要接收LeaderAndISr乞求的一组别本。

既然如此有多样景况就须要定义合法的气象转换:

addBroker —— 固然在broker状态音讯缓存中找不到给定的broker信息直接调用addNewBroker创建三个,其它开启对应broker的乞求发送线程

1. zookeeper 在 kafka 中起到哪些效果与利益

  • Controller 选举

    • Controller 是贰个异样的 Broker, 其肩负维护有着 Partition 的 leader/follower 关系。当有 partition 的 leader 挂掉之后,controller 会重新从同步队列中选出多少个 leader。
    • ==Zookeeper 负担从 Broker 中选出出一个看成 Controller, 并确定保证其独一性。 同时, 当Controller 宕机时, 大选叁个新的。==
  • 集群 membership

    • ==记录集群中都有如何活跃着的Broker。==
  • Topic 配置

    • ==记录有怎么着Topic, Topic 都有如何 Partition,Replica 寄存在什么地方, Leader 是什么人。==
  • ==在 consumer group 产生变化时开展 rebalance。==

  • 配额(0.9.0+)

    • 记录每一种顾客能够读写的数据量。
  • ACLs(0.9.0+)

    • 笔录对Topic 的读写调控。
  • high-level consumer(已废弃)

    • 记录consumer group 及其成员和offset 新闻。

大家的kafka源码分享已经进展过众多期了,首要的故事情节也都享受的几近了,那么那么在现在的享受中,重要聚焦在kafka性能优化和行使

在切切实实进行该类的字段方法此前,先说一下它在那之中定义的嵌套类,用于动态监听副本全部的地方退换:

A中华V —— 当前已分配的别本列表

spark shuffle的具体经过,你知道二种shuffle形式

spark shuffle 有:hash,sort,tungsten-sort。

斯Parker的Shuffle总体来讲就含有八个主导的历程:Shuffle write和Shuffle read。ShuffleMapTask的整个施行进程就是Shuffle write。hash-based机制正是在Shuffle的进度中写多少时不做排序操作,差别于MapReduce。只是将数据依据Hash的结果,将各样Reduce分区的多寡写到各自的磁盘文件中。

先是是将map的输出结果送到对于的缓冲区bucket里面,分配bucket的经过同样也是个hash进行分区的历程,在hashed-based下每四个bucket对应一个末段的reducer,在拍卖完以往bucket里的数据会自动划分到reducer的bucket里面。每一个bucket里的文本会被写入本地球磁性盘文件ShuffleBlockFile中,变成三个FileSegment文件。

Shuffle read指的是reducer对属于本人的FileSegment文件进行fetch操作,这里运用的netty框架,功效显明好于Mapreduce的http传输。fetch操作会等到具备的Shuffle Write进度截止后再拓宽,这也是因为ShuffleMapTask或然并不在叁个stage里面,要求在父stage实行之后提交才会进展子stage的实施。reducer通过fetch获得的FileSegment先放在缓冲区softBuffer中,暗中认可大小48MB。斯Parker没有须要Shuffle后的数目是全局有序的,所以不需求等到shuffle read全体终了后再展开reduce,是能够并行管理的。

class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerState: BrokerState) extends Logging with KafkaMetricsGroup { this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs) val partitionStateMachine = new PartitionStateMachine val replicaStateMachine = new ReplicaStateMachine private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId) // have a separate scheduler for the controller to be able to start and stop independently of the // kafka server private val autoRebalanceScheduler = new KafkaScheduler var deleteTopicManager: TopicDeletionManager = null val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config) private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext) private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext) private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext) private val brokerRequestBatch = new ControllerBrokerRequestBatch private val partitionReassignedListener = new PartitionsReassignedListener private val preferredReplicaElectionListener = new PreferredReplicaElectionListener

    该文件中还定义了七种leader公投器,在事无巨细介绍每一种大选器以前,大家先说下分区都有怎么着状态。卡夫卡定义了4中分区:

  • 注销给定的topic之上的分区退换监听器以预防被删去的topic自动重新塑造出来
  • 找寻那二个已被成功删除的副本集合
  • controller会将它们从气象机中的缓存以及分区别本分配缓存中移除
  • 从controller处获取到待删除topic的装有分区,并一一安装那个分区状态为OfflinePartition->NonExistentPartition
  • 将加以的topic从待删除topic群集中移除,同不常间创新待删除分区会集
  • 删去zookeeper上的topic相关的节点路线,包蕴/brokers/topics/[topic],/config/topics/[topic]以及/admin/delete_topics/[topic]
  • 终极,把topic新闻从controller缓存中移除

10. kafka 怎样完结高吞吐量

def handle(request: RequestChannel.Request) { try{ trace("Handling request: " + request.requestObj + " from client: " + request.remoteAddress) request.requestId match { case RequestKeys.ProduceKey => handleProducerRequest // producer case RequestKeys.FetchKey => handleFetchRequest // consumer case RequestKeys.OffsetsKey => handleOffsetRequest case RequestKeys.MetadataKey => handleTopicMetadataRequest case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest //成为leader或follower设置同步副本组信息 case RequestKeys.StopReplicaKey => handleStopReplicaRequest case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest //shutdown broker case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { case e: Throwable => request.requestObj.handleError(e, requestChannel, request) error("error when handling request %s".format(request.requestObj), e) } finally request.apiLocalCompleteTimeMs = SystemTime.milliseconds } 
  • 从zookeeper中赢得指标分区的leader和IS福特Explorer,以及controller_epoch信息
  • 如果zk中的controller_epoch比当前controller保存的值大,表明当前controller曾经恐怕倒闭过并公投过别的controller且那一个controller也干预过对象分区的leader选举。倘使是那样的话直接终止正在进展中的leader大选
  • 不然,使用钦赐的leader公投器举办leader公投,获取leader、IS奥迪Q7和A奥迪Q7的新闻并写入zookeeper中保存
  • 一经zookeeper更新失利(比如无法连接zookeeper等),从步骤1初叶重试,直至成功甘休
  • 更新controller缓存的leader、IS冠道音信,并将leader、IS福特Explorer、AEvoque音信加入到元数据需要队列中等待后面发送元数据更新央求

enqueueTopicsForDeletion:将要删除的topic入队到topicsToBeDeleted群集中,在剔除成功做到之后才会被移除

斯Parker 职责交给进度源码

  1. Driver 程序的代码运维到 action 操作,触发了 SparkContext 的 runJob 方法

  2. 斯ParkerContext 调用 DAGScheduler 的 runJob 函数,内部调用 DAGScheduler 的 submitJob 方法,重返七个 JobWaiter 对象。接着向 伊夫ntProcessLoop 的堵截队列中 put 三个 JobSubmitted 事件。

  3. 那时 DAGScheduler 的 onReceive 方法被调用,格局匹配,调用 handleJobSubmitted 方法,用来切分 stage。stage 的分割进程是递归调用,在此以前今后的细分 stage。

  4. 基于 final stage 递归找到第多个 stage,然后将率先个 stage 提交。

  5. 出于 stage 的品类不一致,这里会有二种不一样门类的 task,ShuffleMapTask 和 ResultTask。把 task 封装到 taskSet 里,把 Tasks 交给 TaskScheduler。(RPC 调用,向 Executor 提交 task)

  6. Executor 将 task 封装到 TaskRunner 对象中,将 taskRunner 放入到 Executor 中的线程池中。

  7. 末段会调用 ShuffleMapTask 或 ResultTask 的 runTask 方法,实践职业逻辑。

流程图表达

  1. handleStateChanges —— 状态机运转时以及broker产生改变时会调用那个法子,主要担当管理一组别本的情景改变。具体做法正是为每一个要管理的别本调用handleStateChange方法来实市场价格况调换,然后统一发送LeaderAndIsr诉求列表中的全体央浼

  2. startup —— 运营状态机,先最初化别本状态,然后设置运转标志位,最终调用handleStateChanges方法将controller中保存全部当前别本设置为OnlineReplica

  3. registerBrokerChangeListener/deregisterBrokerChangeListener —— 在zk上登记/取消用于监听别本状态退换的监听器

  4. registerListeners/deregisterListeners —— 同上

  5. shutdown —— 关闭controller时候调用该方法关闭状态机——设置运营标记位为false,清空别本状态缓存并注销全部zk上的监听器注册

  6. areAllReplicasForTopicDeleted —— 查看某topic下的具有副本是或不是业已被去除

  7. isAtLeastOneReplicaInDeletionStartedState —— 推断是还是不是留存最少叁个别本正处在被去除的经过中

  8. replicasInState —— 再次来到某些topic下钦命别本状态的全数别本

  9. isAnyReplicaInState —— 剖断某topic下是或不是存在钦点状态的副本

  10. replicasInDeletionStates —— 再次来到某topic下正处在删除操作进度中的全部别本

  11. partitionAssignedToBroker —— 在一组topic中寻找在给定broker有别本的topic及分区

isPartitionToBeDeleted:推断是还是不是要删减给定的topic分区——具体做法就是判别是或不是在待删除分区集合中

Spark 品质优化方案

那之中的各样央求在上面给出的链接的小说中都有过解释表明,在此间十分少解释。

1. TopicChangeListener监听类

  1. controller:表示一个controller对象

  2. initialTopicsToBeDeleted:待删除的topic集结

  3. initialTopicsIneligibleForDeletion:近来不可能被剔除的topic集结

斯Parker SQL 源码实施流程

  1. SQL 语句经过 SqlParser 解析成 Unresolved LogicalPlan
  2. 使用 analyzer 结合数据数据字典(catalog)进行绑定,生成 resolved LogicalPlan
  3. 运用 optimizer 对 resolved LogicalPlan 实行优化,生成 optimized LogicalPlan
  4. 使用 SparkPlan 将 LogicalPlan 转换成 PhysicalPlan
  5. 采用 prepareForExecution() 将 PhysicalPlan 转变来可实践物理布置
  6. 利用 execute() 实施可实行物理陈设
  7. 生成 RDD。

经过大家地点的代码,能够见到在卡夫卡Controller中国共产党定义了八种selector大选器:

 

removeExistingBroker —— 关闭对应broker的最底层通道,清空央求/响应队列,关闭乞求发送线程,并从broker状态音讯从缓存中清除

7. kafka partition 分区的宗旨是怎么样

音信发送到哪个分区上,有二种基本的政策,一是利用 Key Hash 算法,一是利用 Round 罗布in 算法。别的创造分区时,最佳是 broker 数量的卡尺头倍,那样手艺是二个 Topic 的分区均匀的遍及在全体 卡夫卡 集群中。

默许景况下,卡夫卡 依据传递音讯的 key 来进展分区的分红,即 hash(key) % numPartitions。

尽管发送音信时未尝点名key,那么 Producer 将会把那条新闻发送给随机的二个Partition。然则代码层面包车型客车逻辑并不完全部都以这么。首先走访卡夫卡有未有缓存的现存的分区Id,假诺局地话一贯选取这几个分区Id。若无的话,找寻具有可用分区的leader所在的broker,从当中随机挑一个并内置缓存中,下一次就直接从缓存中拿这么些partition id。注意这些缓存是每隔一段时间就能够被清空的。这么做的目标是为了收缩服务器端的sockets数。

标识分区状态为离线。

  • 收获待删除topic集结(方法参数字传送过来的),与controller中保存的topic集合比较,找到那个不设有的topic
  • 举个例子实在存在官样文章的topic群集,那么删除zk中/admin/delete_topics下相应的子节点
  • 从待删除的topic集合中去掉这几个不真实的topic
  • 遍历更新后的待删除topic集合,推断每一种topic当前是或不是正处在别的境况改动进度中,比如PreferredLeaderSelector或ReassignedLeaderSelector。假设是的话调用DeleteTopicManager的markTopicIneligibleForDelete方法标记该topic为一时不能够被去除状态
  • 将有着带删除的topic都投入到controller的topic删除列表中等候专有线程对其进行删除操作
  1. initalizeControllerContext:开首化controller上下文,具体逻辑如下:

9159.com 2Sample Flowchart Template.png

3. AddPartitionsListener监听类

  • 从zookeeper中读取controller_epoch值,并扩大它的值
  • 注册controller_epoch改变监听器
  • 开端化controller的context对象以缓存当前topic,可用broker和兼具分区leader
  • 运转controller的坦途管理器、别本状态机和分区状态机
  • 为富有topic注册分区改变监听器,并将broker置于controller
  • 尝试举办分区别本重分配以及preferred别本leader公投,并将结果发送UpdateMetadata央求给全数可用的broker
  • 聊起底运转删除topic处理器

离线状态标识为不设有分区,表示该分区失败大概被删除。

  • 获得目的分区对应的别本分配记录,并从当中寻觅可用的已分配副本。若是当前从未有过可用的别本那么报错表示该情形调换战败;不然选举出第一个别本broker作为leader,把当前可用的别本集同盟为ISTiguan
  • 假如鲜明了leader和ISSportage之后,在zookeeper上的/brokers/topics/[topic]/partitions/[partitionId]/state节点下创立出相应的leader和IS福特Explorer路线
  • 履新controller的分区leader缓存消息,况兼把LeaderAndIsr央浼参与到brokerRequestBatch中

controller开启多个后台线程管理topic的删除。使用该线程主要为了现在能够扩展TTL(time to live)的个性。无论何时开启或重启topic删除操作时都会通报该线程。当前,topic删除操作只可以由onPartitionDeletion回调方法来开启。在后续版本中,卡夫卡会思虑基于在topic上安排TTL来触发。下列意况下topic并不能被去除:

在介绍完最中央的概念之后,上边大家将首要介绍下面提到过的三种公投器:1、ReassignedPartitionLeaderSelector从可用的ISEscort中甄选第三个作为leader,把当前的ISTiguan作为新的IS揽胜,将重分配的别本集同盟为接收LeaderAndIsr伏乞的别本集合。2、PreferredReplicaPartitionLeaderSelector假定从assignedReplicas抽出的率先个别本就是分区leader的话,则抛出特别,不然将第多少个别本设置为分区leader。3、ControlledShutdownLeaderSelector将IS帕杰罗中居于关闭状态的副本从集合中去除掉,重回叁个新新的ISXC60集合,然后采取第三个别本作为leader,然后令当前AEscort作为接收LeaderAndIsr诉求的别本。4、NoOpLeaderSelector标准上不做任何事情,重返当前的leader和isr。5、OfflinePartitionLeaderSelector从活着的IS牧马人中甄选三个broker作为leader,假诺IS锐界中从未活着的别本,则从assignedReplicas中选择贰个别本作为leader,leader大选成功后登记到Zookeeper中,并立异具备的缓存。

以此类承继了Zookeeper的IZKChildListener接口,前者是znode子节点事件监听接口,当ZKClient接收到有个别path节点更动或子节点改动事件时就能够触发该listener。而TopicChangeListener类担负监听分区的全数希望的景况调换。具体落成也很简短就是贯彻接口的handleChildChange方法,具体逻辑如下:

  • 先依据topic将具有副本进行分组
  • 为种种topic搜索近期可用的别本集结以及"已死"的别本会集
  • 从别本状态集结中搜索topic的富有已成功删除的别本,依据刚才总结出来的可用别本寻觅要求重试删除的别本集结
  • 将"已死"副本状态置于不可删除,将须求重试删除的别本集结置于Offline状态并发送StepReplica央浼给全数不是Offline的follower别本使它们截止向leader索取音信
  • 开班发送乞请删除别本,倘诺有删除战败,标识该topic为不可删除

3、如若partitionStateInfo中的leader epoch小于当前ReplicManager则将相应的Error code(ErrorMapping.StaleLeaderEpochCode)存入Response中。

OnlineRelica —— 一旦运转了贰个别本以及该分区A奥德赛别本集结中的一片段,那么就将安装该别本状态为OnlineReplica。在此情景中的别本还可以"成为leader"或"成为follower"的意况改动诉求。可由NewRelica、OnlineReplica或OfflineReplica状态调换而来

    上面再来说多个case类:StopReplicaRequestInfo和ControllerBrokerStateInfo类。ControllerBrokerStateInfo就是保存controller状态音讯的,满含用于发送恳求的通道channel,broker对象,乞请队列,里面保存了乞求及相应的回调方法,以及二个伸手发送线程(前边会谈到)。而StopReplicaRequestInfo类表示了该乞请的相干消息,包涵别本音讯、是还是不是删除分区的标志位以及贰个回调方法用于在吸收接纳到响应后进行。

2、假若partitionStateInfo中的leader epoch大于当前ReplicManager中寄存的(topic, partitionId)对应的partition的leader epoch,则:

  1. controllerContext —— 就是卡夫卡Controller类的贰个实例,封装了成都百货上千controller的法子

  2. controllerId —— controller ID,由布署文件中的broker.id属性内定

  3. zkClient —— 二个ZooKeeper的顾客端,用于与zookeeper服务器交互

  4. replicaState —— 保存的分区别本的情景消息缓存

  5. brokerChangeListener —— zk监听器,用于监听别本的气象退换

  6. hasStarted —— 首要用来标记该状态机是还是不是打开

  7. brokerRequestBatch —— 首要用来批量殡葬央浼给Broker

  8. stateChangeLogger —— 贰个日志类,用于一些日志输出

14. removeReplicaFromIsr:将加以的分区别本从IS凯雷德中移除。前提是它不是leader且IS中华V有丰硕多的副本。具体逻辑如下:

OnlinePartition, OfflinePartition -> OnlinePartition

依照给定的目的状态步入分裂的分层:

27. initializePreferredReplicaElection:首要就是要让controller缓存这个要拓展preferred别本公投的分区。具体逻辑如下:

  • NonExistentPartition —— 这几个意况表示该分区要么未有被成立过或早就被创立过但背后被删去了。
  • NewPartition —— 分区创建之后就处在NewPartition状态。在那个意况中,分区应该已经分配了别本,不过还并未有公投出leader和IS奥迪Q5。
  • OnlinePartition —— 一旦分区的leader被推选出来,它就高居OnlinePartition状态。
  • OfflinePartition —— 假设leader大选出来后,leader broker宕机了,那么该分区就处于OfflinePartition状态。

NewReplica —— 创建topic或重分配分区时controller会创设新的别本,新成立的别本正是处于这几个情况。在此情状中的别本只好收取"成为follower"的状态更动央浼,可由NonExistentReplica状态调换而来

addNewBroker —— 成立一个新的消息队列、四个围堵队列以及几个线程,然后将这几个消息封装进ControllerBrokerStateInfo中与相应的broker创设起映射

trait PartitionLeaderSelector { def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])}

上边逐个深入分析各样leader选举器类,先从NoOpLeaderSelector开头:

今昔得以说说ControllerChannelManager类了。它定义了多少个Hashmap来保存broker状态音讯,并在类构造时会将具备broker都装进这么些map中。上边来看它定义的措施。

多样状态的退换关系如下:

2. DeleteTopicsListener监听类

7. sendUpdateMetadataRequest:将包涵给定分区leader消息的UpdateMetadata要求发送到给定的broker上通报它们

NewPartition, OnlinePartition -> OfflinePartition

2. 对此这些分区来说,发送LeaderAndIsr央求到种种可用的别本broker,以及UpdateMetadata乞求到各种可用的broker上

那一个scala文件是controller包的主导文件,里面定义了十个class和object。大家从轻巧地从头说:

  • 1、NoOpLeaderSelector
  • 2、OfflinePartitionLeaderSelector
  • 3、ReassignedPartitionLeaderSelector
  • 4、PreferredReplicaPartitionLeaderSelector
  • 5、ControlledShutdownLeaderSelector
  • NonExistentPartition —— 这一个情状表示该分区要么未有被创制过或曾经被创制过但后边被删除了
  • NewPartition —— 分区成立之后就处于NewPartition状态。在那几个意况中,分区应该早已分配了别本,可是还尚无公投出leader和ISLAND
  • OnlinePartition —— 一旦分区的leader被推举出来,它就处在OnlinePartition状态
  • OfflinePartition —— 借使leader选举出来后,leader broker宕机了,那么该分区就高居OfflinePartition状态。

23. stopOldReplicasOfReassignedPartition:将给定分区中原先已存在的别本都停掉。具体逻辑如下:

4、筛选出partitionState中Leader与近期Broker ID相等的有所记录存入partitionsTobeLeader中,另外记录存入partitionsToBeFollower中。假如partitionsTobeLeader不为空,则对其施行makeLeaders方。假诺partitionsToBeFollower不为空,则对其实施makeFollowers方法。

类字段

onPartitionDeletion:删除topic的回调方法会调用这几个分区等第的去除方法。该方法首先会发送UpdateMetadata乞请给具备当前活蹦乱跳的broker让它们并非在拍卖相关的顾客央浼——抛出UnknownTopicPartitionException,然后将分区全数别本状态置于OfflineReplica状态,并发送StopReplica伏乞给相应别本以及LeaderAndIsr央求给leader以压缩IS奇骏。如若leader别本本身就高居OfflineReplica状态,它就不会发送LeaderAndIsr央浼因为leader项本人会被更新为-1,最终将装有别本置于ReplicaDeletionStarted状态——那将会发送StopReplica央浼并会同别本一齐删除,最终删除对应分布的持有别本的有所持久化数据

ReplicaDeletionStarted —— 开启别本删除操作时会将别本状态置于ReplicaDeletionStarted状态,可由OfflineReplica状态转移而来

deleteTopicsThread:删除topic的线程实例

  • 赢得controller中保存的分区别本分配缓存记录
  • 搜索各种分区的ALX570音讯,遍历A逍客中的每一种broker,借使这几个broker可用(是还是不是在controller的可用broker列表缓存中),则将该别本状态置于OnlineReplica,不然将别本状态置于ReplicaDeletionIneligible状态——controller failover时候broker会宕机,处于该broker的持有别本都要放权那么些景况
  • 发送有关全数分区的UpdateMetadata须要到新扩充长的broker
  • 将新加上的broker上的别本置于OnlineReplica状态
  • 将OfflinePartition和NewPartition状态的分区都安装为OnlinePartition(使用OfflinePartitionLeaderSelector)
  • 对此那贰个存在于新扩充加broker的别本的分区,调用onPartitionReassignment来产生分区别本重分配
  • 一经开发银行了状态机,首先开启leader大选机械漏刻
  • 收获要扩充情状改变的别本所在的持有broker Id,与controller中缓存的别本列表相比较,寻找新添的broker
  • 在Zookeeper中取得这么些新增broker的元数据音信并打包到多个Broker列表中回到
  • 寻找这多少个已被删除的broker,然后采取给定的broker列表更新controller中可用broker缓存
  • 各自调用addBroker和removeBroker方法更新对应的broker线程
  • 提及底调用onBrokerStartup和onBrokerFailure回调函数分别管理新扩充的broker和删除的broker

start:新controller初阶化后调用该办法开启topic删除处理器。具体逻辑正是:首先决断是不是展开了delete.topic.enable,否的话一切免谈!后边的成都百货上千操作也是基于开启它的前提实行的。之后创造删除topic线程并运转线程

别本状态机类,定义了一个分区别本的全数情状集合:

deleteTopicsStateChanged:标志topic删除操作是或不是上马

卡夫卡集群中的二个broker会被看做controller担负处理分区和别本的情状以及实践类似于重分配分区之类的管理任务。要是当前的controller退步了,会从剩下的broker中选出新的controller。

  1. controllerChannelManager —— controller的康庄大道管理器

  2. controllerLock —— 一个重入锁,用于同步操作

  3. shuttingDownBrokerIds —— 正在处于关闭状态的Broker ID群集

  4. brokerShutdownLock —— broker关闭时的八只锁

  5. epoch —— controller的epoch信息,初始为0

  6. epochZkVersion —— zk版本的epoch信息,初始为0

  7. correlationId —— 关联诉求/响应的correlationId

  8. allTopics —— 总的topic集合

  9. partitionReplicaAssignment —— 保存分区集结中种种分区的ASportage(assigned replicas)

  10. partitionLeadershipInfo —— 保存分区集合中每个分区的leader、ISR以及controller_epoch信息

  11. partitionBeingReassigned —— 正在拓宽别本重分配的分区集结

  12. partitionsUndergoingPreferredReplicaElection —— 正在展开preferred别本leader公投的分区集合

  13. liveBrokersUnderlying —— 当前可用的broker实例集结

  14. liveBrokerIdsUnderlying —— 当前可用的broker ID集结

OfflineReplica —— 固然贰个副本挂掉(保存该别本的broker宕机)将被置于OfflineReplica状态,可由NewReplica或OnlineReplica状态调换而来

重分配分区的三个上下文音信类,封装了新分配的别本集结以及注册贰个分区IS奥德赛更动监听器

若是状态机未有运转,直接抛出极度退出

  • 布局一个TopicAndPartition实例
  • 为各类传入的broker取得相应的分区状态音讯,若无对应记录第一手创制一个新的map来保存brokerId -> ((topic, 分区号) -> 分区气象音信)的照耀
  • 为每种broker扩展一条辉映记录,假使原先存在对应记录第一手覆盖
  • 调用addUpdateMetadataRequestForBrokers方法将元数据更新恳求发给各类broker

上边说说具体的主意:

startup —— 运行controller的通道管理器,就是运行全体当前可用broker的伸手发送线程

OnlinePartition, OfflinePartition -> OnlinePartition

除开那些关键的类字段,该类还提供了不胜枚举珍视的controller方法,例如onNewTopicCreation、onNewPartitionCreation、onBrokerFailure、onBrokerStartup、onPartitionReassignment、onPreferredReplicaElection和shutdownBroker等。大家照旧多少个八个说:

ControlledShutdownLeaderSelector类 —— 将IS库罗德中那个处于关闭状态的别本去除掉作为新的ISXC90,然后选用第多少个别本作为leader,然后令当前AGL450作为接收LeaderAndIsr诉求的别本。

TopicCommand发送topic删除命令,在zk的/admin/delete_topics目录下创办topic节点

OfflinePartitionLeaderSelector类 —— 假如IS传祺中足足有一个可用的broker,则从ISENCORE中精选三个broker作为新的leader,而可用的IS奥迪Q5正是新的IS奥迪Q5。若无可用的broker且未有启用unclean leader选拔,那么就抛出特别NoReplicaOnlineException。不然就从ALacrosse中选出贰个可用的broker作为新的leader和IS雷克萨斯RC。但万连续A奥德赛后都并未有可用的broker,抛出十一分。末了将可用的A陆风X8作为接收LeaderAndIsr诉求的别本集结。一旦成功竞选出leader之后保存到zookeeper中并创新缓存消息。

  1. onControllerResignation:当前broker吐弃controller身份时zk leader大选器会调用该方法,主要用来破除内部的controller。具体逻辑如下:
  1. partitionAssignedToBroker —— 寻找指标broker上有些topic的有着分区信息,但是那些办法平时未有被用到

  2. assertValidPrevisousStates —— 给定指标转移状态,验证当前景色是还是不是合法

  3. handleStateChange —— 别本状态机的主旨措施。它定义了法定的情状调换,蕴含:

  1. isRunning —— 标记位,剖断该controller是还是不是处在运维状态

  2. controllerContext —— 创造的controller的内外文类实例

  3. partitionStateMachine —— 分区状态机

  4. replicaStateMachine —— 副本状态机

  5. controllerElector —— controller的leader公投器,重要由server包的ZookeeperLeaderElector完毕(server包时大家再说)

  6. autoRebalanceScheduler —— 卡夫卡Scheduler达成的单独的controller调节器,能够单独于卡夫卡 server启动与停止

  7. deleteTopicManager —— Topic删除管理器

  • 假倘诺OfflinePartitionLeaderSelector的话,新的leader就是多个可用的别本(该部分最佳不在IS奥迪Q5中),而新的ISPRADO正是在此之前的ISOdyssey或许是刚刚选出来的leader,而接收LeaderAndIsr乞请的别本会集就是当前可用的副本集合;
  • 比如是ReassignedPartitionLeaderSelector的话,新的leader正是一个可用的已分配的别本,新的IS奥德赛正是当前的IS奇骏,而接受乞求的别本会集正是重分配的别本集结;
  • 假借使PreferredReplicaPartitionLeaderSelector的话,新的leader正是率先个分配的别本,新的IS逍客正是当前的IS凯雷德,接收央求的别本便是已分配的别本集合;
  • 借使是ControlledShutdownLeaderSelector的话,新的leader正是ISKuga中从不被关闭的别本,新的ISEnclave正是去除关闭状态别本的ISENCORE,而接受副本正是当下可用的已分配别本

18. readControllerEpochFromZookeeper:从zookeeper的/controller_epoch中读取controller_epoch信息

  • 赢得给定zk路线下全部子节点的集纳,并与前段时间controller中保留的topic集结比较,寻觅新扩张topic群集和被去除的topic会集
  • 履新controller中保留的当下topic集结
  • 从zk的/brokers/topics/[topic]路径下寻找那一个新扩张topic对应的分区别本分配记录音信
  • 立异controller中保存的topic分区别本分配记录,去掉那多少个被去除topic的记录,并步入上一步中赢获得的那多少个新添topic的分红记录
  • 假诺新扩展了topic,那么调用onNewTopicCreation方法为新添topic注册分区改变监听器并安装分区状态,最终发送元数据更新央浼通知顺序broker
  1. addStopReplicaRequestForBrokers:扩展brokerId -> 一组StopReplica恳求音讯的投射。具体逻辑如下:

三、ReplicaStateMachine.scala

一个总括音信object,承袭了卡夫卡MetricsGroup,里面定义了三种总括衡量元:unclean leader大选率(每秒进行了略微次unclean leader公投)和leader公投沙漏

NonExistentReplica —— 假若别本被成功删除将被内置NonExistentReplica状态,可由ReplicaDeletionSuccessful状态转变而来

17. removePartitionFromReassignedPartitions:将给定topic分区的别本分配消息从zookeer和controller缓存中除去

 

2. addUpdateMetadataRequestForBrokers:发送UpdateMeatadataRequest须要给钦命的broker,更新给定的分区已经那么些要被删去的分区。该方法中定义了三个嵌套方法:updateMetadataRequestMapFor,主要用于立异给定分区在updateMeataRequestMap中的记录音信。具体逻辑为:首先试图寻觅controller中保留的该分区的leader;假设官样文章则一直退出,不然获取到相应的A奥迪Q5,即便该分区是要被删除的,则新创制多少个LeaderAndIsr对象,并将leader状态设置为LeaderDuringDelete,不然直接传送给PartitionStateInfo营造新的实例,最后为各个broker都扩充一条对应的分区->分区状态新闻的粲焕记录

  • 假使分区状态机本人未有运营,直接抛出相当退出
  • 查阅指标分区在情景中的当前事态,若无另外笔录设置当前状态为NonExistentPartition
  • 一经要调换来NewPartition状态,当前必需处于NonExistentPartition状态,然后更新controller中的分区别本分配缓存消息,并将分区状态设置为NewPartition
  • 假诺要更改来OnlinePartition状态,当前必需处于NewPartition、OnlinePartition或OfflinePartition中的一种。如果当前景观是NewPartition,首先为该分区大选出leader和IS奥德赛并写入zookeeper中保留;借使是OnlinePartition或OfflinePartition,则动用electLeaderForPartition方法(前边会谈到)重新为分区大选leader,最终设置分区状态为OnlinePartition
  • 假若要调换成OfflinePartition状态,当前务必处于NewPartition、OnlinePartition或OfflinePartition中的一种。当分区的leader荒诞不经或不可用时会施行那样的转换。该转变也只是是改动分区状态为OfflinePartition
  • 假诺要改动来NonExistentPartition状态,当前必得处于OfflinePartition,同样地,调换操作也唯有是安装指标分区状态为NonExistentPartition

deleteTopicStopReplicaCallback:删除别本的回调方法,具体逻辑如下:

OfflinePartition -> NonExistentPartition

关键的类措施定义如下:

ReplicaDeletionSuccessful —— 若是别本删除诉求成功,再次回到的响应未有不当的话,该别本会被置于ReplicaDeletionSuccessful状态,可由ReplicaDeletionStarted状态转变而来

awaitTopicDeletionNotification:删除topic线程会调用该方法等待触发、重启或终止topic删除的风浪发生。——具体做法正是一旦线程在运作何况删除topic状态并未有生出转移就直接等候

  1. getLeaderIsrAndEpochOrThrowException —— 从Zookeeper中获取有些分区的leader和IS悍马H2音讯,假若一纸空文则抛出极度。

  2. electLeaderForPartition —— 为那多少个leader已不可用的指标离线分区大选新的leader,在试行Offlinepartition, OnlinePartition到OnlinePartition的转移时会被调用。具体逻辑是:

2. onNewTopicCreation:分区状态机的topic退换监听器会调用这么些格局。具体逻辑如下:

okay,讲完那几个监听类之后我们得以梳理一下PartitionStateMachine类的字段和方法。先说字段:

ReassignedPartitionsContext case类

二、PartitionStateMachine.scala

ControllerStats object

  1. initializePartitionState —— 在分区状态机运维时候会被调用,用于安装Zookeeper中持有现有分区的上马状态。具体逻辑是:首先取得zk中已有分区记录,遍历每一条分区记录,假诺该分区未有leader和IS凯雷德的话则停放NewPartition状态,不然检查一下leader broker是不是可用(即在可用broker列表中),若是可用的话将分区状态设置为OnlinePartition否则设置为OfflinePartition。

  2. assertValidPreviousStates —— 在分区状态调换起先前验证一下转换前的情景是否帮忙开展这种转移。被允许的转移在此以前早就说过了,不再赘言。

  3. assignReplicasToPartitions —— 在实行NonExistentPartition到NewPartition的转变时会调用该办法用来更新controller的保存的分区别本分配缓存消息。具体方法正是先从zookeeper中读出该分区的A奇骏记录然后参与到controller保存的缓存中。

  4. initializeLeaderAndIsrForPartition —— 在施行NewPartition到OnlinePartition转换时会调用该格局。如若一个分区处于NewPartition状态时,它并不曾公投出leader和ISSportage。在转移到OnlinePartition状态后,会在Zookeeper上创制对应的leader和IS智跑记录。一旦处于OnlinePartition状态后分区恒久无法退回到NewPartition状态,而只好是OfflinePartition状态。具体逻辑如下:

ControllerContext类

和不菲监听器类同样,也促成了handleChildChange方法,具体逻辑如下:

shutdown —— 循环调用removeExistingBroker方法移除缓存中兼有broker

标记分区状态为离线(offline),仅此而已

12. startNewReplicasForReassignedPartition:为已分配别本的分区扩充新的副本——发送StartReplica哀告给新别本所在的broker

NonExistentPartition -> NewPartition

  • 收获要进行分红的新的别本集合以及给定分区脚下的leader
  • 封存缓存中该分区的别本分配记录,然后更新为上一步中新别本集合——那样做能够让LeaderAndIsr央浼封装这些集结发送到当前或新选的leader,进而阻碍它往IS奥迪Q5中增多旧的别本
  • 翻开一下待分配的别本群集中是不是含有leader别本:假若不包涵的话则供给从ISKuga中另行公投leader
  • 要不检查一下leader是或不是可用,如果不可用也急需再行大选leader,不然更新zookeeper中leader的epoch值,以供后续的LeaderAndIsr必要使用

BrokerChangeListener类

  • 对此每三个topic都登记二个分区改换监听器
  • 为流传的新分区域地质调查用onNewPartitionCreation方法来创立那个新的分区
  • 将饱含新topic音讯的元数据央求发送给全体broker以使他们能够管理新topic的呼吁

PreferredReplicaPartitionLeaderSelector类 —— 纵然ACR-V中的第一个别本正是方今leader的话,抛出非常,否则就公投该别本为leader,把当前ISKoleos当作新的ISWrangler,令ACRUISER作为接收LeaderAndIsr乞请的别本集结。

  1. liveBrokers_ —— 设置当前可用的broker集结,即给liveBrokersUnderlying和liveBrokerIdsUnderlying赋值

  2. liveBrokers/liveBrokerIds —— 过滤掉那二个正在关闭中的broker

  3. liveOrShuttingDownBrokersIds/liveOrShuttingDownBrokers —— 获取当前可用的以及正处在关闭状态的broker集结

  4. partitionsOnBroker —— 返回全部在给定broker上保存有别本的分区集结

  5. replicasOnBrokers —— 重返给定Broker集合中具有broker上保存的具备别本记录,封装成PartitionAndReplica集结再次来到

  6. replicasForTopic —— 重回给定topic的兼具副本,封装成PartitionAndReplica集合重临

  7. partitionsForTopic —— 再次回到给定topic的全部分区

  8. allLiveReplicas —— 再次来到全数可用的别本记录

  9. replicasForPartition —— 重返给定分区的别本会集,封装成PartitionAndReplica集合重返

  10. removeTopic —— 把topic从controller中移除,包括partitionLeadershipInfo、partitionReplicaAssignment和allTopics

  1. initializeReplicaState —— 设置zookeeper中有着分区的别本的启幕状态,在开发银行别本状态机时会调用该情势。具体逻辑如下:
  • 第一判定是或不是是当前controller,假使不是抛出拾分——终究不是controller何谈关闭!
  • 看清一下该broker是不是留存,假如一纸空文自然也抛出特别
  • 将brokerId参与到待关闭broker集合中
  • 找寻在该broker上有着topic分区以及别本数
  • 遍历每八个topic分区,要是其leader别本就在该broker上海重机厂新公投leader,更新zookeeper中的新闻然后发送UpdateMetadata央浼公告全部受影响的broker
  • 借使leader别本不在该broker上,把该broker插足到StopReplica恳求队列中(但不删除分区)然后发送之
  • 终极将待关闭broker上具备别本都设置为OfflineReplica,更新zookeeper的IS昂科拉新闻通报同志央求给leader

NoOpLeaderSelector类 —— 本质上什么样都不做,只是重回当前的leader和ISENVISION以及给定分区当下的ALAND(assigned replicas)

partitionStateMachine:分区状态机实例

一概而论便是为分区选举出leader broker,该trait只定义了二个措施selectLeader,接收二个TopicAndPartition对象和八个LeaderAndIsr对象。TopicAndPartition表示要选leader的分区,而第二个参数表示zookeeper中保存的该分区的这段时间leader和ISCR-V记录。该方法会重回叁个元组满含了引入出来的leader和IS景逸SUV以及须求接收LeaderAndISr诉求的一组别本。

24. moveReassignedPartitionLeaderIfRequired:尽管非得的话再次大选某些分区的leader别本,具体逻辑如下:

  • 纵然要更动为NewReplica:必需申明当前情景是NonExistentReplica,然后从zookeeper中搜索该别本分区的leader、ISRubicon等音讯。要是不设有那几个消息,就等候leader大选之后发乞请给该副本;假使存在那个新闻,还要剖断一些leader是不是正是协和,假使是的话抛错因为被选举为leader的别本是无法张开情况调换来NewReplica的。即便上述步骤都未曾抛错,就把发送LeaderAndIsr央求给该别本并发送UpdateMetadata诉求给具有可用的broker。最终设置副本状态为NewReplica
  • 就算要改换为ReplicaDeletionStarted,必需表明当前情形是OfflineReplica,然后更新别本状态并发送结束别本的伏乞(通过回调函数来达成)
  • 只要要改换为ReplicaDeletionIneligible,表达该别本近期不可能被删去,首先要表达当前事态必得是ReplicaDeletionStarted,然后更新境况为ReplicaDeletionIneligible就可以
  • 借使要调换为NonExistentReplica,必需表达当前景观是ReplicaDeletionSuccessful,之后在controller缓存中把别本从全数分区的ASportage中移除并从气象缓存中移除该别本的笔录
  • 只要要转移为OnlineReplica,必得表明当前情状是NewReplica、OnlineReplica、OfflineReplica或ReplicaDeletionIneligible中的一种。假如是NewReplica状态,找寻脚下分区的ARAV4集结,纵然该别本不在ARAV4中则把它参预到A卡宴;如若是其他意况的话则须要先检查是还是不是存在leader,假如不设有的话代表该分区从未处于OnlinePartition状态,也正是说broker从未开启过该分区的音信日志写入,因而在本方法中也就像是何都不做。但如若存在leader音讯,将发送LeaderAndIsr哀告给该别本,并发送UpdateMetadata央浼给持有可用的broker,最后设置情形就能够
  • 假如要转变为OfflineReplica,必需表达当前事态是NewReplica、OnlineReplica、OfflineReplica或ReplicaDeletionIneligible中的一种。首头阵送甘休别本的央浼使得别本不在从leader出获取音信,然后寻觅分区的leader和ISWrangler。若是不设有则抛出拾分,不然将分区别本从IS安德拉中移除并发送LeaderAndIsr央求更新移除后的ISSportage,最终设置别本状态就可以

completeDeleteTopic:实现具体的去除topic操作。逻辑如下:

NewPartition, OnlinePartition -> OfflinePartition

sendRequest —— 发送诉求,具体做法正是基于给定brokerId,寻觅缓存中的broker状态新闻,假诺不真实报告警察方broker不可用退出,不然将request及回调方法装进对应的图景消息的消息队列中

2. 对此这么些分区来讲,发送LeaderAndIsr央浼给种种接收哀告的副本况且发送UpdateMetadata央浼给各样可用的broker

4. shutdownBroker:符合规律关闭controller。controller会首先鲜明关闭broker上的leader replica,然后将那些leader转移到其它broker上。具体逻辑如下:

  1. handleStateChange —— 这些办法是分区状态机的主导措施。该形式首要承接保险所有事态转变都以合法的。具体逻辑如下:

35. onBrokerStartup:别本状态机的broker监听器会调用此措施,传入一组新成立的broker。大约流程如下:1. 为富有NewPartition/OfflinePartition的分区触发OnlinePartition状态退换;2. 反省是还是不是留存重分配别本分配给新扩张长的broker。即便存在的话,它会为每一个topic分区都实行重分配的操作。该方法中不要求刷新leader和IS君越缓存。具体逻辑如下:

  1. triggerOnlinePartitionStateChange —— 当卡夫卡集群成功选出controller或爆发broker更改时就可以调用该办法尝试将全体处于NewPartition或OfflinePartition状态的分区转变状态到OnlinePartition状态。具体做法就是遍历分区状态缓存中的全数分区,只要其所属的topic不在要刨除topic队列中且分区处于NewPartition或OfflinePartition状态就调用handleStateChange方法开展到OnlinePartition的情事调换。然后发送更新LeaderAndIsr诉求以及元数据央浼给broker

  2. partitionInState —— 从controller缓存中赢稳当前高居给定状态的装有分区,貌似那个格局没有被采纳过

  3. startup —— 运行状态机,先开首化各类分区的处境,然后设置运行标记位为true注解已运行,最终调用triggerOnlinePartitionStateChange方法将符合条件的分区都置为OnlinePartition状态

  4. registerTopicChangeListener/deregisterTopicChangeListener —— 分别注册和撤废Zookeeper的/brokers/topics上分区状态转变监听器

  5. registerDeleteTopicListener/deregisterDeleteTopicListener —— 分别注册和收回zookeeper的/admin/delete_topics下的topic删除的监听器

  6. registerPartitionChangeListener/deregisterPartitionChangeListener —— 分别登记和收回zookeeper的/brokers/topics/[topic]下的分区更动监听器

  7. registerListeners —— 注册处境转变监听器,别的假使翻开了topic删除的作用(设置delete.topic.enable属性为true),那么还要注册删除topic的监听器

  8. deregisterListeners —— 撤除状态转变监听器以及各种topic的分区改动监听器,清空topic-》分区改换监听器的缓存。类似地,假如翻开了topic删除的成效(设置delete.topic.enable属性为true),还要收回删除topic的监听器

  9. shutdown —— controller关闭时会调用该格局来关闭状态机:首先设置运维标记位为false,清空分区状态缓存并收回全部Zookeeper监听器

  10. handleStateChanges —— 分区转移监听器会调用该办法为一组分区进行景况转换,具体做法正是遍历分区会集,为每三个分区域地质调查用handleStateChange方法设置分区状态,最终发送LeaderAndIsr央浼以及创新元数据诉求给broker

kafka controller的骨干类,有一千多行代码。大家先来看其定义的有个别主要字段:

一、PartitionLeaderSelector.scala

replicaStateMachine:副本状态机实例

ReplicaStateMachine类与PartitionStateMachine类的代码结构很像,大家依旧贰个个地深入分析其定义的字段和方式:

isTopicDeletionInProgress:决断给定的topic是或不是处张巍在被剔除的气象,具体方法就是判断那几个topic是不是有未有被去除的副本

类方法

topicsIneligibleForDeletion:动态维护的可变不可删除的topic集结

21. deregisterReassignedPartitionsIsrChangeListeners:撤除全数正在打开别本分配的分区的IS中华V退换监听器

3. isActive:判定当前broker是还是不是是controller。假诺其包括的光景文中的锦绣前程管理器不为空表明正是当下controller

onTopicDeletion:topic删除线程会调用该格局并传递一组待删除的topic。该方法调用onPartitionDeletion回调方法来管理全体待删除的分区。具体逻辑如下:

  • 率先将那些旧的别本都设置为OfflineReplica状态,并从IS中华V中移除
  • 下一场设置它们状态为ReplicaDeletionStarted,发送截至别本的吩咐给那几个别本
  • 最终设置它们状态分别为ReplicaDeletionSuccessful和NonExistentReplica把它们从分区的A奥迪Q5中移除
  • 保存待删除topic别本(哪怕独有贰个)的broker宕机
  • 正值重新分配topic的分区别本
  • 正在进展topic的preferred分区别本公投——所谓的preferred别本,譬喻四个分区的别本列表是[1, 5, 9],那么节点1正是preferred别本,因为它在别本列表中最初出现

22. updateLeaderEpochAndSendRequest:更新给定分区的leader的epoch值并发送LeaderAndIsr央求给leader。具体逻辑如下:

Controller上下文类定义的议程如下:

38. onPreferredReplicaElection:为钦点分区举办preferred别本leader选举。做法正是将具有目的分区状态置于OnlinePartition,并应用preferredReplicaPartitionLeaderSelector举办大选。

29. SessionExpirationListener嵌套类:监察和控制zookeeper中会话过期的监听器。当前的兑现中哪些都不做,因为zookeeper客商端会重连

  • 寻觅那二个要扩充preferred副本leader公投的分区列表
  • 把controller缓存的当下正值开展的preferred别本公投的分区从上一步获得的分区列表中去掉,并过滤出那三个所属topic要被删去的分区
  • 倘使得到的分区集结不为空记录二个荒谬,并从最终要开展大选的分区集合中去除掉
  • 调用onPreferredReplicaElection方法进行preferred别本大选

controllerContext:controller上下文音信,首要缓存了比非常多broker、topic、分区及别本音讯

该object首要定义了一个措施:

五、ControllerChannelManager.scala

除非当全部的别本都远在TopicDeletionSuccessful状态后,删除topic的线程会标志topic为"已成功删除"。之后线程会开启topic删除销毁形式,即从controller的上下文新闻以及zookeeper中去除全数的topic状态。这是zk中路径/brokers/topics/[topic]被删去的当世无双尺度。但如果具有别本虽已不是TopicDeletionStarted状态但最少有一个是TopicDeletionFailed状态,那么线程就能标识topic要求开展重试删除操作。

startReplicaDeletion:onPartitionDeletion回调方法会调用该方法。它是剔除topic的第二步,第一步是给持有broker发送UpdateMetadata央求让它们结束为那么些topic提供劳动。该方法会将待删除topic放入贰个甩卖列表中。只要该列表不为空,就不会尝试重试列表中的topic。当以下情状发生时会从该列表中移除topic:

KafkaController object

33. startup:运营controller模块的艺术。该方法并不会假如当前的broker正是controller,它只是是注册三个zookeeper会话过期监听器并运转controller的leader选举

  • 撤废全部zk上的监听器,满含reassignedPartitionListener和preferredReplicaElectionListener
  • 闭馆topic删除管理器
  • 关门分区状态机和别本状态机,关闭通道管理器
  • 重设controller context,并重置broker状态
  • 不断地去刷新获取zookeeper中该分区的leader和IS奥迪Q3消息
  • 一经音信空中楼阁则脱离形式,不然剖断一下epoch值是不是比最近controller的epoch值大,如若是表达存在别的controller,那么抛出相当;纵然否的话还索要判断一下加以别本是或不是在ISPRADO中
  • 若存在,如若该别本照旧leader别本的话就安装新的leader是-1,然后创设四个新的ISLacrosse(去掉了该别本)
  • 万一那么些新的IS福睿斯为空表达要刨除的副本已经是ISRubicon中最后一个了,若无开启unclean leader公投,那么就亟须保留那一个副本以便让它今后可以产生leader
  • 使用新的leader、ISCRUISER和controller_epoch值更新zookeeper然后回来更新结果
  • 如若第二步最终检查手续中开采该别本不在ISGL450中那么就利用原本的音信更新controller的缓存
  • 末段回来更新过的leader、IS福特Explorer和controller_epoch值

isTopicQueuedUpForDeletion:判别是或不是足以去除给定的topic——具体做法正是剖断其是还是不是在待删除topic集结中

8. maybTriggerPartitionReassignment:或者会触发分区扩充别本重分配。做法正是遍历controller上下文新闻保存的全部待别本重分配的分区列表

9. startChannelManager:遵照卡夫卡Config创制八个新的ControllerChannelManager并运行它

  1. handleDataDeleted: 空方法

resumeDeletionForTopics:一旦有回涨topic删除操作的平地风波驾临就能够调用该方式。那个事件饱含:1. 新broker启动,然后待删除topic队列中topic的妄动别本都足以被去除;2. 分区重分配完了;3. preferred别本公投完毕。具体做法就是寻觅给定topic集结与待删除topic集结的插花,假如那几个交集不为空,将它们从不足删除topic会集中删除并调用resumeTopicDeletionThread方法唤醒线程起始拍卖topic删除

  1. updateLeaderEpoch:不会修改leader或ISRAV4,只是递增leader的epoch值

16. updateAssignedReplicasForPartition:依据传入的别本分配新闻更新zookeeper中对应topic的副本分配记录

removeBroker —— 把broker从broker状态音信缓存移除

  • 遍历LeaderAndIsr央浼队列,获取到每种乞求的对象broker以及当前broker列表中是leader的broker会集,依据它们塑造新的LeaderAndIsr诉求。
  • 调用controller的sendRequest发送LeaderAndIsr央浼,内定不须求response
  • 清空LeaderAndIsr央求队列
  • UpdateMetadata央求队列也是类似。遍历UpdateMetadata诉求队列,寻觅每条必要要发放的对象broker以及呼吁中分区状态新闻
  • 利用这个消息创立新的UpdateMetadata央求,然后发送给对应的broker
  • 清空UpdateMetadata须要队列
  • 遍历StopReplica乞求队列,为每一个目的broker寻找这么些要刨除的别本以及不要删除的别本来打印出来
  • 遍历各种目标broker对应的别本音信列表,为各样StopReplicaInfo创制三个StopReplicaRequest并发送出去
  • 清空StopReplicaRequestMap必要队列

20. registerPreferredReplicaElectionListener/deregisterPreferredReplicaElectionListener:注册/取消zookeeper上对/admin/preferred_replica_election的监听

5. watchIsrChangesForReassignedPartition:注册IS昂科雷更动监听器监听zookeeper上/brokers/topics/[topic]/partitions/[partitionId]/state节点的更动以博得leader及ISKoleos的浮动

1. handleDataChange:当有分区须求改变leader为preferred别本,具体逻辑如下:

  1. onControllerFailover:zookeeper leader公投器调用该措施大选当前broker为新的controller。具体逻辑如下:

15. removePartitionsFromPreferredReplicaElection:把给定的多少个分区从preferred leader别本公投缓存中移除

  1. sendRequestsToBrokers:发送央浼给broker。具体逻辑如下:
  • 等候触发、重启或终止topic删除的风浪光临
  • 只要线程当前从未运行,直接退出
  • 对于每贰个要被剔除的topic来讲,要是全体副本都已经被成功删除(都地处ReplicaDeletionSuccessful),那么直接调用completeDeleteTopic方法成功topic删除,首假使清理zookeeper以及controller缓存中的topic音信
  • 比如还应该有别本未有被剔除,忽略这次操作,只是简短地打字与印刷那个未被去除的别本
  • 只要尚有别本在剔除过程中败诉了,那么就调用markTopicForDeletionRetry方法重试别本删除
  • 拍卖完全数别本之后需求看清一下topic当前是不是足以被删除,就算是,直接调用onTopicDeletion方法开启删除操作
  • 依照给定的待重分配别本的分区上下文音讯获收取要举办重复进行抽成的别本会集,并从当中得到出当下可用的副本集结
  • 获得controller中保存的该topic分区的具有别本。假使当前已分配的别本和要拓宽分红的新别本一模一样,则抛出十一分表明不用分配了
  • 要不,先推断一下待分配的新别本集结中是不是有不可用别本,若无则注册一下ISRubicon监听器并标识topic不能被去除,最后调用onPartitionReassignment方法重新分配分区的别本
  • 若果有不可用别本,抛出十二分表明不能够开端本次副本重分配操作

completeReplicaDeletion:甘休别本须求的响应中一旦没错误,那么其回调方法会调用该形式。该方法会将那些被删除的别本置于ReplicaDeletionSuccessful状态并提示topic删除线程删除topic——当然前提是topic的具备别本都已经被成功删除。具体逻辑便是从给定的别本会集中挑出那个早就办好删除希图的别本来,然后设置它们的情形,最终提示删除线程来实行删除操作

治本topic删除的状态机,具体逻辑如下:

parseControllerId —— 解析controller id,给定的json串是从zookeeper的/controller节点的数码,类似于{"version":1,"brokerid":0,"timestamp":"1431655574471"}

19. registerReassignedPartitionsListener/deregisterReassignedPartitionsListener:注册/取消zookeer上对/admin/reassign_partitions的监听

32. sendRequest:调用通道管理器的sendRequest方法给给定的broker发送给定的request

以下意况下苏醒topic删除操作:

  • 检核对象分区是不是正在开展重分配,如若不是的话一向退出模式
  • 要不,从zookeeper中再一次读取leader和IS奥迪Q3新闻(因为zookeeper顾客端回调方法并不会回来Stat对象)
  • 比如不设有leader和IS帕杰罗音信,直接抛出极其表明分区一纸空文,不然检查下ISEnclave中是或不是有到场新的别本
  • 只要未有投入新的别本则一而再分区重分配操作;不然记录下这种场馆就能够
  • 为每一个传入的broker寻觅相应的StopReplicaRequestInfo集结
  • 开创StopReplicaRequestInfo实例(依据回调方法是不是为空创造特定版本的)然后加入到相应brokerId的StopReplicaRequestInfo集合中

PartitionsReassignedListener类

第二个类是ControllerBrokerRequestBatch。Kafka会缓存一群须求一齐发送,因而该类首要定义了三类央浼的批次队列: leaderAndIsrRequestMap、stopReplicaRequestMap和updateMetadataRequestMap。该类定义的方式如下:

  1. registerSessionExpirationListener:注册会话过期监听器

  2. incrementControllerEpoch:递增controller的epoch值

实际到代码来讲,这么些scala只是简短地定义了贰个类: TopicDeletionManager,其构造函数接收三个参数:

  1. brokerRequestBatch —— broker缓存的央求批次

  2. partitionReassignedListener —— 分区重分配别本的监听器

  3. preferredReplicaElectionListener —— 分区举办preferred别本leader公投的监听器

connectToBroker —— 张开底层的坦途并三番五次中元发送状态改动央求

  1. handleDataDeleted:空方法

KafkaController class

  • 封存topic别本的broker重新运转起来
  • topic的preferred别本大选完结
  • topic的分区别本重分配完了

LeaderIsrAndControllerEpoch case类

shutdown:关闭topic删除线程,然后清空全数有关缓存(满含待删除topic群集、待删除分区集合以及不能被删除的topic集合)

doWork —— 线程的主干措施,具体逻辑如下:

各样正在被删去的别本都远在以下三种情景之一:

  • 将正在被去除的broker从给定的dead broker列表中移除
  • 寻找那三个leader别本在dead broker中的全体分区,触发那几个分区的场面退换,将它们设置为OfflinePartition
  • 重新触发分区状态更动,置为OnlinePartition
  • 找寻dead broker上富有别本以及那些broker之上的近日可用别本,并将那么些别本置于OfflineReplica状态
  • 反省是还是不是存在待删除topic的别本,假使存在的话,将那一个别本置于ReplicaDeletionIneligible状态
  • 得到StopReplica乞求的响应
  • 寻觅那么些删除失利的别本,如若不是全部别本都失败,就能够收获那个被成功删除的别本,调用completeReplicaDeletion方法成功对那些别本的去除
  • 更新controller缓存的可用broker列表、全部topic列表、topic分区的别本分配记录
  • 在zookeeper中更新具备分区的leader、ISLacrosse消息
  • 翻开controller的坦途处理器
  • 初步化preferred别本大选分区、重分配别本的分区
  • 创办topic删除管理器

代表贰个topic有些分区的三个副本的数据结构

也是贰个zookeeper监听类,担当监听重分配分区、ISEscort音信的转换。同样地,它定义了七个章程:

markTopicForDeletionRetry:要是topic在待删除队列中但尚无伊始,那么就能重试删除操作。为了保险重试成功,设置相应的别本状态从ReplicaDeletionIneligible到OfflineReplica状态

粗粗的想想就是:

六、KafkaController.scala

    那几个scala文件很复杂,里面定义了众多类和object。先说Callbacks那组伴生对象。Callbacks,看名就会知道意思正是概念了回调方法类,近日卡夫卡定义了二种央浼响应的回调方法: LeaderAndIsrResponse、UpdateMetadataResponse以及StopReplicaResponse。而Callbacks object充作了工厂类的方法定义了build方法用于创设二种回调方法。

3. addLeaderAndIsrRequestForBrokers:增添broker与相应分区状态消息的映照到controller的缓存中。具体逻辑如下:

  • 使用OAR和RAR更新zookeeper中的AR记录
  • 为OA凯雷德+RA途锐中的每一个别本发送LeaderAndIsr央求,首假诺透过强制更新zookeeper中的leader_epoch值来形成
  • 起步新的RA福特Explorer-OARubicon中的副本,正是将它们置于NewReplica状态
  • 伺机RAMurano中有所别本都与leader同步
  • 将RAHaval中颇负副本都设置为OnlineReplica转啊柜台
  • 立异缓存,设置ACalifornia T为RA昂科威
  • 设若leader别本不在RA奇骏中,从RATiggo中选出多个新的leader。如若是从RA帕杰罗中选出的leader则向来发送LeaderAndIsr,不然更新zookeeper中的leader_epoch然后再发送LeaderAndIsr乞求。不管如何,LeaderAndIsr央求中的A帕杰罗都将被赋值为RALX570,那足以阻挡leader将(RAEvoque - OA瑞鹰)的别本重新扩展加会IS途乐中
  • 将装有(OA瑞鹰-RAENCORE)中的别本都置于OfflineReplica状态,在zookeeper中更新ISEvoque使之去掉(OAENVISION-RA瑞虎),然后只向leader发送LeaderAndIsr哀告告知它IS景逸SUV的更新。之后发送StopReplica诉求给全数OA福特Explorer-RA昂科拉中的副本
  • 出殡StopReplica伏乞将全数OAENCORE-RA奔驰G级中的别本都活动到NonExistentReplica状态,并剔除磁盘上的别本数据
  • 更新Zookeeper中的AR为RAR
  • 更新/admin/reassign_partitions数据移除该分区数据
  • 推选leader之后,别本和IS揽胜极光消息都浮动了,因而须求重新发送UpdateMetadata诉求给每种broker

8. offlinePartitionSelector/reassignedPartitionLeaderSelector/preferredReplicaPartitionLeaderSelector/controlledShutdownPartitionLeaderSelector —— 创立了各样分区leader公投器

isTopicIneligibleForDeletion:剖断给定的topic是还是不是处在不可删除状态

    再来七个类:RequestSendThread——央求发送线程。定义的办法如下:

  • 获得待删除topic集结的全部分区
  • controller发送更新元数据央求将那些分区的leader消息发给broker以便让broker不要再管理与待删除topic相关的数码
  • 鲁人持竿topic将分区别本分配记录分组,然后为每种分区域地质调查用onPartitionDeletion来删除分区

34. onBrokerFailure:分区别本状态机的broker监听器会调用该回调方法,用于监听failed的broker。该形式首要做了一晃三件事情:1. 标志failed leader的分区为Offline状态;2. 为具有New/Offline状态的分区触发OnlinePartition状态转换;3.各样failed 的broker的别本都设置为OfflineReplica。必要注意的是,该方法并未刷新Leader和IS奥迪Q5缓存,主若是因为分区状态机在稍后为New/Offline分区变为Online实行leader大选时会刷新缓存。该情势具体逻辑如下:

实际逻辑如下:

partitionsToBeDeleted:动态维护的可变待删除分区集结

Kafkacontroller的上下文消息类,构造函数接收七个用于连接zookeeper的字段。该类定义的类字段如下:

topicsToBeDeleted:动态维护的可变待删除topic会集

  • topic被成功删除
  • topic存在ReplicaDeletionIneligible状态的
  • 假设topic已入队列但并未有在进展中会重试topic删除,之后topic的具备别本都被置于ReplicaDeletionStarted状态。随后controller发送结束别本(并剔除)央浼。
  1. handleDataChange:通过命令行发起的分区重分配时调用,具体逻辑如下:

isTopicEligibleForDeletion:当出现以下情状时允许重试topic删除:1. topic删除未到位;2. topic删除当前未开端;3. topic当前被标识为不可删除

  • TopicDeletionStarted —— 调用onPartitionDeletion回调方法之后别本步向到删除topic阶段,常常都以由controller上监听zk中/admin/delete_topics的监听器触发。作为气象转变的一片段,controller会发送StopReplicaRequests诉求给全体的别本。即使deletePartition设置为true,controller会为StopReplicaResponse注册四个回调方法并在接收删除别本的响应时调用它
  • TopicDeletionSuccessful —— 若响应中无不当,deleteTopicStopReplicaCallback会退换别本状态从TopicDeletionStarted到TopicDeletionSuccessful
  • TopicDeletionFailed —— 若响应中有错误,deleteTopicStopReplicaCallback会更改副本状态从TopicDeletionStarted到TopicDeletionFailed。假若一个broker宕机了但其上保存了正在被去除的topic的别本,kafka会标志相应的副本状态为TopicDeletionFailed。那样做的开始和结果是——借使八个broker在悬停别本央浼发出此前但在别本踏向到TopicDeletionStarted之后宕机,那么就有不小可能率出现别本被漏洞比很多地定格在TopicDeletionStarted状态,这样broker重新运维以往也不会重试topic删除操作了。

假使上述标准中自由一个标准满足,kafka都会记录三个荒谬并将该分区从待重分配分区列表中移除。该类定义了三个措施:

  • 从zookeeper的/admin/reassign_partitions节点下读取分区重分配音信
  • 从该部分消息中过滤出从未伊始重分配的分区列表(与controller中缓存的正在进行重分配分区对比)
  • 遍历每一个供给打开重分配别本的分区,先判定其所属的topic是不是要求开展删减。尽管topic须要张开删减,则跳过分区重分配操作,并将其从重分配副本分区列表中移除。假若topic不要求张开删除,构造三个ReassignedPartitionsContext传递给controller发起重分配别本的操作

对分区重新分配别本,除非:1. 分区曾经存在过;2. 新的别本与已存在的别本同样;3. 新副本集结中随机二个别本已挂掉

1. onNewPartitionCreation:那么些回调方法在topic创立回调方法中被调用,具体逻辑如下:

PreferredReplicaElectionListener类

6. initiateReassignReplicasForTopicPartition:为给定topic分区重新分配别本,具体逻辑如下:

  1. areReplicasInIsr:判别一组别本是不是在有个别topic分区的IS索罗德中
  1. newBatch: 一个检查评定方法,确定保证上述三类央浼批次为空,如若不为空的话抛出拾贰分
  • 从队列中收获第一项,并收获对应的伏乞及回调
  • 品味发送央浼(broker宕机一段时间了,那么controller的zk监听器ChannelConfig保存了channel的具有配置且能够动态修改,假设发送不成事,重连broker并休憩300ms然后重试
  • 收获相应档案的次序的响应,就是LeaderAndIsr、StopReplica如故UpdateMetadata的乞求
  • 调用特定须要的回调
  • 从zookeeper的/admin/preferred_replica_election节点下读取处那个正在开展的preferred别本大选的分区集结
  • 自己议论它们是或不是早就到位或所属topic是或不是已被删除
  • 立异controller的缓存把这么些分区会集参预到缓存中(但不包罗上一步中的那多少个分区)

failReplicaDeletion:假诺保留有待删除topic别本的broker宕机或终止别本央求的响应中有不当的时候会调用该方法。该方法会将加以的replica列表中的全数别本设置为ReplicaDeletionIneligible状态,所属的topic也会参与到待删除topic集结中。即使topic全部别本都成功地响应了去除别本的呼吁并付诸响应,那么就提醒topic删除线程来重试topic删除

25. initializeTopicDeletion:正是找寻那么些急需删除的topic集合以及不可能被删去的topic会集,然后利用它们创造三个topic删除管理器

  • 从zookeeper的/admin/reassign_partitions下读取正在扩充副本分配操作的分区会集
  • 自小编争辨这几个分区,寻找那多个已经完成的也许topic已被剔除的分区
  • 遍历上一步中获得的分区集合,删除缓存中对应分区的记录
  • 其后构建筹算举办别本分配的分区会集然后把它们步向到controller的对应缓存中
  • 更新给定分区的leader_epoch值。要是更新失利记录一个荒谬申明无法发送LeaderAndIsr须求然后退出
  • 要不构造贰个新的LeaderAndIsr要求并出席到LeaderAndIsr发送批次中
  • 发送批次中的LeaderAndIsr必要,将新分配的别本发给分区的leader

resumeTopicDeletionThread:唤醒删除topic线程管理topic删除

isDeleteTopicEnabled:由属性delete.topic.enable钦命是还是不是帮忙删除topic,假使关闭该项,试行删除topic的下令无任何功效

startRequestSendThread —— 运行相应broker的呼吁发送线程

为zookeeper中/admin/preferred_replica_election下给定的分区列表初步preferred别本leader公投。定义的七个点子如下:

OA兰德本田UR-V —— 重分配前的别本列表

10. updateLeaderAndIsrCache:从zookeeper中创新controller保存的分区leader、isr和controller_epoch缓存

markTopicIneligibleForDeletion:以下条件发生时会终止topic的删除:1. 别本挂掉了;2. topic的少数分区正在进行重新分配;3. topic的一点分区正在进展preferred别本大选。该方法会将一部分topic标志为不可删除(具体做法正是立异不可删除topic集结),假诺给定的topic不满足条件则什么都不做

本文由9159.com发布于编程,转载请注明出处:  该文件中还定义了五种leader选举器,也会执行

关键词:

上一篇:没有了
下一篇:没有了