业界消息总线技术分析-RocketMQ

网友投稿 894 2022-05-28

一、概述

官方简介:

u  RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

u  能够保证严格的消息顺序

u  提供丰富的消息拉取模式

u  高效的订阅者水平扩展能力

u  实时的消息订阅机制

u  亿级消息堆积能力

二、性能吞吐量

单节点,消息大小 10个字节。

Kafka:   80万条/秒

RocketMQ:12万条/秒

Kafka性能高的原因:producer端多个小消息合并,批量发送到Broker。

可能存在的问题:如果producer宕机,则会导致消息丢失,业务出错。

三、分布式模型

3.1 网络部署模型

部署模型:Name server 轻量级的名字服务,存储cluster、broker、topic、queue之间的关系—即路由信息。

Name Server 是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。每个name server都是全量的路由信息。

Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId=0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。

Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer 完全无状态,可集群部署。

Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic 路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。 Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

3.2 生产端模型

PUSH(推消息)

单个生产者和该生产者关联的所有broker保持长连接。

3.3 消费者模型

轮询消息(PULL模式)

消息拉取线程每隔多久拉取一次?间隔时间由DefaultMQPushConsumer的pullInterval属性控制,默认为0,可手动设置。

消费完offset存储到broker上。该时间由DefaultMQPushConsumer的persistConsumerOffsetInterval属性控制,默认为5秒,可手动设置

四、网络模型

序号

名称

描述

优缺点

1

单个 Master

一个集群只有一台Broker

一旦Broker 重启或者宕机时,会导致整个服务不可用

2

多 Master 模式

一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个   Master

优点:配置简单,单个Master 宕机或重启维护对应用无影响。

缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到受到影响。

3

多 Master 多 Slave 模式,异步复制

每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。

优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从   Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。

缺点:Master 宕机,磁盘损坏情况,会丢失少量消息。

4

多 Master 多 Slave 模式,同步双写

每个 Master 配置一个 Slave,有多对Master-Slave,HA 采用同步双写方式,主备都写成功,向应用返回成功。

优点:数据与服务都无单点,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高

缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。

五、消息模型

队列模型

Topic 可以有N个队列组成。队列可以在一台Broker上,也可以在不同Broker上。相当于kafka的分区。

Tag:消息的二级分类。可以通过此进行过滤消费。

消息模型:

序号

名称

说明

1

顺序消息

局部顺序,即一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一个队列,这样Consumer就可以按照Producer发送的顺序去消费消息。

2

普通顺序消息

正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。

如果业务能容忍在集群异常情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。

3

严格顺序消息

顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。

目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。

4

事务消息

阿里云MQ支持分布式事务消息,未来开源版本的RocketMQ也有计划支持分布式事务消息。

消费模型:

序号

名称

说明

1

广播消费

一条消息被多个Consumer消费,即使这些Consumer属于同一个Consumer   Group,消息也会被Consumer Group中的每个Consumer都消费一次,广播消费中的Consumer Group概念可以认为在消息划分方面无意义。

2

集群消费

一个Consumer   Group中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息。

3

并发消费

消费者个数比队列个数多时,并发消费吞吐量大。注意:该情况下,不保序。(这个就相当于我们后面做的并发消费。)

六、消息转发实时性

(1). Producer 发送消息,消息从socket 进入java 堆。

(2). Producer 发送消息,消息从java 堆转入PAGACACHE,物理内存。

(3). Producer 发送消息,由异步线程刷盘,消息从PAGECACHE 刷入磁盘。

(4). Consumer 拉消息(正常消费),消息直接从PAGECACHE(数据在物理内存)转入socket,到达consumer,不经过java 堆。这种消费场景最多,线上96G 物理内存,按照1K 消息算,可以在物理内存缓存1 亿条消息。

(5). Consumer 拉消息(异常消费),消息直接从PAGECACHE(数据在虚拟内存)转入socket。

(6). Consumer 拉消息(异常消费),由于Socket 访问了虚拟内存,产生缺页中断,此时会产生磁盘IO,从磁盘Load 消息到PAGECACHE,然后直接从socket 发出去。

(7). 同5 一致。

(8). 同6 一致。

实时消息消费:大部分都是可以实时消息PageCache的,实时性高。

堆积消息的消费:“读写分离设计”,如果Broker有Slave,消费时,当Master发现,消费者消费的是磁盘上的数据,会把该消费者重定向到Slave节点进行读取。

