五分钟带你玩转rabbitmq(七)怎么保证消息不丢失

网友投稿 731 2022-05-28

先来说明一个概念,什么是可靠投递呢?在RabbitMQ中,一个消息从生产者发送到RabbitMQ服务器,需要经历这么几个步骤:

生产者准备好需要投递的消息。

生产者与RabbitMQ服务器建立连接。

生产者发送消息。

RabbitMQ服务器接收到消息,并将其路由到指定队列。

RabbitMQ服务器发起回调,告知生产者消息发送成功。

所谓可靠投递,就是确保消息能够百分百从生产者发送到服务器。

队列存在的以下问题:消息丢失问题 重复消费问题 以下为解决点

1:队列持久化硬盘

丢失的过程就只有在内存发送到磁盘时会丢失消息 如果保存到磁盘后 重启服务消息不会丢失 但是会影效率

new Queue("demo_queue", true, false, false, args); 第二个参数为true

2:手动ack

告知生产者消息成功/失败,否则,如果失败此队列会保持挂起状态,他们消息会等待。所以在消费完成之后通知生产者消费是否成功/失败,ack/nack

配置文件

rabbitmq:

host: 192.168.xx.xx

port: 5672

username: root

五分钟带你玩转rabbitmq(七)怎么保证消息不丢失

password: root

virtual-host: /

listener:

simple:

acknowledge-mode: manual #手动应答

prefetch: 1 # 每次只处理一个信息

publisher-confirms: true #开启消息确认机制

publisher-returns: true #支持消息发送失败返回队列

@RabbitListener(queues = "demo_queue")

protected void consumerDead(Message message, Channel channel) throws Exception {

RabbitEnum ackSign = RabbitEnum.ACCEPT;

try {

int i = 10 / 0;

} catch (Exception e) {

ackSign = RabbitEnum.RETRY;

throw e;

} finally {

// 通过finally块来保证Ack/Nack会且只会执行一次

if (ackSign == RabbitEnum.ACCEPT) {

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} else if (ackSign == RabbitEnum.RETRY) {

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);

}

}

}

3:确认是否发送成功

判断消息是否发送到交换机

@Bean

public RabbitTemplate rabbitTemplate() {

Logger logger = LoggerFactory.getLogger(RabbitTemplate.class);

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 消息发送失败返回到队列中, yml需要配置 publisher-returns: true

rabbitTemplate.setMandatory(true);

// 发送消息确认, yml需要配置 publisher-confirms: true 消息是否发送的核心配置

rabbitTemplate.setConfirmCallback(msgSendConfirmCallBack());

// 消息返回, yml需要配置 publisher-returns: true 消息返回的配置

rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {

String correlationId = message.getMessageProperties().getCorrelationId().toString();

logger.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange,

routingKey);

});

return rabbitTemplate;

}

/**

* 确认发送消息是否成功

*

* @return

*/

@Bean

public MsgSendConfirmCallBack msgSendConfirmCallBack() {

return new MsgSendConfirmCallBack();

}

public class MsgSendConfirmCallBack implements RabbitTemplate.ConfirmCallback {

/**

* 回调方法

* @param correlationData

* @param ack

* @param cause

*/

@Override

public void confirm(CorrelationData correlationData, boolean ack, String cause) {

System.out.println("MsgSendConfirmCallBack , 回调id:" + correlationData);

if (ack) {

System.out.println("消息发送成功");

} else {

//可以将消息写入本地,使用定时任务重新发送

System.out.println("消息发送失败:" + cause + "\n重新发送");

}

}

}

4:集群化处理

5:异地容灾

6:发送消息持久化到db中 进行消息的重新发送

7:消费者消息固话到db中 通过消息id判断是否重复消费

参考:https://www.freesion.com/article/1880596463/

RabbitMQ

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:《Python大规模机器学习》— 2.2.4 使用数据库
下一篇:Kafka过期数据未老化原因及解决方案
相关文章