urllib:Python爬虫必学的网络库,收藏这一篇就Go了
563
2022-05-29
本文主要内容
考察Kafka架构。
生产者发送消息。
消费者读取消息。
Kafka安装与运行。
虽然这是一本关于Kafka Streams的书,但是要研究Kafka Streams不可能不探讨Kafka,毕竟,Kafka Streams是一个运行在Kafka之上的库。
Kafka Streams设计得非常好,因此即使具有很少或者零Kafka经验的人都可以启动和运行Kafka Streams。但是,你所取得的进步和对Kafka调优的能力将是有限的。掌握Kafka的基础知识对有效使用Kafka Streams来说是必要的。
{注意}
本章面向的读者是对Kafka Streams有兴趣,但对Kafka本身具有很少或零经验的开发者。如果读者对Kafka具备很好的应用知识,那么就可以跳过本章,直接阅读第3章。
Kafka是一个很大的话题,很难通过一章进行完整论述。本章将会覆盖足以使读者很好地理解Kafka的工作原理和一些核心配置项设置的必备知识。要想更深入了解Kafka的知识,请看Dylan Scott写的_Kafka in Action_(Manning,2018)
1 数据问题
如今,各组织都在研究数据。互联网公司、金融企业以及大型零售商现在比以往任何时候都更善于利用这些数据。通过利用数据,既能更好地服务于客户,又能找到更有效的经营方式(我们要对这种情况持积极态度,并且在看待客户数据时要从好的意图出发)。
让我们考虑一下在ZMart数据管理解决方案中的各种需求。
需要一种将数据快速发送到中央存储的方法。
由于服务器经常发生故障,这就需要复制数据的能力,有了这种能力,不可避免的故障就不会导致停机和数据丢失。
需要能够扩展到任意数量消费者的数据,而不必跟踪不同的应用程序。需要让组织中的任何人都能使用这些数据,而不必跟踪哪些人已经查看了数据,哪些人还没有查看。
2 使用Kafka处理数据
在第1章中,已介绍过大型零售公司ZMart。那时,ZMart需要一个流式处理平台来利用公司的销售数据,以便更好地提供客户服务并提升销售总额。但在那时的6个月前,ZMart期待了解它的数据情况,ZMart最初有一个定制的非常有效的解决方案,但是很快就发现该解决方案变得难以驾驭了,接下来将看到其原因。
2.1 ZMart原始的数据平台
最初,ZMart是一家小公司,零售销售数据从各分离的应用程序流入系统。这种方法起初效果还是不错的,但随着时间的推移,显然需要一种新的方法。一个部门的销售数据不再只是该部门所感兴趣的,公司的其他部门也可能感兴趣,并且不同的部门对数据的重要性和数据结构都有不同的需求。图2-1展示了ZMart原始的数据平台。
图2-1 ZMart原始数据架构简单,足够使每个信息源流入和流出信息
随着时间的推移,ZMart通过收购其他公司以及扩大其现有商店的产品而持续增长。随着应用程序的添加,应用程序之间的连接变得更加复杂,由最初的少量的应用程序之间的通信演变成了一堆名副其实的意大利面条。如图2-2所示,即使只有3个应用程序,连接的数量也很烦琐且令人困惑。可以看到,随着时间的推移,添加新的应用程序将使这种数据架构变得难以管理。
图2-2 随着时间的推移,越来越多的应用程序被添加进来,连接所有这些信息源变得非常复杂
2.2 一个Kafka销售交易数据中心
一个解决ZMart问题的方案是创建一个接收进程来控制所有的交易数据,即建立一个交易数据中心。这个交易数据中心应该是无状态的,它以一种方式接受交易数据并存储,这种方式是任何消费应用程序可以根据自己的需要从数据中心提取信息。对哪些数据的追踪取决于消费应用程序,交易数据中心只知道需要将交易数据保存多久,以及在什么时候切分或删除这些数据。
也许你还没有猜到,我们有Kafka完美的用例。Kafka是一个具有容错能力、健壮的发布/订阅系统。一个Kafka节点被称为一个代理,多个Kafka服务器组成一个集群。Kafka将生产者写入的消息存储在Kafka的主题之中,消费者订阅Kafka主题,与Kafka进行通信以查看订阅的主题是否有可用的消息。图 2-3 展示了如何将Kafka想象为销售交易数据 中心。
现在大家已经对Kafka的概况有了大致的了解,在下面的几节中将进行仔细研究。
图2-3 使用Kafka作为销售交易中心显著简化了ZMart数据架构,现在每台服务器不需要知道其他的信息来源,它们只需要知道如何从Kafka读取数据和将数据写入Kafka
3 Kafka架构
在接下来的几个小节中,我们将介绍Kafka体系架构的关键部分以及Kafka的工作原理。如果想尽早地体验运行Kafka,可以直接跳到2.6节,安装和运行Kafka。等Kafka安装之后,再回到这里来继续学习Kafka。
3.1 Kafka是一个消息代理
在前一节中,我曾说过Kafka是一个发布/订阅系统,但更精确地说法是Kafka充当了消息代理。代理是一个中介,将进行互利交换或交易但不一定相互了解的两部分汇聚在一起。图2-4展示了ZMart数据架构的演化。生产者和消费者被添加到图中以展示各单独部分如何与Kafka进行通信,它们之间不会直接进行通信。
Kafka将消息存储在主题中,并从主题检索消息。消息的生产者和消费者之间不会直接连接。此外,Kafka并不会保持有关生产者和消费者的任何状态,它仅作为一个消息交换中心。
Kafka主题底层的技术是日志,它是Kafka追加输入记录的文件。为了帮助管理进入主题的消息负载,Kafka使用分区。在第1章我们讨论了分区,大家可以回忆一下,分区的一个应用是将位于不同服务器上的数据汇集到同一台服务器上,稍后我们将详细讨论分区。
图2-4 Kafka是一个消息代理,生产者将消息发送到Kafka,这些消息被存储,并通过主题订阅的方式提供给消费者
3.2 Kafka是一个日志
Kafka底层的机制就是日志。大多数软件工程师都对日志很熟悉,日志用于记录应用程序正在做什么。如果在应用程序中出现性能问题或者错误,首先检查的是应用程序的日志,但这是另一种类型的日志。在Kafka(或者其他分布式系统)的上下文中,日志是“一种只能追加的,完全按照时间顺序排列的记录序列”[1]。
图2-5展示了日志的样子,当记录到达时,应用程序将它们追加到日志的末尾。记录有一个隐含的时间顺序,尽管有可能不是与每条记录相关联的时间戳,因为最早的记录在左边,后达到的记录在右端。
日志是具有强大含义的简单数据抽象,如果记录按时间有序,解决冲突或确定将哪个更新应用到不同的机器就变得明确了:最新记录获胜。
Kafka中的主题是按主题名称分隔的日志,几乎可以将主题视为有标签的日志。如果日志在一个集群中有多个副本,那么当一台服务器宕机后,就能够很容易使服务器恢复正常:只需重放日志文件。从故障中恢复的能力正是分布式提交日志具有的。
图2-5 日志是追加传入记录的文件——每条新到达的记录都被立即放在接收到的最后一条记录之后,这个过程按时间顺序对记录进行排序
我们只触及了关于分布式应用程序和数据一致性的深入话题的表面,但到目前为止所讲解的知识应该能让读者对Kafka涉及的内容有了一个基本的了解。
3.3 Kafka日志工作原理
当安装Kafka时,其中一个配置项是log.dir,该配置项用来指定Kafka存储日志数据的路径。每个主题都映射到指定日志路径下的一个子目录。子目录数与主题对应的分区数相同,目录名格式为“主题名_分区编号”(将在下一节介绍分区)。每个目录里面存放的都是用于追加传入消息的日志文件,一旦日志文件达到某个规模(磁盘上的记录总数或者记录的大小),或者消息的时间戳间的时间间隔达到了所配置的时间间隔时,日志文件就会被切分,传入的消息将会被追加到一个新的日志文件中(如图2-6所示)。
图2-6 logs目录是消息存储的根目录,/logs目录下的每个目录代表一个主题的分区,目录中的文件名以主题的名称打头,然后是下划线,后面接一个分区的编号
可以看到日志和主题是高度关联的概念,可以说一个主题是一个日志,或者说一个主题代表一个日志。通过主题名可以很好地处理经由生产者发送到Kafka的消息将被存储到哪个日志当中。既然已经讨论了日志的概念,那么我们再来讨论Kafka另一个基本概念——分区。
3.4 Kafka和分区
分区是Kafka设计的一个重要部分,它对性能来说必不可少。分区保证了同一个键的数据将会按序被发送给同一个消费者。图2-7展示了分区的工作原理。
图2-7 Kafka使用分区来实现高吞吐量,并将一个主题的消息在集群的不同服务器中传播
对主题作分区的本质是将发送到主题的数据切分到多个平行流之中,这是Kafka能够实现巨大吞吐量的关键。我们解释过每个主题就是一个分布式日志,每个分区类似于一个它自己的日志,并遵循相同的规则。Kafka将每个传入的消息追加到日志末尾,并且所有的消息都严格按时间顺序排列,每条消息都有一个分配给它的偏移量。Kafka不保证跨分区的消息有序,但是能够保证每个分区内的消息是有序的。
除了增加吞吐量,分区还有另一个目的,它允许主题的消息分散在多台机器上,这样给定主题的容量就不会局限于一台服务器上的可用磁盘空间。
现在让我们看看分区扮演的另一个关键角色:确保具有相同键的消息最终在一起。
3.5 分区按键对数据进行分组
Kafka处理键/值对格式的数据,如果键为空,那么生产者将采用轮询(round-robin)方式选择分区写入记录。图2-8展示了用非空键如何分配分区的操作。
如果键不为空,Kafka会使用以下公式(如下伪代码所示)确定将键/值对发送到哪个分区:
HashCode.(key) % number of partitions
通过使用确定性方法来选择分区,使得具有相同键的记录将会按序总是被发送到同一个分区。默认的分区器使用此方法,如果需要使用不同的策略选择分区,则可以提供自定义的分区器。
图2-8 “foo”被发送到分区0,“bar”被发送到分区1。通过键的
字节散列与分区总数取模来获得数据被分配的分区
3.6 编写自定义分区器
为什么要编写自定义分区器呢?在几个可能的原因中,下面将举一个简单的例子——组合键的使用。
假设将购买数据写入Kafka,该数据的键包括两个值,即客户ID和交易日期,需要根据客户ID对值进行分组,因此对客户ID和交易日期进行散列是行不通的。在这种情况下,就需要编写一个自定义分区器,该分区器知道组合键的哪一部分决定使用哪个分区。例如,/src/main/java/ bbejeck/model/PurchaseKey.java中的组合键,如代码清单2-1所示。
代码清单2-1 组合键PurchaseKey类
当提及分区时,需要保证特定用户的所有交易信息都会被发送到同一个分区中。但是整体作为键就无法保证,因为购买行为会在多个日期发生,包括交易日期的记录对一个用户而言就会导致不同的键值,就会将交易数据随机分布到不同的分区中。若需要确保具有相同客户ID的交易信息都发送到同一个分区,唯一的方法就是在确定分区时使用客户ID作为键。
代码清单2-2所示的自定义分区器的例子就满足需求。PurchaseKeyPartitioner类(源代码见src/ main/java/bbejeck/chapter_2/partitioner/PurchaseKeyPartitioner.java)从键中提取客户ID来确定使用哪个分区。
代码清单2-2 自定义分区器PurchaseKeyPartitioner类
该自定义分区器继承自DefaultPartitioner类,当然也可以直接实现Partitioner接口,但是在这个例子中,在DefaultPartitioner类中有一个已存在的逻辑。
请注意,在创建自定义分区器时,不仅局限于使用键,单独使用值或与键组合使用都是有效的。
{注意}
Kafka API提供了一个可以用来实现自定义分区器的Partitioner接口,本书不打算讲解从头开始写一个分区器,但是实现原则与代码清单2-2相同。
已经看到如何构造一个自定义分区器,接下来,将分区器与Kafka结合起来。
3.7 指定一个自定义分区器
既然已编写了一个自定义分区器,那就需要告诉Kafka使用自定义的分区器代替默认的分区器。虽然还没有讨论生产者,但在设置Kafka生产者配置时可以指定一个不同的分区器[2],配置如下:
通过为每个生产者实例设置分区器的方式,就可以随意地为任何生产者指定任何分区器类。在讨论Kafka生产者时再对生产者的配置做详细介绍。
{警告}
在决定使用的键以及选择键/值对的部分作为分区依据时,一定要谨慎行事。要确保所选择的键在所有数据中具有合理的分布,否则,由于大多数数据都分布在少数几个分区上,最终导致数据倾斜。
3.8 确定恰当的分区数
在创建主题时决定要使用的分区数既是一门艺术也是一门科学。其中一个重要的考虑因素是流入该主题的数据量。更多的数据意味着更多的分区以获得更高的吞吐量,但与生活中的任何事物一样,也要有取舍。
增加分区数的同时也增加了TCP连接数和打开的文件句柄数。此外,消费者处理传入记录所花费的时间也会影响吞吐量。如果消费者线程有重量级处理操作,那么增加分区数可能有帮助,但是较慢的处理操作最终将会影响性能。
3.9 分布式日志
我们已经讨论了日志和对主题进行分区的概念,现在,花点时间结合这两个概念来阐述分布式日志。
到目前为止,我们讨论日志和对主题进行分区都是基于一台Kafka服务器或者代理,但典型的Kafka生产集群环境包括多台服务器。故意将讨论集中单个节点上,是因为考虑一个节点更容易理解概念。但在实践中,总是使用包括多台服务器的Kafka集群。
当对主题进行分区时,Kafka不会将这些分区分布在一台服务上,而是将分区分散到集群中的多台服务器上。由于Kafka是在日志中追加记录,因此Kafka通过分区将这些记录分发到多台服务器上。图2-9展示了这个过程。
让我们通过使用图2-9作为一个向导来完成一个快速实例。对于这个实例,我们假设有一个主题,并且键为空,因此生产者将通过轮询的方式分配分区。
生产者将第1条消息发送到位于Kafka代理1上的分区0中[3],第2条消息被发送到位于Kafka代理1上的分区1中,第3条消息被发送到位于Kafka代理2上的分区2中。当生产者发送第6条消息时,消息将会被发送到Kafka代理3上的分区5中,从下一条消息开始,又将重复该步骤,消息将被发送到位于Kafka代理1上的分区0中。以这种方式继续分配消息,将消息分配到Kafka集群的所有节点中。
图2-9 生产者将消息写入主题的分区中,如果消息没有关联键,那么生产者就会通过轮询方式选择一个分区,否则通过键的散列值与分区总数取模来决定分区
虽然远程存储数据听起来会有风险,因为服务器有可能会宕机,但Kafka提供了数据冗余。当数据被写入Kafka的一个代理时,数据会被复制到集群中一台或多台机器上(在后面小节会介绍副本)。
3.10 ZooKeeper:领导者、追随者和副本
到目前为止,我们已经讨论了主题在Kafka中的作用,以及主题如何及为什么要进行分区。可以看到,分区并不都位于同一台服务器上,而是分布在整个集群的各个代理上。现在是时候来看看当服务器故障时Kafka如何提供数据可用性。
Kafka代理有领导者(leader)和追随者(follower)的概念。在Kafka中,对每一个主题分区(topic partition),会选择其中一个代理作为其他代理(追随者)的领导者。领导者的一个主要职责是分配主题分区的副本给追随者代理服务器。就像Kafka在集群中为一个主题分配分区一样,Kafka也会在集群的多台服务器中复制分区数据。在深入探讨领导者、追随者和副本是如何工作之前,先来介绍Kafka为实现这一点所使用的技术。
3.11 Apache ZooKeeper
如果你是个Kafka菜鸟,你可能会问自己:“为什么在Kafka的书中会谈论Apache ZooKeeper?”Apache ZooKeeper是Kafka架构不可或缺的部分,正是由于ZooKeeper才使得Kafka有领导者代理,并使领导者代理做诸如跟踪主题副本的事情,ZooKeeper官网对其介绍如下:
ZooKeeper是一个集中式服务,用于维护配置信息、命名、提供分布式同步和组服务。这些类型的所有服务都是通过分布式应用程序以某种形式使用。
既然Kafka是一个分布式应用程序,那么它一开始就应该知道ZooKeeper在其架构中的作用。在这里的讨论中,我们只考虑两个或多个Kafka服务器的安装问题。
在Kafka集群中,其中一个代理会被选为控制器。在2.3.4节我们介绍了分区以及如何在集群的不同服务器之间分配分区。主题分区有一个领导者分区和一到多个追随者分区(复制的级别决定复制的程度[4]),当生成消息时,Kafka将记录发送到领导者分区对应的代理上。
3.12 选择一个控制器
Kafka使用ZooKeeper来选择代理控制器,对于其中涉及的一致性算法的探讨已超出本书所讲内容的范围,因此我们不做深入探讨,只声明ZooKeeper从集群中选择一个代理作为控制器。
如果代理控制器发生故障或者由于任何原因而不可用时,ZooKeeper从与领导者保持同步的一系列代理(已同步的副本[ISR])中选出一个新的控制器,构成该系列的代理是动态的[5],ZooKeeper只会从这个代理系列中选择一个领导者[6]。
3.13 副本
Kafka在代理之间复制记录,以确保当集群中的节点发生故障时数据可用。可以为每个主题(正如前面介绍的消息发布或消费实例中的主题)单独设置复制级别也可以为集群中的所有主题设置复制级别[7]。图2-10演示了代理之间的复制流。
Kafka复制过程非常简单,一个主题分区对应的各代理从该主题分区的领导者分区消费消息,并将消息追加到自己的日志当中。正如2.3.12节所论述的,与领导者代理保持同步的追随者代理被认为是ISR,这些ISR代理在当前领导者发生故障或者不可用时有资格被选举为领导者。[8]
图2-10 代理1和代理3是一个主题分区的领导者,同时也是另外一个分区的追随者,而代理2只是追随者,追随者代理从领导者代理复制数据
3.14 控制器的职责
代理控制器的职责是为一个主题的所有分区建立领导者分区和追随者分区的关系,如果一个Kafka节点宕机或者没有响应(与ZooKeeper之间的心跳),那么所有已分配的分区(包括领导者和追随者)都将由代理控制器重新进行分配。图2-11演示了一个正在运行的代理控制器。[9]
图2-11展示了一个简单的故障情景。第1步,代理控制器检测到代理3不可用。第2步,代理控制器将代理3上分区的领导权重新分配给代理2。
ZooKeeper也参与了Kafka以下几个方面的操作。
集群成员——代理加入集群和维护集群中成员关系。如果一个代理不可用,则ZooKeeper将该代理从集群成员中移除。
主题配置——跟踪集群中的主题,记录哪个代理是主题的领导者,各主题有多少个分区以及主题的哪些特定的配置被覆盖。
访问控制——识别谁可以从特定主题中读取或写入消息。
图2-11 代理控制器负责将其他代理分配为某些主题/分区的领导者代理和另一些主题/分区的追随者代理, 当代理不可用时,代理控制器将已分配给不可用代理的重新分配给集群中的其他代理
现在可知Kafka为什么依赖于Apache ZooKeeper了,正是ZooKeeper使得Kafka有了一个带着追随者的领导者代理,领导者代理的关键角色是为追随者分配主题分区,以便进行复制,以及在代理成员出现故障时重新分配主题分区。
3.15 日志管理
对追加日志已进行了介绍,但还没有谈到随着日志持续增长如何对其进行管理。一个集群中旋转磁盘的空间是一个有限的资源,因此对Kafka而言,随着时间的推移,删除消息是很重要的事。在谈到删除Kafka中的旧数据时,有两种方法,即传统的日志删除和日志压缩。
3.16 日志删除
日志删除策略是一个两阶段的方法:首先,将日志分成多个日志段,然后将最旧的日志段删除。为了管理Kafka不断增加的日志,Kafka将日志切分成多个日志段。日志切分的时间基于消息中内置的时间戳。当一条新消息到达时,如果它的时间戳大于日志中第一个消息的时间戳加上log.roll.ms配置项配置的值时,Kafka就会切分日志。此时,日志被切分,一个新的日志段会被创建并作为一个活跃的日志段,而以前的活跃日志段仍然为消费者提供消息检索[10]。
日志切分是在设置Kafka代理时进行设置的[11]。日志切分有两个可选的配置项。
log.roll.ms——这个是主配置项,但没有默认值。
log.roll.hours——这是辅助配置项,仅当log.roll.ms没有被设置时使用,该配置项默认值是168小时。
随着时间的推移,日志段的数据也将不断增加,为了为传入的数据腾出空间,需要将较旧的日志段删除。为了删除日志段,可以指定日志段保留的时长。图2-12说明了日志切分的过程。
图2-12 左边是当前日志段,右上角是一个已被删除的日志段,在其下面是最近切分的仍然在使用的日志段
与日志切分一样,日志段的删除也基于消息的时间戳,而不仅是时钟时间或文件最后被修改的时间,日志段的删除基于日志中最大的时间戳。用来设置日志段删除策略的3个配置项按优先级依次列出如下,这里按优先级排列意味着排在前面的配置项会覆盖后面的配置项。
log.retention.ms——以毫秒(ms)为单位保留日志文件的时长。
log.retention.minutes——以分钟(min)为单位保留日志文件的时长。
log.retention.hours——以小时(h)为单位保留日志文件。
提出这些设置的前提是基于大容量主题的假设,这里大容量是指在一个给定的时间段内保证能够达到文件最大值。另一个配置项log.retention.bytes,可以指定较长的切分时间阈值,以控制I/O操作。最后,为了防止日志切分阈值设置得相对较大而出现日志量显著增加的情况,请使用配置项log.segment.bytes来控制单个日志段的大小。
对于键为空的记录以及独立的记录[12],删除日志的效果很好。但是,如果消息有键并需要预期的更新操作,那么还有一种方法更适合。
3.17 日志压缩
假设日志中已存储的消息都有键,并且还在不停地接收更新的消息,这意味着具有相同键的新记录将会更新先前的值。例如,股票代码可以作为消息的键,每股的价格作为定期更新的值。想象一下,使用这些信息来展示股票的价值,并出现程序崩溃或者重启,这就需要能够让每个键恢复到最新数据[13]。
如果使用删除策略,那么从最后一次更新到应用程序崩溃或重启之间的日志段就可能被去除,启动时就得不到所有的记录[14]。一种较好的方式是保留给定键的最近已知值,用与更新数据库表键一样的方式对待下一条记录[15]。
按键更新记录是实现压缩主题(日志)的表现形式。与基于时间和日志大小直接删除整个日志段的粗粒度方式不同,压缩是一种更加细粒度的方式,该方式是删除日志中每个键的旧数据。从一个很高的层面上来说,一个日志清理器(一个线程池)运行在后台,如果后面的日志中出现了相同的键,则日志清理器就会重新复制日志段文件并将该键对应的旧记录去除。图2-13阐明了日志压缩是如何为每个键保留最新消息的。
这种方式保证了给定键的最后一条记录在日志中。可以为每个主题指定日志保留策略,因此完全有可能某些主题使用基于时间的保留,而其他主题使用压缩。
默认情况下,日志清理功能是开启的。如果要对主题使用压缩,那么需要在创建主题时设置属性log.cleanup.policy=compact。
在Kafka Streams中使用应用状态存储时就要用到压缩,不过并不需要我们自己来创建相应的日志或主题——框架会处理。然而,理解压缩的原理是很重要的,日志压缩是一个宽泛的话题,我们仅谈论至此。如果想了解压缩方面的更多信息,参见Kafka官方文档。
{注意}
当使用cleanup.policy为压缩时,你可能好奇如何从日志中去除一条记录。对于一个压缩的主题,删除操作会为给定键设置一个null值,作为一个墓碑标记。任何值为null的键都确保先前与其键相同的记录被去除,之后墓碑标记自身也会被去除。
图2-13 左边是压缩前的日志,可以看到具有不同值的重复键,这些值是用来更新给定键的。右边是压缩后的日志,保留了每个键的最新值,但日志变小了
本节的关键内容是:如果事件或消息是独立、单独的,那么就使用日志删除,如果要对事件或消息进行更新,那就使用日志压缩。
我们已经花了很多时间介绍Kafka内部是如何处理数据的,现在,让我们转移到Kafka外部,探讨如何通过生产者向Kafka发送消息,以及消费者如何从Kafka读取消息。
4 生产者发送消息
回到ZMart对集中销售交易数据中心的需求,看看如何将购买交易数据发送到Kafka。在Kafka中,生产者是用于发送消息的客户端。图2-14重述ZMart的数据结构,突出显示生产者,以强调它们在数据流中适合的位置。
尽管ZMart有很多的销售交易,但现在我们只考虑购买一个单一物品:一本10.99美元的书。当消费者完成销售交易时,交易信息将被转换为一个键/值对并通过生产者发送到Kafka。
键是客户ID,即123447777,值是一个JSON格式的值,即"{\"item\":\"book\",\ "price\":10.99}"(这里已把双引号转义了,这样JSON可以被表示为Java中的字符串)。有了这种格式的数据,就可以使用生产者将数据发送到Kafka集群。代码清单2-3所示的示例代码可以在源代码/src/main/java/bbejeck.chapter_2/producer/SimpleProducer.java类中找到。
图2-14 生产者用于向Kafka发送消息,它们并不知道哪个消费者会读取消息,也不知道消费者在什么时候会读取消息
代码清单2-3 SimpleProducer示例
Kafka生产者是线程安全的。所有消息被异步发送到Kafka——一旦生产者将记录放到内部缓冲区,就立即返回Producer.send。缓冲区批量发送记录,具体取决于配置,如果在生产者缓冲区满时尝试发送消息,则可能会有阻塞。
这里描述的Producer.send方法接受一个Callback实例,一旦领导者代理确认收到记录,生产者就会触发Callback.onComplete方法,Callback.onComplete方法中仅有一个参数为非空。在本例中,只关心在发生错误时打印输出异常堆栈信息,因此检验异常对象是否为空。一旦服务器确认收到记录,返回的Future就会产生一个RecordMetadata对象。
{定义}
在代码清单2-3中,Producer.send方法返回一个Future对象,一个Future对象代表一个异步操作的结果。更重要的是,Future可以选择惰性地检索异步结果,而不是等它们完成。更多信息请参考Java文档“Interface Future ”(接口Future )。
4.1 生产者属性
当创建KafkaProducer实例时,传递了一个包含生产者配置信息的java.util. Properties参数。KafkaProducer的配置并不复杂,但在设置时需要考虑一些关键属性,例如,可以在配置中指定自定义的分区器。这里要介绍的属性太多了,因此我们只看一下代码清单2-3中使用的属性。
服务器引导——bootstrap.servers是一个用逗号分隔的host:port值列表。最终,生产者将使用集群中的所有代理。此外,此列表用于初始连接到Kafka集群。
序列化器——key.serializer和value.serializer通知Kafka如何将键和值转化为字节数组。在内部,Kafka使用键和值的字节数组,因此在将消息通过网络发送之前需要向Kafka提供正确的序列化器,以将对象转换为字节数组。
确认应答——acks指定生产者认为在一条记录发送完成之前需要等待的从代理返回的最小确认数。acks的有效值为all、0和1。当值为all时,生产者需要等待一个代理接收到所有追随者代理都已提交记录的确认。当值为1时,代理将记录写入其日志,但不用等待所有的追随者代理来确认提交了记录。当值为0时,意味着生产者不用等待任何确认——这基本上是“即发即弃”。
重试——如果发送一批消息失败,retries指定失败后尝试重发的次数。如果记录的顺序很重要,那么应该考虑设置max.in.flight.requests.connection为1,以防止失败的记录在重试发送之前第二批记录成功发送的情景。
压缩类型——如果使用数据压缩的话,compression.type配置项用来指定要采用的压缩算法。如果设置了compression.type,compression.type会通知生产者在发送数据前对本批次的数据进行压缩。注意,是对整个批次进行压缩,而不是单条记录。
分区器类——partitioner.class指定实现Partitioner接口的类的名称。partitioner.class与我们在2.3.7节中介绍的自定义分区器有关。
更多生产者相关的配置信息请参见Kafka官方文档。
4.2 指定分区和时间戳
当创建一个ProducerRecord对象时,可以选择指定分区、时间戳或者两者都指定。在代码清单2-3中实例化ProducerRecord时,使用了4个重载构造方法中的一个。其他构造方法允许设置分区和时间戳,或者只设置分区,代码如下:
4.3 指定分区
在2.3.5节中,我们讨论了Kafka分区的重要性。我们也讨论了DefaultPartitioner的工作原理以及如何提供一个自定义分区器。为什么要显式设置分区?可能有多种业务上的原因,下面是其中一个例子。
假设传入的记录都有键,但是记录被分发到哪个分区并不重要,因为消费者有逻辑来处理该键包含的任何数据。此外,键的分布可能不均匀,但你希望确保所有的分区接收到的数据量大致相同,代码清单2-4给出的是一个粗略的实现方案。
代码清单2-4 手动设置分区
上面的代码调用Math.abs,因此对于Math.abs求得的整型值,如果该值超出Integer. MAX_VALUE,也不必关注。
{定义}
AtomicInteger属于java.util.concurrent.atomic包,该包包含支持对单个变量进行无锁、线程安全的操作的类。若需要更多信息,请参考Java官方文档关于java.util.concurrent.atomic包的介绍。
4.4 Kafka中的时间戳
Kafka从0.10版本开始在记录中增加了时间戳,在创建ProducerRecord对象时调用以下重载的构造函数设置了时间戳。
如果没有设置时间戳,那么生产者在将记录发送到Kafka代理之前将会使用系统当前的时钟时间。时间戳也受代理级别的配置项log.message.timestamp.type的影响,该配置项可以被设置为CreateTime(默认类型)和LogAppendTime中的一种。与许多其他代理级别的配置一样,代理级别的配置将作为所有主题的默认值,但是在创建主题时可以为每个主题指定不同的值[16]。如果时间戳类型设置为LogAppendTime,并且在创建主题时没有覆盖代理级别对时间戳类型的配置,那么当将记录追加到日志时,代理将使用当前的时间覆盖时间戳,否则,使用来自ProducerRecord的时间戳。
两种时间戳类型该如何选择呢?LogAppendTime被认为是“处理时间”,而CreateTime被认为是“事件时间”,选择哪一种类型取决于具体的业务需求。这就要确定你是否需要知道Kafka什么时候处理记录,或者真实的事件发生在什么时候。在后面的章节,将会看到时间戳对于控制Kafka Streams中的数据流所起的重要作用。(未完)
本文转载自异步社区
Kafka 存储 大数据
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。