kafka系统入门教程

网友投稿 771 2022-05-29

一、Kafka简介

kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例()成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。

二、基本架构图

三、基本概念解释

1)Broker

Kafka集群包含一个或多个服务器,这种服务器被称为broker。broker端不维护数据的消费状态,提升了性能。直接使用磁盘进行存储,线性读写,速度快:避免了数据在JVM内存和系统内存之间的复制,减少耗性能的创建对象和垃圾回收。

2)Producer

负责发布消息到Kafka broker;Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于"round-robin"方式或者通过其他的一些算法等.

3)Consumer

消息消费者,向Kafka broker读取消息的客户端,consumer从broker拉取(pull)数据并进行处理。

本质上kafka只支持Topic.每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费.

如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.

如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.

在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的.

4)Topic

每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

5)Partition

Parition是物理上的概念,每个topic将被分成多个partition(区),每个partition在存储层面是append log文件

6)Consumer Group

每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

7)Topic & Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1)。

partitions的目的有多个.最根本原因是kafka基于文件存储.通过分区,可以将日志内容分散到多个上,来避免文件尺寸达到单机磁盘的上限,每个partiton都会被当前broker(kafka实例)保存;可以将一个topic切分多任意多个partitions,来提高消息保存/消费的效率.此外越多的partitions意味着可以容纳更多的consumer,有效提升并发消费的能力.

当segment文件尺寸达到一定阀值时(可以通过配置文件设定,默认1G),将会创建一个新的文件;当buffer中消息的条数达到阀值时将会触发日志信息flush到日志文件中,同时如果"距离最近一次flush的时间差"达到阀值时,也会触发flush到日志文件.如果broker失效,极有可能会丢失那些尚未flush到文件的消息.因为意外实现,仍然会导致log文件格式的破坏(文件尾部),那么就要求当server启东是需要检测最后一个segment的文件结构是否合法并进行必要的修复.

获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.

日志文件的删除策略非常简单:启动一个后台线程定期扫描log file列表,把保存时间超过阀值的文件直接删除(根据文件的创建时间).为了避免删除文件时仍然有read操作(consumer消费),采取copy-on-write方式.

8、分配

kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)

Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.

格式: /broker/ids/[0…N] -->host:port;其中[0…N]表示broker id,每个broker的配置文件中都需要指定一个数字类型的id(全局不可重复),znode的值为此broker的host:port信息.

Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.

格式: /broker/topics/[topic]/[0…N] 其中[0…N]表示partition索引号.

Consumer and Consumer group: 每个consumer被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".

一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.

Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.

格式:/consumers/[group_id]/ids/[consumer_id]

仍然是一个临时的znode,此节点的值为{“topic_name”:#streams…},即表示此consumer目前所消费的topic + partitions列表.

Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset.

格式:/consumers/[group_id]/offsets/[topic]/[broker_id-partition_id]–>offset_value

此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.

Partition Owner registry: 用来标记partition被哪个consumer消费.临时znode

格式:/consumers/[group_id]/owners/[topic]/[broker_id-partition_id]–>consumer_node_id当consumer启动时,所触发的操作:

A) 首先进行"Consumer id Registry";

B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).

C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.

9、生产者消费者通信图

Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每个partition leader建立socket连接并发送消息.

Broker端使用zookeeper用来注册broker信息,以及监测partition leader存活性.

Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息

六、主要配置

1.Broker配置

/usr/local/software/kafka/kafka_2.11-1.1.0/config/server.properties

#每一个boker都有一个唯一的id作为它们的名字。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响consumers的消息情况

broker.id=0

#broker server服务端口

port=9092

#broker的主机地址,若是设置了,那么会绑定到这个地址上,若是没有,会绑定到所有的接口上,并将其中之一发送到ZK

#host.name=

#broker处理消息的最大线程数,一般情况下数量为cpu核数

num.network.threads=3

#处理IO的线程数

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

#kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性能  /data/kafka-logs-1,/data/kafka-logs-2

log.dirs=/tmp/kafka-logs

#默认分区数

num.partitions=2

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

#控制一个log保留多长个小时

log.retention.hours=168

#单一的log segment文件大小

log.segment.bytes=1073741824

log.retention.check.interval.ms=300000

#是否log cleaning

log.cleaner.enable=false

#指定zookeeper连接字符串, 格式如hostname:port/chroot。chroot是一个namespace

zookeeper.connect=localhost:2181

#连接zk的session超时时间

zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0

2.消费者主要配置

bootstrap.servers=localhost:9092

#当前消费者的Group名称,需要指定

group.id=test-consumer-group

3.生产者的主要配置

bootstrap.servers=localhost:9092

compression.type=none

以上是关于kafka一些基础说明,在其中我们知道如果要kafka正常运行,必须配置zookeeper,否则无论是kafka集群还是的生存者和消费者都无法正常的工作的,以下是对zookeeper进行一些简单的介绍:

kafka系统入门教程

七、Zookeeper集群

zookeeper是一个为分布式应用提供一致性的服务的软件,它是开源的Hadoop项目的一个子项目,并根据google发表的一篇论文来实现的。zookeeper为分布式系统提供了高效且易于使用的协同服务,它可以为分布式应用提供相当多的服务,诸如统一命名服务,配置管理,状态同步和组服务等。zookeeper接口简单,我们不必过多地纠结在分布式系统难于处理的同步和一致性问题上,你可以使用zookeeper提供的现成(off-the-shelf)服务来实现来实现分布式系统额配置管理,组管理,Leader选举等功能。

Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式。

■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境;

■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例;

■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”(ensemble)

Zookeeper通过复制来实现高可用性,只要集合体中半数以上的机器处于可用状态,它就能够保证服务继续。为什么一定要超过半数呢?这跟Zookeeper的复制策略有关:zookeeper确保对znode 树的每一个修改都会被复制到集合体中超过半数的机器上。

关于Zookeeper的功能和工作原理可以参考:https://www.cnblogs.com/felixzh/p/5869212.html

分布式 Kafka 存储

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:加快云原生技术转型, 智能调度登陆华为云DevOps: 增速,节源
下一篇:无线节能组信标为什么会自动切换? 排查故障的过程真的像谜一样无法解释
相关文章