业界消息总线技术分析-RabbitMQ

网友投稿 683 2022-05-28

1      概述

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现,最初由RabbitMQ Technologies Ltd开发并且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。

1.1      官网介绍

为应用提供健壮的消息投递,易于使用,能运行于多个主流的操作系统,多种类型的开发平台。

1.2      亮点

可靠性:RabbitMQ提供多种功能让你能够在性能与可靠性之间做出权衡,包括持久化、消息投递确认、发布者确认和高用性。

灵活的路由:消息在到达queues之前,通过exchanges进行路由。RabbitMQ为典型的路由逻辑內建了多种exchange类型。你能够将多种exchanges绑定在一起,也能够以插件的形式,自己实现一个exchange。

集群:在同一个本地网络中的多个RabbitMQ server能够组成集群,形成一个统一的逻辑broker。

联合:如果多个server只需要一个松散的、不可靠的连接,则可以通过RabbitMQ的联合模式实现。

高可用的queues:Queues能够在集群内多台机器之间镜像,确保在某一台机器故障时的消息安全。

多协议:RabbitMQ支持多种消息协议。核心协议是AMQP 0-9-1,另外还能够以插件的形式,支持STOMP、MQTT、AMQP1.0等。

多客户端:你能想到的任何语言几乎都有RabbitMQ的客户端。

管理界面:提供容易使用的管理界面来监控broker。

消息跟踪:RabbitMQ提供消息跟踪功能。

支持插件:支持以插件的形式,扩展功能。

1.3      RabbitMQ解决什么问题?

1)信息的发送者和接收者如何维持连接,如果一方的连接中断,这期间的数据如何防止丢失?

2)如何降低发送者和接收者的耦合度?

3)如何让Priority高的接收者先接到数据?

4)如何做到load balance?有效均衡接收者的负载?

5)如何有效的将数据发送到相关的接收者?也就是说接收者subscribe 不同的数据,如何做有效的filter。

6)如何做到可扩展,甚至将这个通信模块发到cluster上?

7)如何保证接收者接收到了完整,正确的数据?

AMQP协议解决了以上的问题,而RabbitMQ实现了AMQP。

2      性能吞吐量

2.1      与redis对比

测试环境

个人笔记本,Server与Client使用同一台机器。

Processor : Inter(R) Core(TM)2 Duo CPU P8400 @2.26GHz RAM : 4.00GB Operating System : 32-bit Ubuntu 10.10。

软件环境

RabbitMQ和Redis的Server启动都是用默认配置,客户端使用Java,JVM启动使用默认参数:

RabbitMQ Server 2.2.0

RabbitMQ Java Client 2.2.0

Redis 2.0 Stable

Jedis

入队性能

出队性能

测试结论

对于入队操作,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis慢的无法忍受。

对于出队操作,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

2.2      与kafka对比

测试环境

ubuntu 15.10 64位

cpu:inter core i7-4790 3.60GHZ * 8

内存:16GB

硬盘:ssd 120GB

软件环境:rabbmitmq 3.6.0   kafka0.8.1  (均为单机本机运行)

测试结果均为单操作测试,即生产的时候没有消费操作

测试结果

kafka :消费速度: 37,586 /s  生产速度: 448,753 /s

rabbitmq: 消费速度: 20,807 /s  生产速度  16,413 /s

测试结论

很明显的看出kafka的性能远超rabbitmq。

3     RabbitMQ的架构

架构图如下:

从架构图可以看出,Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。

有三种类型的Exchanges:direct, fanout,topic。每个实现了不同的路由算法(routing algorithm)。

Direct exchange: 如果routing key 匹配, 那么Message就会被传递到相应的queue中。

Fanout exchange: 会向相应的queue广播。

Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。

4     分布式模型

4.1      三种模式

集群模式(Clustering):

连接多台机器,作为一个逻辑的broker。集群节点之间通过Erlang的消息进行通信,同一个集群内的每一个节点,都必须有相同的Erlang cookie。集群内的所有节点,都必须有相同版本的RabbitMR与Erlang。

联合模式(Federation):

联合模式允许一个broker上的exchange或queue接收消息,然后发布到另外一个broker上的exchange或queue。这里的broker可以是独立的机器,也可以是一个集群。联合模式broker之间通过AMQP进行通信。

铲子模式(The Shovel):

使用铲子连接brokers在概念上与联合模式类似,不一样的是,铲子模式工作在更低的level。铲子只是简单地从一个broker的queue消费消息,然后转发到另外一个broker的exchange上。

对照

4.2      集群模式

RabbitMQ逻辑视图

RabbitMQ物理视图

Ø  分为磁盘节点与内存节点。

Ø  队列数据只能够存储在磁盘节点上。

Ø  状态数据存储在内存节点与磁盘节点上,内存节点只把数据保存在内存上,因此能够提高速度,但仅仅影响到资源管理。

Ø  节点之间通过Erlang Cookie进行鉴权,也就是说,互相通信的两个节点,一定要有相同的Erlang Cookie。