七、持久化

刷盘:同步/异步刷盘

Replication:同步/异步,参加第四章节。

所有的Topic的队列写入同一个CommitLog,每个CommitLog文件默认大小1G,超过1G自动生成新的。

u  永远一个文件在写,其他文件在读

u  顺序写,随机读

u  消费时,使用mmap + write方式

消息清理

扫描间隔

默认10秒,由broker配置参数cleanResourceInterval决定

空间阈值

物理文件不能无限制的一直存储在磁盘,当磁盘空间达到阈值时,不再接受消息,broker打印出日志,消息发送失败,阈值为固定值85%

清理时机

默认每天凌晨4点,由broker配置参数deleteWhen决定;或者磁盘空间达到阈值

文件保留时长

默认72小时,由broker配置参数fileReservedTime决定

八、消息QoS机制

u  可靠性

Master slave模式时,数据可靠。可以配置1个master,多slave即多副本。

u  安全

通信基于Netty的,可以配置SSL通道。

数据存储安全,开源版本没有看到。

u  时间约束

u  消息传递的优先级

没有。优先级会严重影响队列性能。在内存的数据,排序还好点,已经落盘的排序,有点不可能了。

九、扩展性

扩容:

Broker可以自由的扩容。这样整个集群能力增强,但是对于已经存在的topic的队列,不能自动rebalance到新增的上去。需要专门工具,开源版本没有。

Name server 的扩容,Broker怎么动态发现最新的Name server?客户端(Producer/Comsumer)怎么发现添加的Name server?

Ø  环境变量指定NameServer地址·

export NAMESRV_ADDR=192.168.8.106:9876

Ø  http静态服务器寻址

客户端启动后,会定时访问一个静态的HTTP服务器,地址如下:

http://examle.com:8080/rocketmq/msaddr

这个URL的返回内容如下:

192.168.8.106:9876

客户端默认每隔2分钟访问一次这个HTTP服务器,并更新本地的NameServer地址。URL已经在代码中写死,可通过修改/etc/hosts文件来改变要访问的服务器,例如在/etc/hosts增加如下配置:

10.232.22.67   examle.com

缩容:开源版本没有更多介绍。

升级:v3.2.6新版本向前向后兼容,客户端与服务器不同版本可互相兼容。

消息丰富的过滤机制。可扩展。

Ø  简单过滤:

业界消息总线技术分析-RocketMQ

订阅消息时,可以根据tag过滤。

consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

Ø  高级过滤:

可以在Broker上开启 Filter Server 进程进行过滤。Filter Server上运行的代码,可以自定义。使用CPU 资源来换取网卡流量资源(因为CPU通常不高,如果客户端过滤,则浪费带宽。但是过滤会带来时延)。

九、总结

Broker部署复杂:Broker本身分为Master/Slave角色

Topic Replica只能按照上面Broker的Master/Slave配置,即如果topic创建在1个master/1个slave上,则具有1个副本。如果创建3副本的topic,则需要专门放在1个master/2 slave的Broker上。不灵活。

但是这种结构比kafka可能更可以支撑大集群。因为

1、 没有统一的controller节点。

2、 Broker之间的通信也是固定的,master只和自己的slave通信,进行数据的副本复制。而Kafka的Broker需要和不定数目的Broker进行通信。(topic A 副本可能分布在Broker 1,Broker2,Broker 3上,Topic B的副本可能分布在Broker 1 ,Broker 10,Broker20上)

3、  RocketMQ 2.0 是zookeeper,RocketMQ是自研的name server。可以深入思考一下,为什么?name server代码行数不到1000行,就是个内存K/V数据库。非常轻量级,没有watch机制,都是通过心跳检测。Name server之间不通信,不同步。多台name server相当于多台热备。重新添加一台name server,每30s Broker更新路由信息,可以全量获取到所有的信息。

4、RocketMQ支持丰富的消费过滤,定时发送,失败重传,事务消息,可能是和电商或者说阿里本身的场景有关,即他们需要这些丰富的功能。我们的核心场景是什么?要有所取舍。

分布式消息服务

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:7天玩转PostgreSQL基础训练营(二)
下一篇:支付宝 App 构建优化解析:通过安装包重排布优化 Android 端启动性能
相关文章