kafka介绍安装

网友投稿 814 2022-05-29

Kafka介绍与安装

Apache Kafka® 是 一个分布式流处理平台,是一个发布订阅系统

Kafka适合什么样的场景?

构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue

构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)

常用的场景将微服务日志输入到kafka中,通过kafka的高速缓存特性降低系统延迟,然后异步消费写入到ELK中。

kafka每条记录中包含一个key,一个value和一个timestamp(时间戳)。

Kafka有四个核心的API:

The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。

The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。

The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。

The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

Topics

Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。

Kafka 集群保留所有发布的记录—无论他们是否已被消费—并通过一个可配置的参数——保留期限来控制. 举个例子, 如果保留策略设置为2天,一条记录发布后两天内,可以随时被消费,两天过后这条记录会被抛弃并释放磁盘空间。Kafka的性能和数据大小无关.

Kafka 消费者是非常廉价的—消费者的增加和减少,对集群或者其他消费者没有多大的影响。

分布式

日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。每一个分区都会在已配置的服务器上进行备份,确保容错性.

生产者

生产者可以将数据发布到所选择的topic(主题)中。生产者负责将记录分配到topic的哪一个 partition(分区)中。可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数(例如:记录中的key)来完成。

消费者

消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例.消费者实例可以分布在多个进程中或者多个机器上。

如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例.

如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有

Kafka作为消息系统

队列的优点在于它允许你将处理数据的过程分给多个消费者实例,使你可以扩展处理过程。 不好的是,队列不是多订阅者模式的—一旦一个进程读取了数据,数据就会被丢弃。 而发布-订阅系统允许你广播数据到多个进程,但是无法进行扩展处理,因为每条消息都会发送给所有的订阅者。

与RabbitMq队列相比,kafka具有严格的顺序保证。

传统队列在服务器上保存有序的记录,如果多个消费者消费队列中的数据, 服务器将按照存储顺序输出记录。 虽然服务器按顺序输出记录,但是记录被异步传递给消费者, 因此记录可能会无序的到达不同的消费者。这意味着在并行消耗的情况下, 记录的顺序是丢失的。因此消息系统通常使用“唯一消费者”的概念,即只让一个进程从队列中消费, 但这就意味着不能够并行地处理数据。

比如 商品sku1的商品进行库存变更先后有两条记录 msg1 库存1,msg2 库存2,有多个消费者时虽然先发送msg1后发送msg2,但是并发条件下可能msg2的消费者先处理完成,msg1的消费者还在消费的情况,导致时序丢失。

kafka介绍与安装

Kafka topic中的partition是一个并行的概念。 Kafka能够为一个消费者池提供顺序保证和负载平衡,是通过将topic中的partition分配给消费者组中的消费者来实现的, 以便每个分区由消费组中的一个消费者消耗。通过这样,我们能够确保消费者是该分区的唯一读者,并按顺序消费数据。 众多分区保证了多个消费者实例间的负载均衡。但请注意,消费者组中的消费者实例个数不能超过分区的数量。

kafka可以通过记录中的key进行分区分配。

下载与安装

- http://kafka.apache.org/downloads

Binary downloads:

Scala 2.12 - kafka_2.12-2.8.0.tgz (asc, sha512)

解压文件

$ tar -xzf kafka_2.13-2.8.0.tgz

$ cd kafka_2.13-2.8.0

启动zookeeper 新版本中将不再依赖外部的zookeeper

$ bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka

$ bin/kafka-server-start.sh config/server.properties

启动成功 started (kafka.server.KafkaServer)

[2021-07-30 21:46:39,446] INFO Kafka version: 2.8.0 (org.apache.kafka.common.utils.AppInfoParser) [2021-07-30 21:46:39,446] INFO Kafka commitId: ebb1d6e21cc92130 (org.apache.kafka.common.utils.AppInfoParser) [2021-07-30 21:46:39,447] INFO Kafka startTimeMs: 1627652799443 (org.apache.kafka.common.utils.AppInfoParser) [2021-07-30 21:46:39,448] INFO [KafkaServer id=0] started (kafka.server.KafkaServer) [2021-07-30 21:46:39,495] INFO [broker-0-to-controller-send-thread]: Recorded new controller, from now on will use broker 172.20.25.8:9092 (id: 0 rack: null) (kafka.server.BrokerToControllerRequestThread)

创建一个主题

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Created topic quickstart-events.

查看主题

bin/kafka-topics.sh --list --zookeeper localhost:2181

启动生产者

$ ./bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

发送消息 hello message

启动消费者

启动多个消费者,每个消费者都可以收到消息

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

若想清理数据

$ rm -rf /tmp/kafka-logs /tmp/zookeeper

集群

在本级上创建3个节点

复制配置文件

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

编辑这些新文件并设置如下属性:

config/server-1.properties:

broker.id=1

listeners=PLAINTEXT://:9093

log.dir=/tmp/kafka-logs-1

config/server-2.properties:

broker.id=2

listeners=PLAINTEXT://:9094

log.dir=/tmp/kafka-logs-2

broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。

启动两个新的节点:

> bin/kafka-server-start.sh config/server-1.properties & ... > bin/kafka-server-start.sh config/server-2.properties & ...

创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

查看topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic: my-replicated-topic TopicId: U842ZMiFQAyZgdiUSJ5ICA PartitionCount: 1 ReplicationFactor: 3 Configs:

Topic: my-replicated-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。

“leader”是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。

“replicas”是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。

“isr”是一组“同步”replicas,是replicas列表的子集,它活着并被指到leader。

查看原先的topic

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic quickstart-events

Topic: quickstart-events TopicId: YmOLaKbBTxitS9AgDfs8LQ PartitionCount: 1 ReplicationFactor: 1 Configs:

Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0

原来的主题没有副本且在服务器0上。我们创建集群时,这是唯一的服务器.

当有一个kafka服务挂掉时服务仍可正常运营。

Kafka RabbitMQ

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Python 持久化 - 文件
下一篇:两所大学中的智能车竞赛校内赛
相关文章