Python使用MQTT连接新版ONENet

2024-05-28 07:28

本文主要是介绍Python使用MQTT连接新版ONENet,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Python MQTT 连接新版ONENet

简介

前几个教程我们使用mqtt.fx连接了新版的ONENet, 只是跑通了MQTT协议,但是在实际操作下还需要实现具体环境、具体设备的MQTT连接,本章教程将以Python MQTT的方式连接 ONENet

参考文档:

paho-mqtt · PyPI

OneNET - 中国移动物联网开放平台 (10086.cn)

准备环境

pip安装 paho-mqtt

pip install paho-mqtt

获取ONENet 三元组

准备好Onenet的三元组
在这里插入图片描述

三元组分别为

DeviceName=“wenshidap” #设备ID

Productid = “kuerSLKlo8” #产品ID

accesskey=“QUR4cEhqcVRCSHowQ01QdXE3QTVIUmRDblpoOHU1aFk=”#秘钥

根据ONENet手册的文档说明,mqtt连接onenet需要进行鉴权,可以访问链接查看鉴权算法,但是我们可以根据鉴权算法说明生成鉴权的秘钥

使用Pyhon生成的鉴权秘钥的函数为

#认证token生成函数
def get_token(id,access_key):version = '2018-10-31'#   res = 'products/%s' % id  # 通过产品ID访问产品API# res = 'userid/%s' % id  # 通过产品ID访问产品APIres="products/"+ Productid + "/devices/" + DeviceName# 用户自定义token过期时间et = str(int(time.time()) + 36000000)# et = str(int(1722499200))# 签名方法,支持md5、sha1、sha256method = 'sha1'method1 = 'sha256'# 对access_key进行decodekey = base64.b64decode(access_key)# 计算signorg = et + '\n' + method+ '\n' + res + '\n' + versionsign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)sign = base64.b64encode(sign_b.digest()).decode()# value 部分进行url编码,method/res/version值较为简单无需编码sign = quote(sign, safe='')res = quote(res, safe='')# token参数拼接token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)return token

MQTT订阅和发布Topic说明

我们可以点击产品开发->设备开发->topic管理->数据流topic来查看当前设备可以订阅哪些topic,在这里我们是上传数据流,所以我们只关心发布的topic 和订阅上传成功和上传失败的topic

在这里插入图片描述

发布的topic :$sys/kuerSLKlo8/{device-name}/dp/post/json

只需要将我们的device-name换成当前的devicename即可 在本项目就是wenshidap

即:$sys/kuerSLKlo8/wenshidap/dp/post/json

订阅的topic:

上传成功:$sys/kuerSLKlo8/{device-name}/dp/post/json/accepted

上传失败:$sys/kuerSLKlo8/{device-name}/dp/post/json/rejected

当我们订阅上传成功topic时,数据流上传成功后就会返回msg的id 失败时 reject topic就会返回失败的原因

MQTT连接ONENet主程序

