Kafka笔记】Kafka 多线程消费消息

网友投稿 774 2022-05-30

Kafka Java Consumer采用的是单线程的设计。其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程。

用户主线程,指的是启动Consumer应用程序main方法的线程,心跳线程(Heartbeat Thread)只负责定期给对应的Broker机器发送心跳请求,以表示消费者应用的存活性。

官网文档对于consumer多线程的处理方式 :

每个线程一个消费者

每个线程自己的消费者实例。这里是这种方法的优点和缺点:

PRO: 这是最容易实现的

PRO: 因为它不需要在线程之间协调,所以通常它是最快的。

CON: 更多的消费者意味着更多的TCP连接到集群(每个线程一个)。一般kafka处理连接非常的快,所以这是一个小成本。

CON: 更多的消费者意味着更多的请求被发送到服务器,但稍微较少的数据批次可能导致I/O吞吐量的一些下降。

CON: 所有进程中的线程总数受到分区总数的限制。

解耦消费和处理

另一个替代方式是一个或多个消费者线程,它来消费所有数据,其消费所有数据并将ConsumerRecords实例切换到由实际处理记录处理的处理器线程池来消费的阻塞队列。这个选项同样有利弊:

可扩展消费者和处理进程的数量。这样单个消费者的数据可分给多个处理器线程来执行,避免对分区的任何限制。

CON: 跨多个处理器的顺序保证需要特别注意,因为线程是独立的执行,后来的消息可能比遭到的消息先处理,这仅仅是因为线程执行的运气。如果对排序没有问题,这就不是个问题。

CON: 手动提交变得更困难,因为它需要协调所有的线程以确保处理对该分区的处理完成。

这是两种不同的处理方式。

解释: 消费者程序启动多个线程,每个线程维护专属的KafkaConsumer,负责完整的消息获取、消息处理流程。 (其实就是一个消费者客户端开启多个线程,每个线程都有各自的Consumer对同一个topic或者多个topic进行消费,这些消费者(线程)组成了一个消费者组)

借用网上的图:

topic数据实例:

代码:

