RabbitMQ】Go语言实现六种消息中间件模型

网友投稿 775 2022-05-29

文章目录

写在前面

1. 介绍

1.1 什么是MQ

1.2 什么是RabbitMQ

1.3 AMQP 协议

2. Go语言操作RabbitMQ

2.1 下载

2.2 引入驱动

2.3 HelloWorld 模型

2.3.1 生产者

2.3.2 消费者

2.3.3 结果

2.4 Work Queues 模型

2.4.1 生产者

2.4.2 消费者

2.4.3 结果

2.5 Publish/Subscribe 模型

2.5.1 生产者

2.5.2 消费者

2.5.3 结果

2.6 Routing 模型

2.6.1 生产者

2.6.2 消费者

2.7 Topics 模型

2.7.1 生产者

2.7.2 消费者

2.8 RPC 模型

写在前面

本文是使用Go语言实现各种RabbitMQ的中间件模型

1. 介绍

1.1 什么是MQ

MQ(Message Quene) : 翻译为 消息队列,通过典型的 生产者和消费者模型,生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,轻松的实现系统间解耦。

别名为 消息中间件 通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

目前市面上有很多消息中间件:RabbitMQ,RocketMQ,Kafka等等…

1.2 什么是RabbitMQ

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求可能比较低了。

1.3 AMQP 协议

AMQP(advanced message queuing protocol) 在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。

顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。以下是AMQP协议模型:

2. Go语言操作RabbitMQ

2.1 下载

下载rabbitmq过程就省了,可以直接到官网网站下载安装,像安装qq一样。

2.2 引入驱动

驱动

go get github.com/streadway/amqp

连接

var MQ *amqp.Connection // RabbitMQ 链接 func RabbitMQ(connString string) { conn, err := amqp.Dial(connString) if err != nil { panic(err) } MQ = conn }

1

2

3

4

5

6

7

8

9

10

2.3 HelloWorld 模型

P代表生产者,C代表消费者,红色部分是队列。

生产者生成消息到队列中,消费者进行消费,直连单点模式。

2.3.1 生产者

声明连接对象

var ProductMQ *amqp.Connection

1

声明通道

ch, err := ProductMQ.Channel()

1

创建队列

