Python使用MQTT连接新版ONENet

2024-05-28 07:28

本文主要是介绍Python使用MQTT连接新版ONENet,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Python MQTT 连接新版ONENet

简介

前几个教程我们使用mqtt.fx连接了新版的ONENet, 只是跑通了MQTT协议,但是在实际操作下还需要实现具体环境、具体设备的MQTT连接,本章教程将以Python MQTT的方式连接 ONENet

参考文档:

paho-mqtt · PyPI

OneNET - 中国移动物联网开放平台 (10086.cn)

准备环境

pip安装 paho-mqtt

pip install paho-mqtt

获取ONENet 三元组

准备好Onenet的三元组
在这里插入图片描述

三元组分别为

DeviceName=“wenshidap” #设备ID

Productid = “kuerSLKlo8” #产品ID

accesskey=“QUR4cEhqcVRCSHowQ01QdXE3QTVIUmRDblpoOHU1aFk=”#秘钥

根据ONENet手册的文档说明,mqtt连接onenet需要进行鉴权,可以访问链接查看鉴权算法,但是我们可以根据鉴权算法说明生成鉴权的秘钥

使用Pyhon生成的鉴权秘钥的函数为

#认证token生成函数
def get_token(id,access_key):version = '2018-10-31'#   res = 'products/%s' % id  # 通过产品ID访问产品API# res = 'userid/%s' % id  # 通过产品ID访问产品APIres="products/"+ Productid + "/devices/" + DeviceName# 用户自定义token过期时间et = str(int(time.time()) + 36000000)# et = str(int(1722499200))# 签名方法,支持md5、sha1、sha256method = 'sha1'method1 = 'sha256'# 对access_key进行decodekey = base64.b64decode(access_key)# 计算signorg = et + '\n' + method+ '\n' + res + '\n' + versionsign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)sign = base64.b64encode(sign_b.digest()).decode()# value 部分进行url编码,method/res/version值较为简单无需编码sign = quote(sign, safe='')res = quote(res, safe='')# token参数拼接token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)return token

MQTT订阅和发布Topic说明

我们可以点击产品开发->设备开发->topic管理->数据流topic来查看当前设备可以订阅哪些topic,在这里我们是上传数据流,所以我们只关心发布的topic 和订阅上传成功和上传失败的topic

在这里插入图片描述

发布的topic :$sys/kuerSLKlo8/{device-name}/dp/post/json

只需要将我们的device-name换成当前的devicename即可 在本项目就是wenshidap

即:$sys/kuerSLKlo8/wenshidap/dp/post/json

订阅的topic:

上传成功:$sys/kuerSLKlo8/{device-name}/dp/post/json/accepted

上传失败:$sys/kuerSLKlo8/{device-name}/dp/post/json/rejected

当我们订阅上传成功topic时,数据流上传成功后就会返回msg的id 失败时 reject topic就会返回失败的原因

MQTT连接ONENet主程序

import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTv311
import struct
import json
import base64
import hmac
import time
from urllib.parse import quoteServerUrl = "mqtts.heclouds.com" #服务器url
ServerPort = 1883#服务器端口
DeviceName="wenshidap" #设备ID
Productid = "kuerSLKlo8" #产品ID
accesskey="QUR4cEhqcVRCSHowQ01QdXE3QTVIUmRDblpoOHU1aFk="# 发布的topic
Pub_topic1 = "$sys/"+Productid+"/"+ DeviceName+"/dp/post/json"#需要订阅的topic
#数据上传成功的消息
Sub_topic1 = "$sys/"+Productid+"/"+DeviceName+"/dp/post/json/accepted"
#接收数据上传失败的消息
Sub_topic2 = "$sys/"+Productid+"/"+DeviceName+"/dp/post/json/rejected"#测试用json数据格式
jsonstr = "{\"id\": 123,\"dp\": {\"ConEnv_Temp\": [{\"v\": 22.1}],\"ConEnv_Humi\": [{\"v\": 61.2}]}}"#认证token生成函数
def get_token(id,access_key):version = '2018-10-31'#   res = 'products/%s' % id  # 通过产品ID访问产品API# res = 'userid/%s' % id  # 通过产品ID访问产品APIres="products/"+ Productid + "/devices/" + DeviceName# 用户自定义token过期时间et = str(int(time.time()) + 36000000)# et = str(int(1722499200))# 签名方法,支持md5、sha1、sha256method = 'sha1'method1 = 'sha256'# 对access_key进行decodekey = base64.b64decode(access_key)# 计算signorg = et + '\n' + method+ '\n' + res + '\n' + versionsign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)sign = base64.b64encode(sign_b.digest()).decode()# value 部分进行url编码,method/res/version值较为简单无需编码sign = quote(sign, safe='')res = quote(res, safe='')# token参数拼接token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)return tokendef on_subscribe(client, userdata, mid, reason_code_list, properties):# Since we subscribed only for a single channel, reason_code_list contains# a single entryif reason_code_list[0].is_failure:print(f"Broker rejected you subscription: {reason_code_list[0]}")else:print(f"Broker granted the following QoS: {reason_code_list[0].value}")def on_unsubscribe(client, userdata, mid, reason_code_list, properties):# Be careful, the reason_code_list is only present in MQTTv5.# In MQTTv3 it will always be emptyif len(reason_code_list) == 0 or not reason_code_list[0].is_failure:print("unsubscribe succeeded (if SUBACK is received in MQTTv3 it success)")else:print(f"Broker replied with failure: {reason_code_list[0]}")client.disconnect()# 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
def on_connect(client, userdata, flags, reason_code, properties):if reason_code.is_failure:print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")else:# we should always subscribe from on_connect callback to be sure# our subscribed is persisted across reconnections.# client.subscribe("$SYS/#")print("连接结果:" + mqtt.connack_string(reason_code))#连接成功后就订阅topicclient.subscribe(Sub_topic1)client.subscribe(Sub_topic2)# 从服务器接收发布消息时的回调。
def on_message(client, userdata, message):print(str(message.payload,'utf-8'))#当消息已经被发送给中间人,on_publish()回调将会被触发
def on_publish(client, userdata, mid):print(str(mid))def main():passw=get_token(DeviceName,accesskey)print(passw)mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,DeviceName)mqttc.on_connect = on_connectmqttc.on_message = on_messagemqttc.on_subscribe = on_subscribemqttc.on_unsubscribe = on_unsubscribe# client = mqtt.Client(DeviceName,protocol=MQTTv311)#client.tls_set(certfile='/Users/mryu/PycharmProjects/MyProject/onenet/MQTTS-certificate.pem') #鉴权证书mqttc.connect(ServerUrl, port=ServerPort, keepalive=120)mqttc.username_pw_set(Productid,passw)mqttc.loop_start()while(1):mqttc.publish(Pub_topic1,jsonstr,qos=0)print("okk")time.sleep(2)if __name__ == '__main__':main()

