Kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)

网友投稿 1516 2022-05-29

推荐一款非常好用的kafka管理平台,

kafka的灵魂伴侣

滴滴开源Logi-KafkaManager 一站式Kafka监控与管控平台

本文所有命令,博主均全部操作验证过,保证准确性; 非复制粘贴拼凑文章; 如果想了解更多工具命令,可在评论区留下评论,博主会择期加上;

博主正在连载 Kafka源码、Kafka运维、Kafka实践系列文章 并且相关文章会配套录制视频

本文为专栏第一篇欢迎关注

<石臻臻的杂货铺>

不迷路!!!

以下大部分运维操作,都可以使用 LogI-Kafka-Manager 在平台上可视化操作;

@[TOC]

1.TopicCommand

1.1.Topic创建

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test

相关可选参数

1.2.删除Topic

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic test

支持正则表达式匹配Topic来进行删除,只需要将topic 用双引号包裹起来

例如: 删除以create_topic_byhand_zk为开头的topic;

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic “create_topic_byhand_zk.*”

.表示任意匹配除换行符 \n 之外的任何单字符。要匹配 . ,请使用 . 。

·*·:匹配前面的子表达式零次或多次。要匹配 * 字符,请使用 *。

.* : 任意字符

删除任意Topic (慎用)

bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic “.*?”

更多的用法请参考正则表达式

1.3.Topic分区扩容

zk方式(不推荐)

>bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic1 --partitions 2

kafka版本 >= 2.2 支持下面方式(推荐)

单个Topic扩容

bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic test_create_topic1 --partitions 4

批量扩容 (将所有正则表达式匹配到的Topic分区扩容到4个)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server 172.23.248.85:9092 --alter --partitions 4

".*?" 正则表达式的意思是匹配所有; 您可按需匹配

PS: 当某个Topic的分区少于指定的分区数时候,他会抛出异常;但是不会影响其他Topic正常进行;

相关可选参数

PS: 虽然这里配置的是全部的分区副本分配配置,但是正在生效的是新增的分区;

比如: 以前3分区1副本是这样的

现在新增一个分区,--replica-assignment 2,1,3,4 ; 看这个意思好像是把0,1号分区互相换个Broker

但是实际上不会这样做,Controller在处理的时候会把前面3个截掉; 只取新增的分区分配方式,原来的还是不会变

1.4.查询Topic描述

1.查询单个Topic

sh bin/kafka-topics.sh --topic test --bootstrap-server xxxx:9092 --describe --exclude-internal

2.批量查询Topic(正则表达式匹配,下面是查询所有Topic)

sh bin/kafka-topics.sh --topic ".*?" --bootstrap-server xxxx:9092 --describe --exclude-internal

支持正则表达式匹配Topic,只需要将topic 用双引号包裹起来

相关可选参数

5.查询Topic列表

1.查询所有Topic列表

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal

2.查询匹配Topic列表(正则表达式)

查询test_create_开头的所有Topic列表

sh bin/kafka-topics.sh --bootstrap-server xxxxxx:9092 --list --exclude-internal --topic "test_create_.*"

相关可选参数

2.ConfigCommand

Config相关操作; 动态配置可以覆盖默认的静态配置;

2.1 查询配置

展示关于Topic的动静态配置

1.查询单个Topic配置(只列举动态配置)

sh bin/kafka-configs.sh --describe --bootstrap-server xxxxx:9092 --topic test_create_topic

或者

sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics --entity-name test_create_topic

2.查询所有Topic配置(包括内部Topic)(只列举动态配置)

sh bin/kafka-configs.sh --describe --bootstrap-server 172.23.248.85:9092 --entity-type topics

3.查询Topic的详细配置(动态+静态)

只需要加上一个参数--all

同理 ;只需要将--entity-type 改成对应的类型就行了 (topics/clients/users/brokers/broker-loggers)

sh bin/kafka-configs.sh --describe --bootstrap-server xxxx:9092 --version

所有可配置的动态配置 请看最后面的 附件 部分

2.2 增删改 配置 --alter

–alter

删除配置: --delete-config k1=v1,k2=v2

添加/修改配置: --add-config k1,k2

选择类型: --entity-type (topics/clients/users/brokers/broker-

loggers)

类型名称: --entity-name

--add-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --add-config file.delete.delay.ms=222222,retention.ms=999999

--delete-config

sh bin/kafka-configs.sh --bootstrap-server xxxxx:9092 --alter --entity-type topics --entity-name test_create_topic1 --delete-config file.delete.delay.ms,retention.ms

类型有: (topics/clients/users/brokers/broker- loggers)

哪些配置可以修改 请看最后面的附件:ConfigCommand 的一些可选配置

3.副本扩缩、分区迁移、跨路径迁移 kafka-reassign-partitions