q, err := ch.QueueDeclare("hello", // 队列名字 false, // 是否持久化, false, // 不用的时候是否自动删除 false, // 用来指定是否独占队列 false, // no-wait nil, // 其他参数 )

1

2

3

4

5

6

7

参数1(name):队列名字

参数2(durable):持久化,队列中所有的数据都是在内存中的,如果为true的话,这个通道关闭之后,数据就会存在磁盘中持久化,false的话就会丢弃

参数3(autoDelete):不需要用到队列的时候,是否将消息删除

参数4(exclusive):是否独占队列,true的话,就是只能是这个进程独占这个队列,其他都不能对这个队列进行读写

参数5(noWait):是否阻塞

参数6(args):其他参数

发布消息

body := "Hello World!" err = ch.Publish( "", // 交换机 q.Name, // 队列名字 false, // 是否强制性 // 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者 // 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉 false, //当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者 // 是否立刻 /** 概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。 **/ amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), // 发送的消息 })

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

参数1(exchange):交换机,后续会讲到

参数2(route-key):队列名字

参数3(mandatory):是否强制性,

当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者

当mandatory设置为false时,出现上述情形broker会直接将消息扔掉

参数4(immediate):是否立即处理

当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者

也就是说,mandatory 标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。

参数5(msg):发布的消息,ContentType是传输类型,Body是发送的消息。

2.3.2 消费者

声明通道

ch, err := ConsumerMQ.Channel()

1

创建队列

q, err := ch.QueueDeclare( "hello", false, false, false, false, nil, )

1

2

3

4

5

6

7

8

读取队列消息

msgs, err := ch.Consume( q.Name, "", true, false, false, false, nil, )

1

2

3

4

5

6

7

8

9

由于消费者端需要一直监听,所以我们要用一个for循环+channel去阻塞主进程,使得主进程一直处于监听状态。

forever := make(chan bool) go func() { for d := range msgs { fmt.Printf("Received a message: %s", d.Body) } }() fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C") <-forever

1

2

3

4

5

6

7

8

2.3.3 结果

生产者

消费者

2.4 Work Queues 模型

Work queues,也被称为(Task queues),任务模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work queues模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

2.4.1 生产者

生成10条消息到队列中

body := "Hello World! " for i := 0; i < 10; i++ { msg := strconv.Itoa(i) err = ch.Publish( "", // 交换机 q.Name, // 队列名字 false, // 是否强制性 false, // 是否立刻 amqp.Publishing{ ContentType: "text/plain", Body: []byte(body+msg), // 发送的消息 }) }

1

2

3

4

5

6

7

8

9

10

11

12

13

2.4.2 消费者

创建两个一样的消费者进行监听消费,与上面2.3.2的消费者保持一致

2.4.3 结果

消费者1号

消费者2号

2.5 Publish/Subscribe 模型

fanout 扇出 也称为广播

在广播模式下,消息发送流程如下:

可以有多个消费者

每个消费者有自己的queue(队列)

每个队列都要绑定到Exchange(交换机)

生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

交换机把消息发送给绑定过的所有队列

队列的消费者都能拿到消息。实现一条消息被多个消费者消费

2.5.1 生产者

声明交换机

_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil)

1

参数1(name):交换机名称

参数2(kind):交换机类型

生产消息

_ = ch.Publish("logs", "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })

1

2

3

4

5

2.5.2 消费者

声明交换机

_ = ch.ExchangeDeclare("logs", "fanout", true, false, false, false, nil, )

1

声明队列

q, _ := ch.QueueDeclare("", false, false, true, false, nil, )

1

绑定交换机

_ = ch.QueueBind(q.Name, "", "logs", false, nil, )

1

消费消息

msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

1

2.5.3 结果

【RabbitMQ】Go语言实现六种消息中间件模型

生产者

消费者

2.6 Routing 模型

P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。

X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列

C1:消费者,其所在队列指定了需要routing key 为 error 的消息

C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

在fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。

Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

2.6.1 生产者

_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, ) body := "Hello World " _ = ch.Publish("logs_direct", "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })

1

2

3

4

5

6

7

2.6.2 消费者

只接受warn

_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, ) q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, ) _ = ch.QueueBind(q.Name, "warn", "logs_direct", false, nil, ) msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

1

2

3

4

只接受info

_ = ch.ExchangeDeclare("logs_direct", "direct", true, false, false, false, nil, ) q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, ) _ = ch.QueueBind(q.Name, "info", "logs_direct", false, nil, ) msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

1

2

3

4

2.7 Topics 模型

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

统配符

* 匹配不多不少恰好1个词

# 匹配一个或多个词

如:

fan.# 匹配 fan.one.two 或者 fan.one 等

fan.* 只能匹配 fan.one

2.7.1 生产者

_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, ) body := "Hello World " _ = ch.Publish("logs_topic", "", false, false, amqp.Publishing{ ContentType: "text/plain", Body: []byte(body), })

1

2

3

4

5

6

7

2.7.2 消费者

只接受*.one

_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, ) q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, ) _ = ch.QueueBind(q.Name, "*.one", "logs_topic", false, nil, ) msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

1

2

3

4

只接受*.fan

_ = ch.ExchangeDeclare("logs_topic", "topic", true, false, false, false, nil, ) q, _ := ch.QueueDeclare("hello", false, false, true, false, nil, ) _ = ch.QueueBind(q.Name, "*.fan", "logs_topic", false, nil, ) msgs, _ := ch.Consume(q.Name, "", true, false, false, false, nil, )

1

2

3

4

2.8 RPC 模型

日后补充

Go RabbitMQ

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Docker官方文档翻译2
下一篇:CentOS、Ubuntu、Debian三个linux比较异同
相关文章