使用JAVA版Paho框架开发原生MQTT接口

网友投稿 1123 2022-05-30

说明:阅读该文档之前需要对Mqtt有一定的了解,这里不对Mqtt知识作介绍,对Mqtt的了解请自行搜索学习。主要说明一下用一个简单的Demo样例,实现和IoT平台的对接,上报数据,下发命令等

一、注册设备

产品信息中的 协议类型 必须为MQTT

使用JAVA版Paho框架开发原生MQTT接口

设备管理—>新增真实设备—>选择上面开发好的产品—>接入方式选择 直连

保存设备ID和密钥,利用其构建clientID

进入设备管理界面—>产品模型

如果没有产品模型,可以点击右上角,从产品中心导入或者是本地导入

注意:产品的协议类型必须为MQTT

进入设备管理界面—>设备—>设备注册—>创建

保存设备ID和密钥,利用其构建clientID

注册设备(密码方式)

https://support.huaweicloud.com/api-IoT/iot_06_0005.html

二、IoT平台提供的原生MQTT接口

https://support.huaweicloud.com/api-IoT/iot_06_3002.html

Java

本篇文档基于eclipse的paho框架,该框架网上资料较多,可自行百度搜索学习。

代码中在Maven依赖上加载:

    org.eclipse.paho     org.eclipse.paho.client.mqttv3     1.2.0

或者直接引用jar包:org.eclipse.paho.client.mqttv3-1.2.0.jar

(1) 主要是证书的配置,certFile就是证书的路径

(2) 其他参数的配置请参考源码:)

Java

private static MqttClient mqttClient; //连接地址每个局点不一样,比如开发中心是:"ssl://iot-acc-dev.huaweicloud.com:8883" private static String url = "ssl://xx.xx.xx.xx:8883"; //注册直连设备的时候返回的设备ID private static String deviceID = "9a57a-***-***-816b3e"; //注册直连设备的时候返回的秘钥 private static String secret = "cbd*******3abv"; private static String curTime = curTimeStamp(); private static String password = makePwd(secret, curTime); //clientID参考API文档拼装 private static String clientId = deviceID + "_0_0_" + curTime; mqttClient = new MqttClient(url, clientId, new MemoryPersistence()); // 设置回调,这里主要写了接收消息之后的响应 mqttClient.setCallback(new MqttCallback() {                 @Override     public void messageArrived(String topic, MqttMessage message) throws Exception {            String content = new String(message.getPayload(), "utf-8");         System.out.println("收到mqtt消息,topic: " + topic + " ,content: " + content);         // 设备响应命令         commandRsp();     }                 @Override     public void deliveryComplete(IMqttDeliveryToken arg0) {         System.out.println("mqtt 发送完成!");     }     @Override     public void connectionLost(Throwable arg0) {         System.out.println("mqtt 失去了连接!");     } }); // 连接(MQTT CONNECT连接鉴权) mqttConnection(); // 发布(设备上报数据) publish(); // 订阅(设备接收命令) subscribe(); public static MqttClient mqttConnection() {     if (mqttClient != null) {         try {             MqttConnectOptions options = new MqttConnectOptions();             options.setCleanSession(true);             options.setKeepAliveInterval(20);             options.setConnectionTimeout(100);             options.setUserName(deviceID);             options.setPassword(password.toCharArray());             //证书配置,mqtt.jks是平台提供的证书             String certFile = "../ca.jks";             String certPWD = "IoT@2019";             InputStream stream = new FileInputStream(certFile);             SSLContext sslContext = SSLContext.getInstance("TLS");             KeyStore ks = KeyStore.getInstance("JKS");             ks.load(stream, certPWD.toCharArray());             TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());             tmf.init(ks);             TrustManager[] tm = tmf.getTrustManagers();             sslContext.init(null, tm, new SecureRandom());             SocketFactory factory = sslContext.getSocketFactory();             options.setSocketFactory(factory);             mqttClient.connect(options);         } catch (Exception e) {             e.printStackTrace();         }     }         return mqttClient; }

