【云小课】应用平台第6课 命令行管理Kafka的Topic,方便又实用

网友投稿 697 2022-05-29

Kafka是一款非常热门的消息中间件,它的消息生产与消费都围绕消息主题(Topic)进行:生产者将消息发送给Topic,消息存储在Topic的某个分区,消费者不断从Topic拉取未读消息进行处理。

华为云Kafka服务在控制台提供Topic的部分管理功能,包括创建Topic、删除Topic,以及Topic的老化时间、持久化与消息同步复制等功能设置。

在这里,小编向新手同学介绍Kafka提供的命令行工具,命令行工具相对于云服务控制台,还提供了Topic的参数自定义、分区与副本数的增加等控制台没有提供的功能。

【云小课】应用平台第6课 命令行管理Kafka的Topic,方便又实用

命令行工具准备

JDK环境配置

命令行工具依赖Java环境,需要先安装JDK。以下介绍均以Linux环境为例。

JDK推荐使用Java Development Kit 1.8及以上版本。小编在华为云对象存储(OBS)桶里存了一份JDK,供大家直接下载。Linux系统下可使用如下命令获取:

wget https://407154.obs.cn-south-1.myhuaweicloud.com/jdk-8u231-linux-x64.tar.gz

JDK下载并解压后可直接使用,但建议配置环境变量。小编的配置方式如下:

步骤 1 在环境变量参数中增加JAVA信息:vim ~/.bash_profile

补充如下内容:

export JAVA_HOME=/root/jdk1.8.0_231        export PATH=$JAVA_HOME/bin:$PATH

步骤 2 使环境信息生效:source ~/.bash_profile

环境变量配置完成后,使用java -version验证环境变量是否生效,版本号应与自定义的JDK一致。

下载Kafka命令行工具

Kafka安装包自带了命令行工具,推荐使用2.3.0以上版本,因为低版本的Kafka,Topic管理工具仅支持连接Kafka的Zookeeper组件,不支持直连Broker节点。

客户端下载并解压后即可使用。

wget https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz

Kafka提供了服务管理、Topic管理、消息管理、消息迁移等30多个小工具,可以通过man手册或者使用 --help选项了解工具的具体使用方法。以下我们重点介绍Topic管理相关的工具kafka-topics.sh(Windows下对应kafka-topics.bat)。

Kafka实例准备

命令行工具客户端与环境准备了,还差Kafka服务:)如果您是华为云Kafka新用户,华为云Kafka在华南-广州区域为您提供了Kafka免费试用版本,省去环境搭建的繁琐。

如下图所示,登录华为云控制台后,找到分布式消息服务Kafka,右上角有一个“免费领取Kafka实例”的快捷入口。注意区域切换到“华南-广州”。

为了方便本地客户端访问,在试用版的Kafka实例创建完成后,我为其开启了公网访问。在实例基本信息中,可查看访问地址。

Topic管理

到现在为止,Kafka实例、命令行工具都已搭建好。以下介绍如何使用命令行工具kafka-topic.sh管理Kafka的Topic。kafka-topics.sh支持创建、查询、删除Topic,以及Topic的各类参数管理。

1.创建Topic

kafka-topics.sh --create --topic {topic_name} --bootstrap-server {broker_ip}:{port} --partitions {partition_num} --replication-factor {replication_num} --config conf_name1=conf_value1 --config conf_name2=conf_value2

create:创建Topic

bootstrap-server:Kafka的broker节点,Kafka集群有多broker节点,这里填1个即可。

partitions:Topic分区数量

replication-factor: 副本数,副本数量不能超过broker节点数量

config:Topic级别参数配置,以“名称=值”的形式定义每一个参数

从华为云控制台创建,部分Topic的配置参数不能自定义,如果您需要自定义这些配置参数,则可以尝试通过命令行工具创建。

以下列出部分常用的配置参数:

cleanup.policy

消息强制持久化(fsync)的分区最大消息数量,比如此选项设置为10,则内存中某个分区达到10条消息则强制持久化到磁盘。

此参数设置需谨慎,过大则每次持久化的时间较长,且遇到故障后可能丢失的消息更多,过小则持久化频繁则容易导致整体的客户端请求有一定延迟。

2.查询Topic列表

kafka-topics.sh --list --bootstrap-server {broker_ip}:{port}

list:查询Topic列表,返回所有topic名称

3.查看Topic的详细信息

kafka-topics.sh --describe --topic {topic_name} --bootstrap-server {broker_ip}:{port}

describe:查看Topic详情。

返回信息包括Topic名称、分区数、副本数、Topic级别的配置信息,以及每一个分区的Leader副本所在broker节点等信息。

4.为Topic增加分区数、副本数等。

Kafka服务的控制台界面目前没有开放Topic的分区和副本数的修改功能,那么您可以通过命令行工具自助完成。

kafka-topics.sh --alter --partitions {partition_num} --bootstrap-server {broker_ip}:{port}  --topic {topic_name}

alter:对Topic进行配置变更。

变更命令需同时带上Topic的某个参数,如分区数或者副本数。注意,修改分区数或副本数时,只能增加,不能减少。

5.删除Topic

如果确定不再使用某个主题,最好的方式是删除,这样可以释放一些资源,比如磁盘空间、文件句柄等。

kafka-topics.sh --delete --topic {topic_name} --bootstrap-server {broker_ip}:{port}

delete:删除Topic。

注意,删除主题操作不可逆,一旦删除,与该主题相关的所有消息数据会被全部删除。

6.修改Topic的配置

命令行工具使用broker节点地址修改Topic的配置时,存在bug,需要连接zookeeper地址才可以完成。由于华为云Kafka考虑稳定性等原因,没有开放zookeeper,因此,小编建议您通过登录Kafka-manager修改Topic的配置。将在下一期云小课中进行介绍。

云小课 分布式消息服务 Kafka

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:《Python大规模机器学习》— 2.4 数据流的特征管理
下一篇:如何利用OpenCV寻找轮廓的中心?
相关文章