五分钟带你玩转rabbitmq(五)死信队列

网友投稿 743 2022-05-29

文件目录如下

业务背景:

如果有有错误消息 如果手动nack同时将消息放回到队列中 那么这条消息会反复消费 留在队列中

如果nack后将消息丢弃 那么如果碰到网络抖动 消息也会丢失 。 所以 建立死信队列避免消息丢失。

原理 :

当消息进入进入业务队列后 如果收到nack那么就将这条消息放入另一条队列中 。

1.pom文件

org.springframework.boot

spring-boot-starter-amqp

2.配置文件

server:

port: 8088

spring:

RabbitMQ:

host: 192.168.*.*

port: 5672

username: root

password: root

virtual-host: /

listener:

simple:

acknowledge-mode: manual #手动应答

prefetch: 1 # 每次只处理一个信息

publisher-confirms: true #开启消息确认机制

publisher-returns: true #支持消息发送失败返回队列

3.rabbitmq的配置

@Configuration

public class RabbitMqConfig {

/**

* 连接工厂

*/

@Autowired

private ConnectionFactory connectionFactory;

五分钟带你玩转rabbitmq(五)死信队列

/**

* 定制化amqp模版

*

* ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调 即消息发送到exchange ack

* ReturnCallback接口用于实现消息发送到RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack

*/

@Bean

public RabbitTemplate rabbitTemplate() {

Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 消息发送失败返回到队列中, yml需要配置 publisher-returns: true

rabbitTemplate.setMandatory(true);

// 发送消息确认, yml需要配置 publisher-confirms: true

rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());

// 消息返回, yml需要配置 publisher-returns: true

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

String correlationId = message.getMessageProperties().getCorrelationId().toString();

logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange,

routingKey);

});

return rabbitTemplate;

}

/**

* 确认发送消息是否成功(调用util方法)

*

* @return

*/

@Bean

public MsgSendConfirmCallBack msgSendConfirmCallBack() {

return new MsgSendConfirmCallBack();

}

}

util发送回调方法

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

/**

* 回调方法

* @param correlationData

* @param ack

* @param cause

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);

if (ack) {

System.out.println("消息发送成功");

} else {

//可以将消息写入本地,使用定时任务重新发送

System.out.println("消息发送失败:" + cause + "\n重新发送");

}

}

}

这里有一个点 如果想做实现消息失败重新发送 在注释处可以实现

需要将消息写入本地 如果失败从本地读取 然后发送 如果成功删除本地信息

4.业务队列(如:订单业务)

这里声明了一个业务队列

关键点在于x-dead-letter-exchange,x-dead-letter-routing-key 两个参数

@Configuration

public class BusinessConfig {

/**

* 业务1模块direct交换机的名字

*/

public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";

/**

* 业务1 demo业务的队列名称

*/

public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";

/**

* 业务1 demo业务的routekey

*/

public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";

@Bean

public Queue yewu1DemoDeadQueue() {

// 将普通队列绑定到死信队列交换机上

Map args = new HashMap<>(2);

args.put(RetryConfig.RETRY_LETTER_QUEUE_KEY, DeadConfig.FAIL_EXCHANGE_NAME);

args.put(RetryConfig.RETRY_LETTER_ROUTING_KEY, DeadConfig.FAIL_ROUTING_KEY);

return new Queue("yewu1_demo_dead_queue", true, false, false, args);

}

/**

* 将消息队列和交换机进行绑定

*/

@Bean

public Binding binding_one() {

return BindingBuilder.bind(yewu1DemoDeadQueue()).to(yewu1Exchange())

.with("yewu1_demo_dead_key");

}

}

这里有一个点如果想持久化消息到磁盘 需要新建队列时 new Queue将第二个参数输入为true 但是面对大并发时效率会变低

5.死信队列

@Configuration

public class DeadConfig {

/**

* 死信队列

*/

public final static String FAIL_QUEUE_NAME = "fail_queue";

/**

* 死信交换机

*/

public final static String FAIL_EXCHANGE_NAME = "fail_exchange";

/**

* 死信routing

*/

public final static String FAIL_ROUTING_KEY = "fail_routing";

/**

* 创建配置死信队列

*

*/

@Bean

public Queue deadQueue() {

return new Queue(FAIL_QUEUE_NAME, true, false, false);

}

/**

* 死信交换机

*

* @return

*/

@Bean

public DirectExchange deadExchange() {

DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);

return directExchange;

}

/**

* 绑定关系

*

* @return

*/

@Bean

public Binding failBinding() {

return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);

}

}

6.生产者消费者

public enum RabbitEnum {

/**

* 处理成功

*/

ACCEPT,

/**

* 可以重试的错误

*/

RETRY,

/**

* 无需重试的错误

*/

REJECT

@RequestMapping("/sendDirectDead")

String sendDirectDead(@RequestBody String message) throws Exception {

System.out.println("开始生产");

CorrelationData data = new CorrelationData(UUID.randomUUID().toString());

rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, "yewu1_demo_dead_key",

message, data);

System.out.println("结束生产");

System.out.println("发送id:" + data);

return "OK,sendDirect:" + message;

}

@RabbitListener(queues = "yewu1_demo_dead_queue")

protected void consumerDead(Message message, Channel channel) throws Exception {

RabbitEnum ackSign = RabbitEnum.RETRY;

try {

int i = 10 / 0;

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {

ackSign = RabbitEnum.RETRY;

throw e;

} finally {

// 通过finally块来保证Ack/Nack会且只会执行一次

if (ackSign == RabbitEnum.ACCEPT) {

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} else if (ackSign == RabbitEnum.RETRY) {

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

}

}

}

7.实验

当发送yewu1_demo_dead_queue队列时 如果抛出异常 会放入死信队列中。

RabbitMQ

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:hadoop基本认识
下一篇:什么是GaussDB(for Redis)?
相关文章