请戳 【kafka运维】副本扩缩容、数据迁移、副本重分配、副本跨路径迁移 (如果点不出来,表示文章暂未发表,请耐心等待)

4.Topic的发送kafka-console-producer.sh

4.1 生产无key消息

## 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties

4.2 生产有key消息

加上属性--property parse.key=true

## 生产者 bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/producer.properties --property parse.key=true

默认消息key与消息value间使用“Tab键”进行分隔,所以消息key以及value中切勿使用转义字符(\t)

可选参数

5. Topic的消费kafka-console-consumer.sh

1. 新客户端从头消费--from-beginning (注意这里是新客户端,如果之前已经消费过了是不会从头消费的)

下面没有指定客户端名称,所以每次执行都是新客户端都会从头消费

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

2. 正则表达式匹配topic进行消费--whitelist

消费所有的topic

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’

消费所有的topic,并且还从头消费

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --whitelist ‘.*’ --from-beginning

3.显示key进行消费--property print.key=true

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true

4. 指定分区消费--partition 指定起始偏移量消费--offset

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --partition 0 --offset 100

5. 给客户端命名--group

注意给客户端命名之后,如果之前有过消费,那么--from-beginning就不会再从头消费了

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group test-group

6. 添加客户端属性--consumer-property

这个参数也可以给客户端添加属性,但是注意 不能多个地方配置同一个属性,他们是互斥的;比如在下面的基础上还加上属性--group test-group 那肯定不行

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer-property group.id=test-consumer-group

7. 添加客户端属性--consumer.config

跟--consumer-property 一样的性质,都是添加客户端的属性,不过这里是指定一个文件,把属性写在文件里面, --consumer-property 的优先级大于 --consumer.config

sh bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties

6.kafka-leader-election Leader重新选举

6.1 指定Topic指定分区用重新PREFERRED:优先副本策略 进行Leader重选举

> sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic test_create_topic4 --election-type PREFERRED --partition 0

6.2 所有Topic所有分区用重新PREFERRED:优先副本策略 进行Leader重选举

sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --election-type preferred --all-topic-partitions

6.3 设置配置文件批量指定topic和分区进行Leader重选举

先配置leader-election.json文件

{ "partitions": [ { "topic": "test_create_topic4", "partition": 1 }, { "topic": "test_create_topic4", "partition": 2 } ] }

sh bin/kafka-leader-election.sh --bootstrap-server xxx:9090 --election-type preferred --path-to-json-file config/leader-election.json

相关可选参数

7. 持续批量推送消息kafka-verifiable-producer.sh

单次发送100条消息--max-messages 100

一共要推送多少条,默认为-1,-1表示一直推送到进程关闭位置

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --max-messages 100

每秒发送最大吞吐量不超过消息 --throughput 100

推送消息时的吞吐量,单位messages/sec。默认为-1,表示没有限制

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --throughput 100

【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)

发送的消息体带前缀--value-prefix

sh bin/kafka-verifiable-producer.sh --topic test_create_topic4 --bootstrap-server localhost:9092 --value-prefix 666

注意--value-prefix 666必须是整数,发送的消息体的格式是加上一个 点号. 例如: 666.

其他参数:

--producer.config CONFIG_FILE 指定producer的配置文件

--acks ACKS 每次推送消息的ack值,默认是-1

8. 持续批量拉取消息kafka-verifiable-consumer

持续消费

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4

单次最大消费10条消息--max-messages 10

sh bin/kafka-verifiable-consumer.sh --group-id test_consumer --bootstrap-server localhost:9092 --topic test_create_topic4 --max-messages 10

相关可选参数

9.生产者压力测试kafka-producer-perf-test.sh

1. 发送1024条消息--num-records 100并且每条消息大小为1KB--record-size 1024 最大吞吐量每秒10000条--throughput 100

sh bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100000 --producer-props bootstrap.servers=localhost:9092 --record-size 1024

你可以通过LogIKM查看分区是否增加了对应的数据大小

从LogIKM 可以看到发送了1024条消息; 并且总数据量=1M; 1024条*1024byte = 1M;

2. 用指定消息文件--payload-file发送100条消息最大吞吐量每秒100条--throughput 100

先配置好消息文件batchmessage.txt

然后执行命令

发送的消息会从batchmessage.txt里面随机选择; 注意这里我们没有用参数--payload-delimeter指定分隔符,默认分隔符是\n换行;

bin/kafka-producer-perf-test.sh --topic test_create_topic4 --num-records 100 --throughput 100 --producer-props bootstrap.servers=localhost:9090 --payload-file config/batchmessage.txt

验证消息,可以通过 LogIKM 查看发送的消息

相关可选参数

10.消费者压力测试kafka-consumer-perf-test.sh

消费100条消息--messages 100

sh bin/kafka-consumer-perf-test.sh -topic test_create_topic4 --bootstrap-server localhost:9090 --messages 100