运行测试一下

在这里插入图片描述

可以看到我们订阅的topic 返回了我们消息的ID 123 说明我们的数据上传成功 ,平台上也可以看到我们的数据流

在这里插入图片描述

这篇关于Python使用MQTT连接新版ONENet的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1009911

相关文章

使用Navicat工具比对两个数据库所有表结构的差异案例详解

《使用Navicat工具比对两个数据库所有表结构的差异案例详解》:本文主要介绍如何使用Navicat工具对比两个数据库test_old和test_new,并生成相应的DDLSQL语句,以便将te... 目录概要案例一、如图两个数据库test_old和test_new进行比较:二、开始比较总结概要公司存在多

CSS3中使用flex和grid实现等高元素布局的示例代码

《CSS3中使用flex和grid实现等高元素布局的示例代码》:本文主要介绍了使用CSS3中的Flexbox和Grid布局实现等高元素布局的方法,通过简单的两列实现、每行放置3列以及全部代码的展示,展示了这两种布局方式的实现细节和效果,详细内容请阅读本文,希望能对你有所帮助... 过往的实现方法是使用浮动加

如何使用Spring boot的@Transactional进行事务管理

《如何使用Springboot的@Transactional进行事务管理》这篇文章介绍了SpringBoot中使用@Transactional注解进行声明式事务管理的详细信息,包括基本用法、核心配置... 目录一、前置条件二、基本用法1. 在方法上添加注解2. 在类上添加注解三、核心配置参数1. 传播行为(

在Java中使用ModelMapper简化Shapefile属性转JavaBean实战过程

《在Java中使用ModelMapper简化Shapefile属性转JavaBean实战过程》本文介绍了在Java中使用ModelMapper库简化Shapefile属性转JavaBean的过程,对比... 目录前言一、原始的处理办法1、使用Set方法来转换2、使用构造方法转换二、基于ModelMapper

c++中std::placeholders的使用方法

《c++中std::placeholders的使用方法》std::placeholders是C++标准库中的一个工具,用于在函数对象绑定时创建占位符,本文就来详细的介绍一下,具有一定的参考价值,感兴... 目录1. 基本概念2. 使用场景3. 示例示例 1:部分参数绑定示例 2:参数重排序4. 注意事项5.

使用C++将处理后的信号保存为PNG和TIFF格式

《使用C++将处理后的信号保存为PNG和TIFF格式》在信号处理领域,我们常常需要将处理结果以图像的形式保存下来,方便后续分析和展示,C++提供了多种库来处理图像数据,本文将介绍如何使用stb_ima... 目录1. PNG格式保存使用stb_imagephp_write库1.1 安装和包含库1.2 代码解

java如何通过Kerberos认证方式连接hive

《java如何通过Kerberos认证方式连接hive》该文主要介绍了如何在数据源管理功能中适配不同数据源(如MySQL、PostgreSQL和Hive),特别是如何在SpringBoot3框架下通过... 目录Java实现Kerberos认证主要方法依赖示例续期连接hive遇到的问题分析解决方式扩展思考总

一文教你使用Python实现本地分页

《一文教你使用Python实现本地分页》这篇文章主要为大家详细介绍了Python如何实现本地分页的算法,主要针对二级数据结构,文中的示例代码简洁易懂,有需要的小伙伴可以了解下... 在项目开发的过程中,遇到分页的第一页就展示大量的数据,导致前端列表加载展示的速度慢,所以需要在本地加入分页处理,把所有数据先放

树莓派启动python的实现方法

《树莓派启动python的实现方法》本文主要介绍了树莓派启动python的实现方法,文中通过图文介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录一、RASPBerry系统设置二、使用sandroidsh连接上开发板Raspberry Pi三、运

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库