物联网网关开发基于MQTT消息总线的设计过程丨【拜托了,物联网!】

网友投稿 1100 2022-05-29

一、前言

二、网关的作用

2.1 指令转发

2.2 外网通信

2.3 协议转换

2.4 设备管理

2.5 边沿计算(自动化控制)

一、前言

二、网关的作用

2.1 指令转发

2.2 外网通信

2.3 协议转换

2.4 设备管理

2.5 边沿计算(自动化控制)

三、网关内部进程之间的通信

3.1 网关中需要哪些进程

3.2 mqtt消息总线

3.3 Topic 的设计

3.4 与 DBUS 总线的对比

四、网关与云平台之间的通信

4.1 与云平台之间的 MQTT 连接

4.2 Proc_Bridge 进程:外部和内部消息总线之间的桥接器

1. mosquitto 的 API 接口

2. 利用 UserData 指针,实现多个 MQTT 连接

五、总结

一、前言

在一个嵌入式系统中,利用

MQTT消息总线

在各进程之间进行通信,是一个很常见的进行通信方式。

这样的通信模型,对于非工控产品来说,

通信速度完全足够

。以前做过测试,在x86平台和ARM平台,一条数据从本地到云端绕一下,然后再回到本地,可以控制在

毫秒级别

这篇文章,我们来具体的聊一聊物联网系统中的

网关内部程序

可以利用 MQTT 消息总线如何进行设计。

阅读这篇文章,你可以有如下收获:

物联网系统中,设备之间是如何通信的;

网关中的进程之间消息总线通信模型;

网关内部消息总线上的数据如何与服务器进行通信;

作为消遣,了解一下物联网系统中的一些基本知识;

二、网关的作用

物联网这个词语的范畴太广,似乎所有的硬件设备,只要能够接入网络,就可以称之为物联网产品,似乎物联网这个词可以把一切都纳入到其中。这么空洞的词语不利于我们的讲解,因此我们就用一个可以

感知、想象

的场景来代替,那就是

智能家居系统

,这是最能代表物联网时代的典型产品了。

在一个智能家居系统中,假设有这么几个设备:

这些设备的通信模块,如果是

WiFi

或者是

蓝牙

,那么一般都可以直接通过手机来控制(当然,需要厂家提供相应的手机 APP),手机就相当于一个

中心节点

,控制着所有的设备。目前市面上的一些智能设备

单品

都是这样的通信方式,例如:空调、吸尘器、空气净化器、冰箱等等。只要在这些设备中加一个

无线通信模块

即可(例如:ESP8266模块)。

如果通信模块是其它的通信模块,例如:

RF433、ZigBee、ZWave

等,由于手机没有这些通信模块,因此就需要一个

网关

来“转发”指令。手机和网关都连接到家中的路由器,

处于同一个局域网中

,手机把控制指令发送给网关,网关再把指令转发给相应的设备。通信模型如下:

在上面的通信模型中,手机和网关由于处于同一个局域网中,因此可以

直接通信

。如果手机不在局域网中呢?那么就要通过云端的服务器来转发了,通信模型如下:

手机把指令发到服务器;

服务器把指令转发给网关;

网关把指令发给指定的设备;

以上描述的是控制指令的流程,如果是设备发出的报警信息呢,数据的流向就是

倒过来进行的

可以看出,

网关是所有设备之间通信的中心节点,也是内网与外网之间通信的中转节点,也就是把各种智能设备连接到互联网的中转器。

上面已经提到,硬件设备上的通信模块都是

确定的(RF,ZigBee,ZWave等等)

,一般来说,可以把这些通信模块称呼为

无线通信协议

。在一套智能家居系统中,所有设备的无线通信协议大部分都是相同的。

那么,

不同类型的无线通信协议设备

是否可以共存在同一个系统中呢?

答案是:

可以

。只要在网关中,集成了相应的

无线通信协议模块

就可以达到这个目的!如下图所示:

