RabbitMQ 第3章 RabbitMQ Work Queues(工作队列)

网友投稿 552 2022-05-28

一、概述

工作队列(Work queues)

(使用Java客户端)

它在web应用中是非常有用的,因为在很短的时间内http请求窗口处理一个复杂的任务是不可能实现的,它的结构如下图-1所示:

二、实现步骤

2.1、准备

在本部分内容之前,已经实现了发送单条”Hello World!“的消息,现在将发送一些复杂的字符串任务,由于没有一个真实的生产环境来模拟,我们可以通过使用Thread.sleep()函数来假设任务通过描述字符串hello...将要花费三秒钟的时间。

从之前的实例中,我们只需要稍微修改Sender01.java代码,允许任意的消息通过命令行发送,这一计划将安排到工作队列中的任务,让我们将

String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");

消息处理:

private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }

之前的旧的Recv01.java也需要做一些改变。

while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); }

模拟假任务执行时间:

private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }

2.2、轮询调度

使用消息队列的优点之一就是能够方便地并行工作,如果我们建立了大量的工作任务,我们就可以添加更多的worker,这样大规模应用就显的比较容易,首先让我们尝试同时运行两个Worker.java脚本,他们都将获得队列中的消息,但是具体情况如下,我们需要打开三个控制台,两个将运行Worker.java的脚本,这些控制台将显示两个消费者C1和C2,第三个控制台将发布一个新的任务,一旦启动了消费者,你将可以通过第三个控制台发布一些消息,在默认情况下RabbitMQ将发送每条消息给下一个消费者,在队列里每个消费者将获得同样数量的消息,这种方式被称之轮询调度。

2.3、消息确认

为了确保每条消息不丢失,RabbitMQ支持消息的确认(Acknowledgments),一个确认被送回消费者告诉RabbitMQ的一个特定的消息一经被接收和处理,RabbitMQ此时才可以将该特定的消息删除。

如果消费者进程被杀掉而没有发送一个确认给RabbitMQ服务器,RabbitMQ会明白这个消息是没有被正常完成处理。

QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //... channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }

RabbitMQ 第3章 RabbitMQ Work Queues(工作队列)

2.4、消息持久化

前面已经给出了如何确保消费者down了,任务也不会丢失,但是我们的任务在RabbitMQ服务器停止时还是可能会被丢失,当RabbitMQ服务器宕机或者奔溃时,它会丢失所有的队列和消息,除非告诉它不要这么做,需要做两件事情确保消息不会被丢失,我们需要标记队列和消息持久化,首先我们需要确保RabbitMQ永远不会丢失队列,为了做到这点,我们需要将队列声明为持久化:

boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);

此时queueDeclare变化需要被兼容到生产者和消费者的代码中,在这一点上,我们肯定不会丢失队列,即使task_queue队列所属的RabbitMQ服务器重新启动,现在我们需要将消息标记为持久性,通过设置MessageProperties值为 PERSISTENT_TEXT_PLAN来实现。

import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

消息持久化需要注意的事项:

标记为持久化的消息并不能完全保证消息不会被丢失,虽然它告诉RabbitMQ的消息保存到磁盘,仍然有一个很短的时间内,RabbitMQ的消息一经接收了,但是并没有成功保存到磁盘,而是保存在缓存汇总,持久化的保证能力不足,但它是我们简单任务队列已经足以满足需求了。

2.5、公平调度

int prefetchCount = 1; channel.basicQos(prefetchCount);

它的结构图如下图-2所示:

2.6、完整的代码清单:

package com.xuz.work; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.ConsumerCancelledException; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.ShutdownSignalException; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); /** * true: * 这样设置之后,服务器收到消息后就会立刻将消息写入到硬盘,就可以防止突然服务器挂掉,而引起数据丢失了,但是服务器如果刚收到消息,还没有来得写入硬盘,就挂掉了,这样 * 无法避免消息得丢失。 */ channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println("waiting for message.To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); /** * false:设置确认消息,true表示接收到消息之后,将返回给服务端确定消息 */ channel.basicConsume(TASK_QUEUE_NAME, false,consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("Received:["+message+"] from Task"); doWork(message); System.out.println("Done!"); //设置消息确认机制,如将如下代码注释掉,则 //一旦将autoAck关闭之后,一定要记得处理完消息之后,向服务器确认消息。否则服务器将会一直转发该消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } private static void doWork(String message) throws InterruptedException { for (char ch : message.toCharArray()) { if(ch == '.')Thread.sleep(1000); } } }

Java RabbitMQ

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Linux开发_Shell脚本编程语言
下一篇:277_DBA_执行计划成本概述_统计信息收集
相关文章