public class KafkaConsumerThread implements Runnable{ private KafkaConsumer consumer; private AtomicBoolean closed = new AtomicBoolean(false); public KafkaConsumerThread(){ } // 构造方法 生成自己的consumer public KafkaConsumerThread(Properties props) { this.consumer = new KafkaConsumer<>(props); } @Override public void run() { try { // 消费同一主题 consumer.subscribe(Collections.singletonList("six-topic")); // 线程名称 String threadName = Thread.currentThread().getName(); while (!closed.get()){ ConsumerRecords records = consumer.poll(3000); for (ConsumerRecord record : records) { System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",threadName,record.topic(),record.partition(),record.offset(),record.key(),record.value()); } } }catch (WakeupException e){ e.printStackTrace(); }finally { consumer.close(); } } /** * 关闭消费 */ public void shutdown(){ closed.set(true); // wakeup 可以安全地从外部线程来中断活动操作 consumer.wakeup(); } public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXXXX:9093"); props.put("group.id", "thread-1");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("enable.auto.commit", "true");//自动提交offset props.put("auto.offset.reset", "earliest"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("session.timeout.ms", "30000"); props.put("max.poll.records",6); // 运行三个线程,消费同一个topic 这个topic的分区必须大于等于3 否则会有消费者消费不到数据 for (int i = 0; i < 3 ; i++) { new Thread(new KafkaConsumerThread(props),"Thread"+i).start(); } } }

日志:

Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 0, key= ImKey-0-one,value= ImValue-0-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 0, key= ImKey-1-one,value= ImValue-1-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 1, key= ImKey-5-one,value= ImValue-5-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 2, key= ImKey-8-one,value= ImValue-8-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 3, key= ImKey-10-one,value= ImValue-10-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 4, key= ImKey-13-one,value= ImValue-13-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 5, key= ImKey-14-one,value= ImValue-14-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 0, key= ImKey-4-one,value= ImValue-4-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 1, key= ImKey-6-one,value= ImValue-6-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 2, key= ImKey-7-one,value= ImValue-7-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 3, key= ImKey-11-one,value= ImValue-11-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 4, key= ImKey-15-one,value= ImValue-15-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 5, key= ImKey-21-one,value= ImValue-21-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 6, key= ImKey-25-one,value= ImValue-25-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 7, key= ImKey-27-one,value= ImValue-27-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 8, key= ImKey-29-one,value= ImValue-29-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 1, key= ImKey-2-one,value= ImValue-2-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 2, key= ImKey-3-one,value= ImValue-3-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 3, key= ImKey-9-one,value= ImValue-9-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 4, key= ImKey-12-one,value= ImValue-12-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 5, key= ImKey-16-one,value= ImValue-16-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 6, key= ImKey-17-one,value= ImValue-17-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 7, key= ImKey-24-one,value= ImValue-24-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 8, key= ImKey-32-one,value= ImValue-32-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 6, key= ImKey-18-one,value= ImValue-18-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 7, key= ImKey-19-one,value= ImValue-19-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 8, key= ImKey-20-one,value= ImValue-20-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 9, key= ImKey-22-one,value= ImValue-22-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 10, key= ImKey-23-one,value= ImValue-23-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 11, key= ImKey-26-one,value= ImValue-26-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 12, key= ImKey-28-one,value= ImValue-28-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 13, key= ImKey-30-one,value= ImValue-30-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 14, key= ImKey-31-one,value= ImValue-31-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 15, key= ImKey-33-one,value= ImValue-33-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 16, key= ImKey-34-one,value= ImValue-34-one

可以看到三个线程,一个消费者组,每个线程消费者得到一个topic的分区去消费消息。

解释: 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个或多个,每个维护专属KafkaConsumer实例,处理消息交由特定线程池来做,从而实现消息获取与消息处理的真正解耦。

这里的多线程处理消息逻辑可以有多种方法,这里就列出来几种:

使用队列存储消息,多线程处理队列:

使用独立锁(FIFO)队列LinkedBlockingQueue

该队列是线程安全的先进先出队列

【Kafka笔记】Kafka 多线程消费消息

public class KafkaConsumerThread2 implements Runnable { // 存储消息 先进先出队列 private LinkedBlockingQueue> list; private AtomicBoolean closed = new AtomicBoolean(false); public KafkaConsumerThread2() { } public KafkaConsumerThread2(LinkedBlockingQueue> list) { this.list = list; } @Override public void run() { // 线程名称 String threadName = Thread.currentThread().getName(); // 处理消息 while (!closed.get()){ try { ConsumerRecords records = list.take(); System.out.println("消息数量"+records.count()); if (records.isEmpty()){ System.out.printf("队列为空,不消费数据,Thread-name= %s\n",threadName); }else { for (ConsumerRecord record : records) { Thread.sleep(3000); System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",threadName,record.topic(),record.partition(),record.offset(),record.key(),record.value()); } } }catch (InterruptedException e){ e.printStackTrace(); } } } public static void main(String[] args) { LinkedBlockingQueue> list = new LinkedBlockingQueue<>(); Properties props = new Properties(); props.put("bootstrap.servers", "10.33.68.68:9093"); props.put("group.id", "thread-5");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("enable.auto.commit", "true");//自动提交offset props.put("auto.offset.reset", "earliest"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("session.timeout.ms", "30000"); props.put("max.poll.records",5); KafkaConsumer consumer = new KafkaConsumer<>(props); // 消费同一主题 consumer.subscribe(Collections.singletonList("six-topic")); // 开启三个线程处理队列中的消息 for (int i = 0; i <3 ; i++) { new Thread(new KafkaConsumerThread2(list),"thread-"+i).start(); } while (true){ ConsumerRecords records = consumer.poll(1000); try { list.put(records); //Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } } }

创建线程池,使用线程池处理消息逻辑

逻辑处理类ConsumerDealThread:

public class ConsumerDealThread implements Runnable{ private ConsumerRecord record; public ConsumerDealThread(ConsumerRecord record) { this.record = record; } public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",Thread.currentThread().getName(),record.topic(),record.partition(),record.offset(),record.key(),record.value()); } }

运行类KafkaConsumerThread3:

public class KafkaConsumerThread3 { public static void main(String[] args) { LinkedBlockingQueue> list = new LinkedBlockingQueue<>(); Properties props = new Properties(); props.put("bootstrap.servers", "10.33.68.68:9093"); props.put("group.id", "thread-18");//消费者组,只要group.id相同,就属于同一个消费者组 props.put("enable.auto.commit", "true");//自动提交offset props.put("auto.offset.reset", "earliest"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("session.timeout.ms", "30000"); props.put("max.poll.records",5); KafkaConsumer consumer = new KafkaConsumer<>(props); // 消费同一主题 consumer.subscribe(Collections.singletonList("six-topic")); ExecutorService executor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy()); while (true){ ConsumerRecords records = consumer.poll(1000); try { for (ConsumerRecord record : records) { executor.submit(new ConsumerDealThread(record)); } } catch (Exception e) { e.printStackTrace(); consumer.wakeup(); executor.shutdown(); try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { System.out.println("超时,未关闭线程池"); } } catch (InterruptedException e2) { e.printStackTrace(); } } BlockingQueue queue = ((ThreadPoolExecutor) executor).getQueue(); System.out.println("队列数量:"+queue.size()); } } }

EI企业智能 Kafka 可信智能计算服务 TICS 多线程 智能数据

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:机器人操作系统ROS 1.0 和 2.0 发展规划2018-2025(Open Robotics)译
下一篇:(更新时间)2021年4月8日 Django框架 ORM中查询详解
相关文章