从手机APP上看,所有的设备都是相同的,不会关心设备的无线通信协议是什么,因此,发出的控制指令都是

协议无关

的。

当网关接收到控制指令时,首先根据指令内容

查找出目标设备

,然后确定目标设备的

无线通信协议

,最后把指令发送给对应的

硬件通信模块

,由该通信模块通过无线电信号把控制指令发送到设备。

从这个指令的传输过程来看,网关就充当着

协议转换的角色

另外还有一种通信场景:当系统中的一个“输入”设备与一个“输出”设备进行

绑定/关联

时,例如:

红外感应器与声光报警器绑定:当红外感应器监测到人体时,发出信号,然后控制声光报警器发出报警;

门磁与灯绑定:当开门时,门磁发出信号,自动打开灯光;

如果“输入”设备与“输出”设备是

不同类型的

无线通信协议,也需要

网关来进行协议转换

在一个智能家居系统中,设备可多可少,对这些设备进行管理也是很重要的事情。网关作为系统的中心节点,对设备进行管理的重任理所当然就由网关来承担。

设备管理功能包括:

设备的添加和删除;

设备状态的管理(电量、设备断网、失联等等);

设备树的管理;

在正常的情况下,网关是可以通过路由器,与服务器保持着

长连接

的。如果服务器的处理能力比较强大,智能家居系统中所有需要处理的事情

都可以丢给服务器

来计算、处理,服务器在计算之后把处理结果再发送给网关。看起来想法很完美!

但是,考虑下面这 2 种情况:

路由器出现问题了,网关无法连接到服务器,因此就无法把本地数据及时上报;

系统中出现了异常情况,需要紧急处理,如果把信息上报到服务器,由服务器计算之后再回传给网关,耗费的时间可能超过了可容忍时间,该如何处理?(可以用车联网系统来脑补一下这个场景:自动驾驶中的汽车遇到紧急情况,如果把所有信息上传给服务器,然后等待服务器的下一步指令?)

对于上面的这些场景,把一些计算、处理操作

放在网关这一端来处理

也许更合适!这也是近几年比较流行的

边沿计算

1. 边缘计算,是指在靠近物或数据源头的一侧,采用网络、计算、存储、应用核心能力为一体的开放平台,就近提供最近端服务。 2. 其应用程序在边缘侧发起,产生更快的网络服务响应,满足行业在实时业务、应用智能、安全与隐私保护等方面的基本需求。 3. 边缘计算处于物理实体和工业连接之间,或处于物理实体的顶端。而云端计算,仍然可以访问边缘计算的历史数据

三、网关内部进程之间的通信

在设计一个应用程序的架构时,可以通过

多线程

来实现,也可以通过多进程来实现,每个人的习惯都不一样,各有各的好处。我们这里不去讨论孰优孰劣,因为我对多进程这样的设计思想比较偏爱,所以就直接按照多进程的程序架构来讨论。

网关中需要执行的所有进程,是根据网关的功能来决定的,假设包括如下的功能:

(1)连接外网的进程 Proc_Bridge

网关需要连接到云端的服务器,需要一个进程与服务器之间保持长连接,这样就可以

及时接收到

服务器发来的控制指令,以及把系统内部数据

及时上报

给服务器。

这个进程需要把从服务器接收到的指令

转发

到网关系统内部,把从系统内部接收到的信息

转发

给服务器,类似于桥接的功能,因此命名为 Proc_Bridge。

(2)设备管理进程 Proc_DevMgr

这个进程用来执行设备管理功能,设备的添加(入网)、删除(退网),都由此进程来管理。

(3)协议转换进程 Proc_Protocol

下行

:把应用层的统一通信协议,转换成不同类型无线通信协议,发送给相应的无线模块。

上行

:把设备上报的、不同类型的无线通信协议,转换成应用层的统一通信协议。

(4)边沿计算进程(自动化控制) Proc_Auto

