阿里四面:kafka何时、如何删除Topic?

网友投稿 753 2022-05-28

Kafka有很多状态机和管理器,如Controller通道管理器ControllerChannelManager、处理Controller事件的ControllerEventManager等。这些管理器和状态机,大多与各自“宿主”联系密切。就如Controller这俩管理器,必须与Controller组件紧耦合,才能实现各自功能。

Kafka还有一些状态机和管理器,具有相对独立的功能框架,不严重依赖使用方,如:

TopicDeletionManager(主题删除管理器)

负责对指定Kafka主题执行删除操作,清除待删除主题在集群上的各类“痕迹”。

ReplicaStateMachine(副本状态机)

负责定义Kafka副本状态、合法的状态转换,以及管理状态之间的转换。

PartitionStateMachine(分区状态机)

负责定义Kafka分区状态、合法的状态转换,以及管理状态之间的转换。

本文看看Kafka是如何删除一个主题的。

前言

以为成功执行kafka-topics.sh --delete命令后,主题就会被删除。这种不正确的认知会导致经常发现主题没被删干净。于是,网传终极“武林秘籍”:手动删除磁盘上的日志文件,手动删除ZooKeeper下关于主题的各节点。但我不推荐这么干:

并不完整

除非你重启Broker,否则,这套“秘籍”无法清理Controller端和各个Broker上元数据缓存中的待删除主题的相关条目

并没有被官方所认证,后果自负

与其琢磨删除主题失败之后怎么自救,还是研究Kafka到底如何执行该操作。TopicDeletionManager.scala包括:

阿里四面:kafka何时、如何删除Topic?

DeletionClient接口:负责实现删除主题以及后续的动作

如更新元数据

ControllerDeletionClient类:实现DeletionClient接口的类,分别实现了刚刚说到的那4个方法。

TopicDeletionManager类:主题删除管理器类

定义方法维护主题删除前后集群状态的正确性。如,何时删除主题、何时主题不能被删除、主题删除过程中要规避哪些操作等

DeletionClient接口及实现

删除主题,并将删除主题的事件同步给其他Broker。

DeletionClient接口目前只有一个实现类ControllerDeletionClient,构造器的两个字段:

KafkaController实例

Controller组件对象

KafkaZkClient实例

Kafka与ZooKeeper交互的客户端对象

API

删除主题在zk上的所有“痕迹”。分别调用KafkaZkClient的3个方法删除ZooKeeper下/brokers/topics/节点、/config/topics/节点和/admin/delete_topics/节点。

删除zk下待删除主题的标记节点。调用KafkaZkClient#deleteTopicDeletions,批量删除一组主题在/admin/delete_topics下的子节点。注意,deleteTopicDeletions这个方法名结尾的Deletions,表示/admin/delete_topics下的子节点。所以:

deleteTopic是删除主题

deleteTopicDeletions是删除/admin/delete_topics下的对应子节点

这两个方法里都有epochZkVersion字段,代表期望的Controller Epoch版本号。若使用一个旧Epoch版本号执行这些方法,zk会拒绝,因为和它自己保存的版本号不匹配。若一个Controller的Epoch<ZooKeeper中保存的,则该Controller很可能是已过期的Controller。这就是Zombie Controller。epochZkVersion字段的作用,就是隔离Zombie Controller发送的操作。

屏蔽主题分区数据变更-:取消/brokers/topics/节点数据变更的监听。

当该主题的分区数据发生变更后,由于对应zk-已被取消,因此不会触发Controller相应处理逻辑。

为何取消该-?为避免操作相互干扰:假设用户A发起主题删除,同时用户B为这个主题新增分区。此时,这两个操作就会冲突,若允许Controller同时处理这俩操作,势必会造成逻辑混乱及状态不一致。为应对这种情况,在移除主题副本和分区对象前,代码要先执行这个方法,确保不再响应用户对该主题的其它操作。

mutePartitionModifications调用unregisterPartitionModificationsHandlers,并接着调用KafkaZkClient#unregisterZNodeChangeHandler,取消zk上对给定主题的分区节点数据变更的监听。

调用KafkaController#sendUpdateMetadataRequest,给集群所有Broker发送更新请求,告诉它们不要再为已删除主题的分区提供服务:

该方法会给集群中的所有Broker发送更新元数据请求,告知它们要同步给定分区的状态。

TopicDeletionManager定义及初始化

创建TopicDeletionManager类实例

在KafkaController类初始化时被创建:

实例化了一个全新的ControllerDeletionClient对象,然后利用该对象实例和replicaStateMachine、partitionStateMachine,一起创建TopicDeletionManager实例。

KafkaServerStartable.startup()=》KafkaServer.startup()=》KafkaController.init=》TopicDeletionManager

TopicDeletionManager重要API

除了类定义和初始化,还有resumeDeletions:重启主题删除操作过程。

主题因为某些事件可能一时无法完成删除,如主题分区正在进行副本重分配等。一旦这些事件完成,主题重新具备可删除资格。就需调用resumeDeletions重启删除操作。

从元数据缓存中获取要删除主题列表,之后定义了两个空的主题列表,分别保存待重试删除主题和待删除主题

遍历每个要删除的主题,去看它所有副本的状态。如果副本状态都是ReplicaDeletionSuccessful,就表明该主题已经被成功删除,此时,再调用completeDeleteTopic方法,完成后续的操作就可以了。对于那些删除操作尚未开始,并且暂时无法执行删除的主题,源码会把这类主题加到待重试主题列表中,用于后续重试;如果主题是能够被删除的,就将其加入到待删除列表中。

最后,调用retryDeletionForIneligibleReplicas重试待重试主题列表中的主题删除操作。对待删除主题列表中的主题则调用onTopicDeletion删除。

retryDeletionForIneligibleReplicas重试主题删除:将对应主题副本的状态,从ReplicaDeletionIneligible变更到OfflineReplica。这样,后续再次调用resumeDeletions,就会尝试重新删除主题。

下面,我再用一张图来解释下resumeDeletions方法的执行流程:

resumeDeletions串联起了TopicDeletionManger中的很多方法,较关键的:

completeDeleteTopic:

onTopicDeletion:

onTopicDeletion会多次使用分区状态机,调整待删除主题的分区状态。最后调用onPartitionDeletion执行真正的底层物理磁盘文件删除。这是通过副本状态机状态转换操作完成的。

总结

在主题删除过程中,Kafka会调整集群中三个地方的数据:

ZooKeeper

删除主题时,zk上与该主题相关的所有ZNode节点必须被清除

元数据缓存

Controller端元数据缓存中的相关项,也必须要被处理,并且要被同步到集群的其他Broker上

磁盘日志文件

要清理的首要目标

这三个地方须统一处理,就好似原子操作。回想“秘籍”,它无法清除Controller端的元数据缓存项。因此,避免使用这“大招”。

DeletionClient接口主要是操作ZooKeeper,实现ZooKeeper节点的删除等操作。

TopicDeletionManager,是在KafkaController创建过程中被初始化的,主要通过与元数据缓存进行交互的方式,来更新各类数据。

Kafka ZooKeeper

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Linux 进程管理之四大名捕
下一篇:2021年大数据Spark(二十二):内核原理
相关文章