将OneNote笔记与Word文档链接的两种方法(onenote怎样打开word文档)
774
2022-05-30
Kafka Java Consumer采用的是单线程的设计。其入口类KafkaConsumer是一个双线程的设计,即用户主线程和心跳线程。
用户主线程,指的是启动Consumer应用程序main方法的线程,心跳线程(Heartbeat Thread)只负责定期给对应的Broker机器发送心跳请求,以表示消费者应用的存活性。
官网文档对于consumer多线程的处理方式 :
每个线程一个消费者
每个线程自己的消费者实例。这里是这种方法的优点和缺点:
PRO: 这是最容易实现的
PRO: 因为它不需要在线程之间协调,所以通常它是最快的。
CON: 更多的消费者意味着更多的TCP连接到集群(每个线程一个)。一般kafka处理连接非常的快,所以这是一个小成本。
CON: 更多的消费者意味着更多的请求被发送到服务器,但稍微较少的数据批次可能导致I/O吞吐量的一些下降。
CON: 所有进程中的线程总数受到分区总数的限制。
解耦消费和处理
另一个替代方式是一个或多个消费者线程,它来消费所有数据,其消费所有数据并将ConsumerRecords实例切换到由实际处理记录处理的处理器线程池来消费的阻塞队列。这个选项同样有利弊:
可扩展消费者和处理进程的数量。这样单个消费者的数据可分给多个处理器线程来执行,避免对分区的任何限制。
CON: 跨多个处理器的顺序保证需要特别注意,因为线程是独立的执行,后来的消息可能比遭到的消息先处理,这仅仅是因为线程执行的运气。如果对排序没有问题,这就不是个问题。
CON: 手动提交变得更困难,因为它需要协调所有的线程以确保处理对该分区的处理完成。
这是两种不同的处理方式。
解释: 消费者程序启动多个线程,每个线程维护专属的KafkaConsumer,负责完整的消息获取、消息处理流程。 (其实就是一个消费者客户端开启多个线程,每个线程都有各自的Consumer对同一个topic或者多个topic进行消费,这些消费者(线程)组成了一个消费者组)
借用网上的图:
topic数据实例:
代码:
public class KafkaConsumerThread implements Runnable{ private KafkaConsumer
日志:
Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 0, key= ImKey-0-one,value= ImValue-0-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 0, key= ImKey-1-one,value= ImValue-1-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 1, key= ImKey-5-one,value= ImValue-5-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 2, key= ImKey-8-one,value= ImValue-8-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 3, key= ImKey-10-one,value= ImValue-10-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 4, key= ImKey-13-one,value= ImValue-13-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 5, key= ImKey-14-one,value= ImValue-14-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 0, key= ImKey-4-one,value= ImValue-4-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 1, key= ImKey-6-one,value= ImValue-6-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 2, key= ImKey-7-one,value= ImValue-7-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 3, key= ImKey-11-one,value= ImValue-11-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 4, key= ImKey-15-one,value= ImValue-15-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 5, key= ImKey-21-one,value= ImValue-21-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 6, key= ImKey-25-one,value= ImValue-25-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 7, key= ImKey-27-one,value= ImValue-27-one Context: Thread-name= Thread1, topic= six-topic partition= 1, offset= 8, key= ImKey-29-one,value= ImValue-29-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 1, key= ImKey-2-one,value= ImValue-2-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 2, key= ImKey-3-one,value= ImValue-3-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 3, key= ImKey-9-one,value= ImValue-9-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 4, key= ImKey-12-one,value= ImValue-12-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 5, key= ImKey-16-one,value= ImValue-16-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 6, key= ImKey-17-one,value= ImValue-17-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 7, key= ImKey-24-one,value= ImValue-24-one Context: Thread-name= Thread2, topic= six-topic partition= 2, offset= 8, key= ImKey-32-one,value= ImValue-32-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 6, key= ImKey-18-one,value= ImValue-18-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 7, key= ImKey-19-one,value= ImValue-19-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 8, key= ImKey-20-one,value= ImValue-20-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 9, key= ImKey-22-one,value= ImValue-22-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 10, key= ImKey-23-one,value= ImValue-23-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 11, key= ImKey-26-one,value= ImValue-26-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 12, key= ImKey-28-one,value= ImValue-28-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 13, key= ImKey-30-one,value= ImValue-30-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 14, key= ImKey-31-one,value= ImValue-31-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 15, key= ImKey-33-one,value= ImValue-33-one Context: Thread-name= Thread0, topic= six-topic partition= 0, offset= 16, key= ImKey-34-one,value= ImValue-34-one
可以看到三个线程,一个消费者组,每个线程消费者得到一个topic的分区去消费消息。
解释: 消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个或多个,每个维护专属KafkaConsumer实例,处理消息交由特定线程池来做,从而实现消息获取与消息处理的真正解耦。
这里的多线程处理消息逻辑可以有多种方法,这里就列出来几种:
使用队列存储消息,多线程处理队列:
使用独立锁(FIFO)队列LinkedBlockingQueue
该队列是线程安全的先进先出队列
public class KafkaConsumerThread2 implements Runnable { // 存储消息 先进先出队列 private LinkedBlockingQueue
创建线程池,使用线程池处理消息逻辑
逻辑处理类ConsumerDealThread:
public class ConsumerDealThread implements Runnable{ private ConsumerRecord record; public ConsumerDealThread(ConsumerRecord record) { this.record = record; } public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.printf("Context: Thread-name= %s, topic= %s partition= %s, offset= %d, key= %s,value= %s\n",Thread.currentThread().getName(),record.topic(),record.partition(),record.offset(),record.key(),record.value()); } }
运行类KafkaConsumerThread3:
public class KafkaConsumerThread3 { public static void main(String[] args) { LinkedBlockingQueue
EI企业智能 Kafka 可信智能计算服务 TICS 多线程 智能数据
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。