很明显,这需要一个独立的进程来处理各种计算,这个进程就相当于

系统的大脑

(5)无线通信协议相关的进程 Proc_ZigBee, Proc_RF, Proc_ZWave

在硬件上,每一种无线通信模块通过

串口或其他硬件连接方式

与到网关的 CPU 进行通信,因此,每一种无线通信模块都需要一个相应的进程来处理。

(6)其他“软设备”进程 Proc_Xxx

在之前的项目中,还遇到一些硬件设备,它们与门磁、插座等设备

在逻辑上处于同一个层次

,但是与网关之间是通过

TCP

来连接。对于这样的设备,也可以使用一个独立的进程来进行管理。

上面的这些进程,在网关中的运行模型如下:

以上这些进程之间需要相互通信,

不是简单的点对点通信,而是一个网状的通信模型

。比如:

设备管理进程 Proc_DevMgr:当任何一种设备被添加到系统中时,都需要处进行处理,因此它需要与 Proc_ZigBee, Proc_RF, Proc_ZWave 这些进程进行通信;

当某个设备上报数据时(例如:Proc_ZigBee),Proc_Protocol 进程需要把数据进行协议转换,然后 Proc_Bridge 进程把转换后的数据上报给服务器,同时 Proc_Auto 进程需要检查这个设备上报的数据是否触发了其他相关联的设备;

也就是说,这些进程中间的通信是

相互交叉

的,如果通过传统的 IPC 方式(共享内存、命名管道、消息队列、Socket)等,处理起来比较复杂。

引入了

MQTT 消息总线

之后,每个进程只需要挂载到总线上。每个进程只需要

监听自己感兴趣的 topic

,就可以接收到相应的数据。

既然这些进程之间的通信关系比较复杂,那么一个良好的 topic 设计规范就显得很重要了!

MQTT 的通信模型是基于

订阅/发布

物联网网关开发:基于MQTT消息总线的设计过程丨【拜托了,物联网!】

的模式,一个客户端(进程)接入到消息总线之后,需要

注册

自己感兴趣的

主题 topic

,其他客户端(进程)往这个 topic 发送消息,即可被订阅者接收到。

主题 topic 是一个以

反斜线(/)

分割的字符串,用来表示

多层的分级结构

,例如下面的这 2 个 topic,是亚马逊 AWS 平台中在线升级(OTA)相关的 topic:

/aws/things/MyThing/jobs/get/accepted

/aws/things/MyThing/jobs/get/rejected

在我们的示例场景中,可以按照下面这样来设计主题 topic:

(1) Proc_DevMgr

订阅主题:

/iot/v1/ZigBee/Register

/iot/v1/ZigBee/UnRegister

/iot/v1/RF/Register

/iot/v1/RF/UnRegister

/iot/v1/ZWave/Register

/iot/v1/ZWave/UnRegister

(2) Proc_Bridge

订阅主题:

/iot/v1/Device/Report

发出数据的主题:

/iot/v1/Device/Control

/iot/v1/Device/Remove

/iot/v1/Auto/AddRule

/iot/v1/Auto/RemoveRule

(3) Proc_Protocol

订阅主题:

/iot/v1/Device/Control

/iot/v1/Device/Remove

/iot/v1/ZigBee/Report

/iot/v1/RF/Report

/iot/v1/ZWave/Report

发送数据主题:

/iot/v1/Device/Report

/iot/v1/ZigBee/Control

/iot/v1/ZigBee/Remove

/iot/v1/RF/Control

/iot/v1/RF/Remove

/iot/v1/ZWave/Control

/iot/v1/ZWave/Remove

(4) Proc_Auto

订阅主题:

/iot/v1/Auto/AddRule

/iot/v1/Auto/RemoveRule

/iot/v1/Device/Report

发送数据主题:

/iot/v1/Device/Control

(5) Proc_ZigBee

订阅主题:

/iot/v1/ZigBee/Control

/iot/v1/ZigBee/Remove