数据上报就是往平台指定的topic上发布数据

Java

public static void publish() {     try {         String message = "" + "{ \n" + "\"msgType\":\"deviceReq\", \n" + "\"data\": [{ \n"+ "\"serviceId\":\"Storage\", \n" + "\"serviceData\":{\n" + "\"storage\": 22\n" + "}\n" + "}] \n"+ "}";         MqttMessage mqttMessage = new MqttMessage();         mqttMessage.setQos(1);         mqttMessage.setPayload(message.getBytes());         String pubTopic = "/huawei/v1/devices/" + deviceID + "/data/json";         mqttClient.publish(pubTopic, mqttMessage, null, null);     } catch (Exception e) {         e.printStackTrace();     } }

命令接收就是订阅平台指定的topic,平台往该topic发送命令时,设备端就能收到

Java

//先订阅平台的topic public static void subscribe() {     String subtopic = "/huawei/v1/devices/" + deviceID + "/command/json";     mqttClient.subscribe(subtopic, 1); } // 在回调函数里面重写messageArrived方法,打印收到的消息 @Override public void messageArrived(String topic, MqttMessage message) throws Exception {     String content = new String(message.getPayload(), "utf-8");     System.out.println("content:" + content);     // 设备响应命令     commandRsp();     } }

应用服务器要需要调用“订阅平台业务数据”API订阅“commandRsp”类型的通知后,才能接收到设备对控制命令的应答;

先订阅topic(/huawei/v1/devices/{deviceId}/command/{codecMode})接收到命令,然后往另外一个topic(/huawei/v1/devices/{deviceId}/data/{codecMode})发数据响应给平台,就视为对这条命令的响应,但是要注意,数据上报和命令响应的topic虽然是相同的,但是他们上报的结构体是有区别的

Java

public static void commandRsp() {     try {         String message = "\n" + "{ \n" + "\"msgType\":\"deviceRsp\", \n" + "\"mid\":1,\n\"errcode\":0, \n"+ "\"body\":{\n" + "\"result\": 0\n" + "}\n" + "}";         MqttMessage mqttMessage = new MqttMessage();         mqttMessage.setQos(1);         mqttMessage.setPayload(message.getBytes());         System.out.println("message" + message);         String RspTopic = "/huawei/v1/devices/" + deviceID + "/data/json";         mqttClient.publish(RspTopic, mqttMessage, null, null);     } catch (Exception e) {         e.printStackTrace();     } }

获取当前时间

public static String curTimeStamp() {     String TIMESTAMP_FORMAT = "yyyyMMddHH";     SimpleDateFormat sdf = new SimpleDateFormat(TIMESTAMP_FORMAT);     String curTimeStamp = sdf.format(new Date(System.currentTimeMillis()));     return curTimeStamp; }

生成 password

public static String makePwd(String secret, String curTimeStamp) {     String passWord = null;     try {         Mac sha256_HMAC = Mac.getInstance("HmacSHA256");         SecretKeySpec secret_key = new SecretKeySpec(curTimeStamp.getBytes(), "HmacSHA256");         sha256_HMAC.init(secret_key);         byte[] bytes = sha256_HMAC.doFinal(secret.getBytes());         passWord = byteArrayToHexString(bytes);     } catch (Exception e) {         System.out.println("Error HmacSHA256 ===========" + e.getMessage());     }     return passWord; } public static String byteArrayToHexString(byte[] b) {     StringBuilder hs = new StringBuilder();     String stmp;     for (int n = 0; b != null && n < b.length; n++) {         stmp = Integer.toHexString(b[n] & 0XFF);         if (stmp.length() == 1)             hs.append('0');         hs.append(stmp);     }        return hs.toString().toLowerCase(); }

IoT MQTT

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:CVE-2017-5644 Apache POI Dos漏洞原理详解
下一篇:在Scala中构建Web API的4大框架
相关文章