Ø  集群可以随时增加或减少节点,但需要注意的是,如果整个集群都宕掉,则恢复集群时最后宕掉的节点需要最先启动。

4.3      普通集群

业界消息总线技术分析-RabbitMQ

Ø  queue在集群中对于contents只存储一份,其他节点只存储meta信息。

Ø  对于publish,客户端任意连接集群的一个节点,转发给创建queue的节点存储消息的所有信息;

Ø  对于consumer,客户端任意连接集群中的一个节点,如果数据不在该节点中,则从存储该消息data的节点拉取。

Ø  可见当存储有queue内容的节点失效后,只要等待该节点恢复后,queue中存在的消息才可以获取消费的到。

Ø  显然增加集群的节点,可以提高整个集群的吞吐量,但是在高可用方面要稍微差一些。

4.4  集群镜像队列

Ø  mirror queue是为RabbitMQ高可用的一种方案,相对于普通的集群方案来讲,queue中的消息每个节点都会存在一份或多份副本,具体有多少个副本,由策略进行配置。

Ø  在配置了镜像队列之后,单个节点失效的情况下,整个集群仍旧可以提供服务。但是由于数据需要在多个节点复制,在增加可用性的同时,系统的吞吐量会有所下降。

Ø  在实现机制上,mirror queue内部实现了一套选举算法,有一个master和多个slave,queue中的消息以master为主。

Ø  对于publish,可以选择任意一个节点进行连接,若该节点不是master,则RabbitMQ将消息转发给master,然后由master向其他slave节点发送该消息。

Ø  对于consumer,可以选择任意一个节点进行连接,消费的请求会转发给master。为保证消息的可靠性,consumer需要进行ack确认,master收到ack后,才会删除消息,ack消息会同步(默认异步方式)到其他各个节点,slave节点收到从master同步过来的ack,也会将对应的消息删除。

Ø  若master节点失效,则mirror queue会自动选举出一个节点(slave中消息队列最长者)作为master。

Ø  若slave节点失效,mirror queue集群中其他节点的状态无需改变。

5     网络模型

Ø  支持IPv4与IPv6双协议栈

Ø  底层使用TCP

Ø  支持TLS

Ø  官网介绍了较多针对传输层的调优措施,包括TCP Buffer Size、 OS Level Tuning、 TCP Socket Options等,具体可以参考 http://www.rabbitmq.com/networking.html

6     持久化

6.1      消息什么时候需要持久化

消息本身在publish的时候就要求消息写入磁盘。

内存紧张,需要将部分内存中的消息转移到磁盘。

6.2      消息什么时候会刷到磁盘

写入文件前会有一个Buffer,大小为1M(1048576),数据在写入文件时,首先会写入到这个Buffer,如果Buffer已满,则会将Buffer写入到文件(未必刷到磁盘);

有个固定的刷盘时间:25ms,也就是不管Buffer满不满,每隔25ms,Buffer里的数据及未刷新到磁盘的文件内容必定会刷到磁盘;

每次消息写入后,如果没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0来实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操作。

6.3      消息在磁盘文件中的格式

消息保存于$MNESIA/msg_store_persistent/x.rdq文件中,其中x为数字编号,从1开始,每个文件最大为16M(16777216),超过这个大小会生成新的文件,文件编号加1。消息以以下格式存在于文件中:

<>

MsgId为RabbitMQ通过rabbit_guid:gen()每一个消息生成的GUID,MsgBody会包含消息对应的exchange,routing_keys,消息的内容,消息对应的协议版本,消息内容格式(二进制还是其它)等等。

6.4      文件何时删除?

当所有文件中的垃圾消息(已经被删除的消息)比例大于阈值(GARBAGE_FRACTION = 0.5)时,会触发文件合并操作(至少有三个文件存在的情况下),以提高磁盘利用率。

publish消息时写入内容,ack消息时删除内容(更新该文件的有用数据大小),当一个文件的有用数据等于0时,删除该文件。

7      消息QoS

可靠性

支持高可靠场景,一个master加上多个slave来保证消息的可靠。

安全

支持使用SASL进行认证

支持使用SSL进行通道加密

支持ACL

消息传递的优先级

AMQP协议本身支持优先级队列,具体的实现还得再分析。

8     总结

Broker部署较为复杂

基于Erlang语言实现,每一个节点都需要安装Erlang语言运行环境。

配置非常灵活

支持多种分布式模式,集群模式又分为普通集群与集群镜像队列,提供多种路由策略。同时通过插件方式,可以很方便地扩展RabbitMQ的功能。

无中心节点,可以支撑大集群

组件集群是直接使用Erlang语言的特性,不像kafka需要一个zookeeper。

RabbitMQ节点也没有

支持多种类型的协议

不仅仅支持RabbitMQ,还通过插件形式,支持MQTT、STOMP等。

RabbitMQ 存储

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Centos7.6系统重置root用户密码
下一篇:MySQL的redo log重做日志都懂了吗?
相关文章