发送数据主题:

/iot/v1/ZigBee/Register

/iot/v1/ZigBee/UnRegister

/iot/v1/ZigBee/Report

(6) Proc_RF

订阅主题:

/iot/v1/RF/Control

/iot/v1/RF/Remove

发送数据主题:

/iot/v1/RF/Register

/iot/v1/RF/UnRegister

/iot/v1/RF/Report

(7) Proc_ZWave

订阅主题:

/iot/v1/ZWave/Control

/iot/v1/ZWave/Remove

发送数据主题:

/iot/v1/ZWave/Register

/iot/v1/ZWave/UnRegister

/iot/v1/ZWave/Report

以上这些主题 topic 的设计,还是有些

粗略

的。如果借助

通配符(#, +, $)

,可以设计出更灵活的层次结构。

多层通配符: “#”是用于匹配主题中任意层级的通配符,多层通配符表示它的父级和任意数量的子层级。

单层通配符:“+”加号是只能用于单个主题层级匹配的通配符,在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。

通配符:“$”表示匹配一个字符,只要不是放在主题的最开头,其它情况下都表示匹配一个字符。

我们以一个

控制指令

为例,来梳理一下数据是如何通过 topic 进行流动:

Proc_Bridge 进程从服务器接收到控制指令后,发送到消息总线上的 topic: /iot/v1/Device/Control。

由于 Proc_Protocol 进程订阅了这个 topic,所以立刻接收到指令。

Proc_Protocol 分析指令内容,发现是一个 ZigBee 设备,于是进行协议转换,发送一个 ZigBee 控制指令到消息总线上的 topic: /iot/v1/ZigBee/Control。

由于 Proc_ZigBee 进程订阅了这个 topic,因此它接收到这个控制指令。

Proc_ZigBee 把控制指令转换成 ZigBee 无线通信模块要求的格式,通过硬件发送给设备灯泡。

我们再分析一下

设备数据上报

的场景:

先关注图中

红色箭头

,忽略蓝色箭头:

门磁打开后,通过无线通信把信息上报给进程 Proc_CF。

Proc_RF 进程接收到 RF433 通信模块上报的数据,把“门磁打开”这个信息发送到消息总线上的 topic:/iot/v1/RF/Report。

由于 Proc_Protocol 进程订阅了这个 topic,因此接收到上报的门磁数据。

Proc_Protocol 分析数据,把 RF433 协议的数据转成统一的应用层协议的数据,发送到消息总线上的 topic:/iot/v1/Device/Report。

由于 Proc_Bridge 进程订阅了这个 topic,因此就接收到了这次上报的数据。

Proc_Bridge 进程把数据上报给服务器。

再来看一下

蓝色箭头

流程:

在上面的第 4 步:Proc_Protocol 进程把 RF433 协议数据转成应用层统一协议之后,把数据发送到消息总线上的 topic:

/iot/v1/Device/Report

之后,

Proc_Auto

进程同时进行如下操作:

由于 Proc_Auto 也订阅了这个 topic,因此它也接收到了门磁上报的这个应用层协议的数据。

Proc_Auto 查找自己的配置信息(假设用户已经提前配置好了一条规则:当门磁打开的时候,就触发声光报警器),发现匹配到了“门磁->报警器”这条规则,于是发出一条控制报警器的指令,发送到消息总线上的 topic: /iot/v1/Device/Control。

后面的 7,8,9,10 这四个步骤就与上面的

控制指令流程完全一样了

从上面描述的 3 个数据流向的场景中,是不是感觉到使用 topic 为

“数据管道”

的这种通信方式,与

Linux 系统中的 DBUS

总线特别的相似?

DBUS 总线也是用于

进程之间的通信

,按照我个人的理解,DBUS中其实是把进程之间的两种通信组织在一起了:

基于信号的数据传输;

基于方法的 RPC 远程调用;

DBUS 总线包含的概念更复杂一些,包括:

路径、对象、接口、方法

等等,这些概念组织在一起

共同定位到

一个具体的服务提供者了。

相比较而言,我感觉 MQTT 这样的方式

更简洁

一些。

所谓的 RPC 远程调用,就是调用位于远程机器上的一个函数,主要解决两个问题:

网络连接;

数据的序列化和反序列化;

后面我会专门写一篇文章,利用

protobuf 框架

来实现 RPC 调用。

四、网关与云平台之间的通信

上面讲解的设计过程,是

网关内部

的各功能模块之间通信方式,这也是我们作为嵌入式开发者能

充分发挥的部分

网关与云平台之间的通信方式一般都是客户指定的,就那么几种(阿里云、华为云、腾讯云、亚马逊AWS平台)。一般都要求网关与云平台之间处于

长连接

的状态,这样云端的各种指令就可以随时发送到网关。

当然了,这些云平台都会提供相应的 SDK 开发包,一般使用 MQTT 协议来连接云平台的更多一些。在一些文档中,会把位于云端的 MQTT 服务器称作

Broker

,其实就是一个服务器。

进程

Proc_Bridge

的功能主要有 2 点:

与云平台的数据传输通道;

协议转换:把云平台相关的协议转换成网关内部的协议,以及相反的转换。

也就是说:Proc_Bridge 进程需要

同时连接到云平台的 MQTT Broker 和网关内部的 MQTT 消息总线

目前的几大物联网云平台,都提供了不同的接入方式。对于网关来说,应用最多的就是

MQTT

接入。

我们知道,MQTT 只是一个

协议

而已,不同的编程语言中都有实现,在 C 语言中也有好几个实现。

在网关内部,运行着一个后台 deamon:

MQTT Broker

,其实就是 mosquitto 这个可执行程序,它充当着消息总线的功能。这里请大家注意:因为这个消息总线是

运行在嵌入式系统的内部

,接入总线的客户端就是需要相互通信的那些

进程

。这些进程的数量是有限的,即使是一个比较复杂的系统,最多十几个进程也就差不多了。因此,mosquitto 这个实现是

完全可以支撑系统负载的

那么,如果在

云端

部署一个 MQTT Broker,理论上是可以直接使用 mosquitto 这个实现来作为消息总线的,但是你要评估接入的

客户端(也就是网关)在一个什么样的数量级,考虑到并发的问题,一定要做压力测试。

对于后台开发,我的经验不多,不敢(也不能)多言,误导大家就罪过了。不过,对于一般的学习和测试来说,在云端直接部署 mosquitto 作为消息总线,是没有问题的。

下面这张图,说明了

Proc_Bridge 进程

在这个模型中的作用:

从云平台消息总线接收到的消息,需要转发到内部的消息总线;

从内部消息总线接收到的消息,需要转发到云平台的消息总线;

如果用 mosquitto 来实现,应该如何来实现呢?

mosquitto 这个实现是基于

回调函数

的机制来运行的,例如:

// 连接成功时的回调函数 void my_connect_callback(struct mosquitto *mosq, void *obj, int rc) { // ... } // 连接失败时的回调函数 void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result) { // ... } // 接收到消息时的回调函数 void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { // .. } int main() { // 其他代码 // ... // 创建一个 mosquitto 对象 struct mosquitto g_mosq = mosquitto_new("client_name", true, NULL); // 注册回调函数 mosquitto_connect_callback_set(g_mosq, my_connect_callback); mosquitto_disconnect_callback_set(g_mosq, my_disconnect_callback); mosquitto_message_callback_set(g_mosq, my_message_callback); // 这里还有其他的回调函数设置 // 开始连接到消息总线 mosquitto_connect(g_mosq, "127.0.0.1", 1883, 60); while(1) { int rc = mosquitto_loop(g_mosq, -1, 1); if (rc) { printf("mqtt_portal: mosquitto_loop rc = %d \n", rc); sleep(1); mosquitto_reconnect(g_mosq); } } mosquitto_destroy(g_mosq); mosquitto_lib_cleanup(); return 0; }

以上代码就是一个 mosquitto 客户端的

最简代码

了,使用回调函数的机制,让程序的开发非常简单。

mosquitto 把底层的细节问题都帮助我们处理了,只要我们注册的函数

被调用

了,就说明

发生了我们感兴趣的事件

这样的回调机制在各种开源软件中使用的比较多,比如:

glib 里的定时器、libevent通讯处理、libmodbus 里的数据处理、linux 内核中的驱动开发和定时器

,都是这个套路,一通百通!

在网关中的每个进程,只需要添加上面这部分代码,就可以

挂载到消息总线上

,从而可以与其它进程进行收发数据了。

上面的实例仅仅是连接到

一个

消息总线上,对于一个普通的进程来说,达到了通信的目的。

但是对于

Proc_Bridge 进程

来说,还没有达到目的,因为这个进程处于

桥接

的位置,需要

同时连接到远程和本地这两个消息总线上

。那么应该如何实现呢?

看一下

mosquitto_new

这个函数的签名:

/* * obj - A user pointer that will be passed as an argument to any * callbacks that are specified. */ libmosq_EXPORT struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);