import paho.mqtt.client as mqtt
from paho.mqtt.client import MQTTv311
import struct
import json
import base64
import hmac
import time
from urllib.parse import quoteServerUrl = "mqtts.heclouds.com" #服务器url
ServerPort = 1883#服务器端口
DeviceName="wenshidap" #设备ID
Productid = "kuerSLKlo8" #产品ID
accesskey="QUR4cEhqcVRCSHowQ01QdXE3QTVIUmRDblpoOHU1aFk="# 发布的topic
Pub_topic1 = "$sys/"+Productid+"/"+ DeviceName+"/dp/post/json"#需要订阅的topic
#数据上传成功的消息
Sub_topic1 = "$sys/"+Productid+"/"+DeviceName+"/dp/post/json/accepted"
#接收数据上传失败的消息
Sub_topic2 = "$sys/"+Productid+"/"+DeviceName+"/dp/post/json/rejected"#测试用json数据格式
jsonstr = "{\"id\": 123,\"dp\": {\"ConEnv_Temp\": [{\"v\": 22.1}],\"ConEnv_Humi\": [{\"v\": 61.2}]}}"#认证token生成函数
def get_token(id,access_key):version = '2018-10-31'#   res = 'products/%s' % id  # 通过产品ID访问产品API# res = 'userid/%s' % id  # 通过产品ID访问产品APIres="products/"+ Productid + "/devices/" + DeviceName# 用户自定义token过期时间et = str(int(time.time()) + 36000000)# et = str(int(1722499200))# 签名方法,支持md5、sha1、sha256method = 'sha1'method1 = 'sha256'# 对access_key进行decodekey = base64.b64decode(access_key)# 计算signorg = et + '\n' + method+ '\n' + res + '\n' + versionsign_b = hmac.new(key=key, msg=org.encode(), digestmod=method)sign = base64.b64encode(sign_b.digest()).decode()# value 部分进行url编码,method/res/version值较为简单无需编码sign = quote(sign, safe='')res = quote(res, safe='')# token参数拼接token = 'version=%s&res=%s&et=%s&method=%s&sign=%s' % (version, res, et, method, sign)return tokendef on_subscribe(client, userdata, mid, reason_code_list, properties):# Since we subscribed only for a single channel, reason_code_list contains# a single entryif reason_code_list[0].is_failure:print(f"Broker rejected you subscription: {reason_code_list[0]}")else:print(f"Broker granted the following QoS: {reason_code_list[0].value}")def on_unsubscribe(client, userdata, mid, reason_code_list, properties):# Be careful, the reason_code_list is only present in MQTTv5.# In MQTTv3 it will always be emptyif len(reason_code_list) == 0 or not reason_code_list[0].is_failure:print("unsubscribe succeeded (if SUBACK is received in MQTTv3 it success)")else:print(f"Broker replied with failure: {reason_code_list[0]}")client.disconnect()# 当客户端收到来自服务器的CONNACK响应时的回调。也就是申请连接,服务器返回结果是否成功等
def on_connect(client, userdata, flags, reason_code, properties):if reason_code.is_failure:print(f"Failed to connect: {reason_code}. loop_forever() will retry connection")else:# we should always subscribe from on_connect callback to be sure# our subscribed is persisted across reconnections.# client.subscribe("$SYS/#")print("连接结果:" + mqtt.connack_string(reason_code))#连接成功后就订阅topicclient.subscribe(Sub_topic1)client.subscribe(Sub_topic2)# 从服务器接收发布消息时的回调。
def on_message(client, userdata, message):print(str(message.payload,'utf-8'))#当消息已经被发送给中间人,on_publish()回调将会被触发
def on_publish(client, userdata, mid):print(str(mid))def main():passw=get_token(DeviceName,accesskey)print(passw)mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,DeviceName)mqttc.on_connect = on_connectmqttc.on_message = on_messagemqttc.on_subscribe = on_subscribemqttc.on_unsubscribe = on_unsubscribe# client = mqtt.Client(DeviceName,protocol=MQTTv311)#client.tls_set(certfile='/Users/mryu/PycharmProjects/MyProject/onenet/MQTTS-certificate.pem') #鉴权证书mqttc.connect(ServerUrl, port=ServerPort, keepalive=120)mqttc.username_pw_set(Productid,passw)mqttc.loop_start()while(1):mqttc.publish(Pub_topic1,jsonstr,qos=0)print("okk")time.sleep(2)if __name__ == '__main__':main()

运行测试一下

在这里插入图片描述

可以看到我们订阅的topic 返回了我们消息的ID 123 说明我们的数据上传成功 ,平台上也可以看到我们的数据流

在这里插入图片描述

这篇关于Python使用MQTT连接新版ONENet的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1009911

相关文章

W外链微信推广短连接怎么做?

制作微信推广链接的难点分析 一、内容创作难度 制作微信推广链接时,首先需要创作有吸引力的内容。这不仅要求内容本身有趣、有价值,还要能够激起人们的分享欲望。对于许多企业和个人来说,尤其是那些缺乏创意和写作能力的人来说,这是制作微信推广链接的一大难点。 二、精准定位难度 微信用户群体庞大,不同用户的需求和兴趣各异。因此,制作推广链接时需要精准定位目标受众,以便更有效地吸引他们点击并分享链接

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

pdfmake生成pdf的使用

实际项目中有时会有根据填写的表单数据或者其他格式的数据,将数据自动填充到pdf文件中根据固定模板生成pdf文件的需求 文章目录 利用pdfmake生成pdf文件1.下载安装pdfmake第三方包2.封装生成pdf文件的共用配置3.生成pdf文件的文件模板内容4.调用方法生成pdf 利用pdfmake生成pdf文件 1.下载安装pdfmake第三方包 npm i pdfma

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]