相关可选参数

11.删除指定分区的消息kafka-delete-records.sh

删除指定topic的某个分区的消息删除至offset为1024

先配置json文件offset-json-file.json

{"partitions": [{"topic": "test1", "partition": 0, "offset": 1024}], "version":1 }

在执行命令

sh bin/kafka-delete-records.sh --bootstrap-server 172.23.250.249:9090 --offset-json-file config/offset-json-file.json

验证 通过 LogIKM 查看发送的消息

从这里可以看出来,配置"offset": 1024 的意思是从最开始的地方删除消息到 1024的offset; 是从最前面开始删除的

12. 查看Broker磁盘信息

查询指定topic磁盘信息--topic-list topic1,topic2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxx:9090 --describe --topic-list test2

查询指定Broker磁盘信息--broker-list 0 broker1,broker2

sh bin/kafka-log-dirs.sh --bootstrap-server xxxxx:9090 --describe --topic-list test2 --broker-list 0

例如我一个3分区3副本的Topic的查出来的信息

logDir Broker中配置的log.dir

{ "version": 1, "brokers": [{ "broker": 0, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-0", "error": null, "partitions": [{ "partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false }] }] }, { "broker": 1, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-1", "error": null, "partitions": [{ "partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false }] }] }, { "broker": 2, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-2", "error": null, "partitions": [{ "partition": "test2-1", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-0", "size": 0, "offsetLag": 0, "isFuture": false }, { "partition": "test2-2", "size": 0, "offsetLag": 0, "isFuture": false }] }] }, { "broker": 3, "logDirs": [{ "logDir": "/Users/xxxx/work/IdeaPj/ss/kafka/kafka-logs-3", "error": null, "partitions": [] }] }] }

如果你觉得通过命令查询磁盘信息比较麻烦,你也可以通过 LogIKM 查看

12. 消费者组管理 kafka-consumer-groups.sh

1. 查看消费者列表--list

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list

先调用MetadataRequest拿到所有在线Broker列表

再给每个Broker发送ListGroupsRequest请求获取 消费者组数据

2. 查看消费者组详情--describe

DescribeGroupsRequest

查看消费组详情--group 或 --all-groups

查看指定消费组详情--group

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group

查看所有消费组详情--all-groups

sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups

查看该消费组 消费的所有Topic、及所在分区、最新消费offset、Log最新数据offset、Lag还未消费数量、消费者ID等等信息

查询消费者成员信息--members

所有消费组成员信息

sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090

指定消费组成员信息

sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090

查询消费者状态信息--state

所有消费组状态信息

sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090

指定消费组状态信息

sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090

3. 删除消费者组--delete

DeleteGroupsRequest

删除消费组–delete

删除指定消费组--group

sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090

删除所有消费组--all-groups

sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090

PS: 想要删除消费组前提是这个消费组的所有客户端都停止消费/不在线才能够成功删除;否则会报下面异常

Error: Deletion of some consumer groups failed: * Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

4. 重置消费组的偏移量 --reset-offsets

能够执行成功的一个前提是 消费组这会是不可用状态;

下面的示例使用的参数是: --dry-run ;这个参数表示预执行,会打印出来将要处理的结果;

等你想真正执行的时候请换成参数--excute ;

下面示例 重置模式都是 --to-earliest 重置到最早的;

请根据需要参考下面 相关重置Offset的模式 换成其他模式;

重置指定消费组的偏移量 --group

重置指定消费组的所有Topic的偏移量--all-topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic

重置指定消费组的指定Topic的偏移量--topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2

重置所有消费组的偏移量 --all-group

重置所有消费组的所有Topic的偏移量--all-topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic

重置所有消费组中指定Topic的偏移量--topic

sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2

--reset-offsets 后面需要接重置的模式

相关重置Offset的模式

--from-file着重讲解一下

上面其他的一些模式重置的都是匹配到的所有分区; 不能够每个分区重置到不同的offset;不过**--from-file**可以让我们更灵活一点;

先配置cvs文档

格式为: Topic:分区号: 重置目标偏移量

test2,0,100 test2,1,200 test2,2,300

执行命令

sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv

5. 删除偏移量delete-offsets

以上大部分运维操作,都可以使用 LogI-Kafka-Manager 在平台上可视化操作;

滴滴开源Logi-KafkaManager 一站式Kafka监控与管控平台

技术交流

有想进

滴滴LogI开源用户群

的加我个人-

jjdlmn_

进群(备注:进群)

群里面主要交流

** kakfa**、es、agent、LogI-kafka-manager、

等等相关技术;

群内有专人解答你的问题

对~ 相关技术领域的解答人员都有; 你问的问题都会得到回应

Kafka 大数据 运维

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:我们都爱学的flink的state内容
下一篇:《Docker技术入门与实战(第3版)》——1.2 为什么要使用Docker
相关文章