14 rocketmq整合SpringCloudStream

网友投稿 614 2022-05-30

14 RocketMQ整合springCloudStream

14 rocketmq整合SpringCloudStream

发送消息

消费消息:

Spring Cloud Stream

14 rocketmq整合SpringCloudStream

发送消息

消费消息:

Spring Cloud Stream

14 rocketmq整合SpringCloudStream

发送消息

org.springframework.boot spring-boot-starter-web com.alibaba.cloud spring-cloud-starter-stream-rocketmq

spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: output: destination: TopicTest group: PRODUCER_GROUP_TOPIC_TEST

@SpringBootApplication @EnableBinding({ Source.class }) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }

@Component public class ProduceController { @Autowired private Source source; @PostConstruct private void init() throws InterruptedException { MessageBuilder builder = MessageBuilder.withPayload("init..."); Message message = builder.build(); source.output().send(message); System.out.println("init..."); } }

@EnableBinding({ Source.class })表示绑定配置文件中名称为output的消息通道Binding,Source类中定义的消息通道名称为output。

消费消息:

spring: cloud: stream: rocketmq: binder: name-server: 127.0.0.1:9876 bindings: input: destination: TopicTest2 group: CONSUER_GROUP_DEMO_1

name-server是RocketMq的NameServer地址,destination指定Topic名称,指定名称为input的Binding接收TopicTest的消息

消息监听:

@EnableBinding({ Sink.class}) @SpringBootApplication public class Application { @StreamListener(value = InputChannel.ORDER_INPUT) public void receive(String receiveMsg) { System.out.println("receive: " + receiveMsg); } public static void main(String[] args) { SpringApplication.run(Application.class, args); } }

@EnableBinding({ Sink.class})表示绑定配置文件名称为input的消息通道Binding,Sink类中定义的消息通道名称为input,@StreamListener表示定义一个消息监听器,接收RocketMQ中的消息。

Spring Cloud Stream

Spring Cloud Stream是构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,目的是简化消息业务在Spring Cloud应用程序中的开发。

通过Spring Cloud Stream注入的输入通道inputs和输出通道outputs与消息中间件Middleware通信,消息通道通过特定中间件绑定器Binder实现连接到外部代理。

Spring Cloud Stream实现基于发布/订阅机制,核心四个部分组成:Spring Framework中的Spring Messaging和Spring Integration,Spring Cloud Stream中的Binders和Bindings。

Spring Messaging:Spring Framework中的统一消息编程模型

Message:消息对象,包含消息头Header和消息体Payload

MessageChannel:消息通道接口,用于接收消息,提供send方法将消息发送至消息通道。

MessageHandler:消息处理器接口,用于处理消息逻辑。

Spring Integration:支持企业集成的扩展机制,提供简单的模型来构建企业集成解决方案,对Spring Messaging进行扩展。

MessageDispatcher:消息分发接口,用于分发消息和添加删除消息处理器

MessageRouter:消息路由接口,定义默认的输出消息通道。

Filter:消息过滤注解,用于配置消息过滤表达式

Aggregator:消息的聚合注解,用于将一条消息拆分成多条。

Splitter:消息分割,用于将一条消息拆分成多条。

Binders:目标绑定器,负责与外部消息中间件系统集成的组件。

doBindProducer:绑定消息中间件客户端发送消息模块。

doBindConsumer:绑定消息中间件客户端接收消息模块。

Bindings:外部消息中间件系统与应用程序提供的消息生产者和消费者之间的桥梁。

Spring Cloud Alibaba RocketMQ架构图

MessageChannel(output):消息通道,用于发送消息,Spring Cloud Stream的标准接口

MessageChannel(input):消息通道,用于订阅消息,Spring Cloud Stream的标准接口

Binder bindProducer:目标绑定器,将发送通道发过来的消息发送到RocketMQ消息服务器

Binder bindConsumer:目标绑定器,将接收到RocketMQ消息服务器的消息推送给订阅通道

Spring

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:为什么他们选择了GaussDB
下一篇:微服务进阶(四):Spring Cloud netflix概览和架构设计
相关文章