最后一个参数的作用是:可以设置一个

用户自己的数据

(作为指针传入),那么 mosquitto 在

回调

我们的注册的任何一个函数时,

都会把这个指针传入

。因此,我们可以利用这个参数来区分这个连接是远程连接?还是本地连接。

所以,我们可以定义一个结构体变量,把一个 MQTT 连接的

所有信息

都记录在这里,然后注册给 mosquitto。当 mosquitto 回调函数时,把这个结构体变量的指针

回传

给我们,这样就拿到了这个连接的所有数据,在某种程度上来说,这也是一种面向对象的思想。

// 从来表示一个 MQTT 连接的结构体 typedef struct{ char *id; char *name; char *pw; char *host; int port; pthread_t tHandle; struct mosquitto *mosq; int mqtt_num; }MQData;

完整的代码已经放到网盘里了,为了让你

先从原理上看明白

,我把关键几个地方的代码贴在这里:

// 分配结构体变量 MQData userData = (MQData *)malloc(sizeof(MQData)); // 设置属于这里连接的参数: id, name 等等 // 创建 mosquitto 对象时,传入 userData。 struct mosquitto *mosq = mosquitto_new(userData->id, true, userData); // 在回调函数中,把 obj 指针前转成 MQData 指针 static void messageCB(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { MQData *userData = (MQData *)obj; // 此时就可以根据 userData 指针中的内容分辨出这是哪一个链接了 }

另外一个问题:不知道你是否注意到示例中的

mosquitto_loop()

这个函数?这个函数需要

放在 while 死循环中不停的调用,才能出发 mosuiqtto 内部的事件

。(其实在 mosuiqtto 中,还提供了另一个简化的函数

mosquitto_loop_forever

)。

也就是说:在每个连接中,需要

持续的触发

mosquitto 底层的事件,才能让消息系统顺利的收发。因此,在示例代码中,使用

两个线程

分别连接到云平台的总线和内部的总线。

五、总结

这篇文章,基本上把一个

物联网系统

的网关中,

最基本的通信模型

聊完了,相当于是一个程序的骨架吧,剩下的事情就是处理业务层的细节问题了。

万里长征,这才是第一步!

对于一个网关来说,还有其他更多的问题需要处理,比如:MQTT 连接的鉴权(用户名+密码,证书)、通信数据的序列化和反序列化、加密和解密等等,以后慢慢聊吧,希望我们一路前行!

【拜托了,物联网!】有奖征文火热进行中:https://bbs.huaweicloud.com/blogs/296704

IoT MQTT NAT 嵌入式

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:案例|银行 | Zabbix 监控架构分享
下一篇:计算机控制技术实验说明
相关文章