树莓派学习笔记(五)——烟雾浓度检测(ONENET云平台收发数据代码)

本文主要是介绍树莓派学习笔记(五)——烟雾浓度检测(ONENET云平台收发数据代码),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 一、硬件连接
      • MQ-2
      • PCF8591
  • 二、Onenet平台数据收发程序
      • onenetsub.py
      • onenetget.py
  • 三、程序
      • 树莓派开启iic功能
      • 完整程序
      • OneNet界面展示

前言

用树莓派4b做一个烟雾浓度检测仪,烟雾浓度传感器模块MQ-2收集烟雾浓度数据,把数据上传到onenet平台用网页显示。

所需材料:树莓派MQ-2烟雾浓度传感器 ;PCF8591 ad转换模块;杜邦线;面包板;

[2021.6.11]

一、硬件连接

参考:

MQ-2 :pm2.5、mq-2 、mq2电压浓度转换

PCF8591:PCF8591、PCFPCF8591使用及Python控制

MQ-2、PCF8591和树莓派之间使用I2C总线通信方式,引脚连接如下:

PCF8591   树莓派
GND---GND
VCC---3.3V
SCL---SCL/GPIO3
SDA---SDA/GPIO2MQ-2   树莓派
GND---GND
VCC---5V   
D0----36引脚MQ-2   PCF8591
AO----AIN0

MQ-2

image-20210330205319733

MQ-2浓度及电压转换:

​ 烟雾浓度计算公式: ppm = 613.9f * pow(RS/R0, -2.074f)

ppm:为可燃气体的浓度。VRL:电压输出值。Rs:器件在不同气体,不同浓度下的电阻值。R0:器件在洁净空气中的电阻值。RL:负载电阻阻值。注:
pow() 方法返回 xy(x 的 y 次方) 的值。
import math
math.pow( x, y )2#浓度计算参数
CAL_PPM =20  	    # 校准环境中PPM值
RL = 5		        # RL阻值
#R0 = float(6.00)  	#元件在洁净空气中的阻值Vrl = 5 * ADC_Value / 255        #5V   ad 为8位RS = (5 - Vrl) / Vrl * RLR0 = RS / pow(CAL_PPM / 613.9, 1 / -2.074)ppm = 613.9 * pow(RS / R0, -2.074)

PCF8591

#!/usr/bin/env python3
# -*- Coding: utf-8 -*-###### pin assign
#PCF8591 --------- Raspberry pi3
# SDA ------------ GPIO2/SDA1
# SCL ------------ GPIO3/SCL1
# VCC ------------- 3.3V
# GND ------------- GND
#
######import wiringpi
import timeclass PCF8591:def __init__(self, addr):wiringpi.wiringPiSetup() #setup wiringpiself.i2c = wiringpi.I2C() #get I2Cself.dev = self.i2c.setup(addr) #setup I2C devicedef LED_ON(self):self.i2c.writeReg8(self.dev, 0x40, 0xFF)def LED_OFF(self):self.i2c.writeReg8(self.dev, 0x40, 0x00)def DAoutput(self,value):self.i2c.writeReg8(self.dev, 0x40, value)def analogRead0(self):self.i2c.writeReg8(self.dev, 0x48,0x40)self.i2c.readReg8(self.dev,0x48)	#read dummyreturn self.i2c.readReg8(self.dev,0x48)def analogRead1(self):self.i2c.writeReg8(self.dev, 0x48,0x41)self.i2c.readReg8(self.dev,0x48)	#read dummyreturn self.i2c.readReg8(self.dev,0x48)def analogRead2(self):self.i2c.writeReg8(self.dev, 0x48,0x42)self.i2c.readReg8(self.dev,0x48)	#read dummyreturn self.i2c.readReg8(self.dev,0x48)def analogRead3(self):self.i2c.writeReg8(self.dev, 0x48,0x43)self.i2c.readReg8(self.dev,0x48)	#read dummyreturn self.i2c.readReg8(self.dev,0x48)def analogRead(self,pin):self.i2c.writeReg8(self.dev, 0x48,0x40+pin)self.i2c.readReg8(self.dev,0x48)	#read dummyreturn self.i2c.readReg8(self.dev,0x48)if __name__ == "__main__":pcf8591 = PCF8591(0x48)while 1:print("QM-2:"+str(pcf8591.analogRead0())) #read the Variable register# print("VCC:"+str(pcf8591.analogRead1()))  # read the Variable register# print("END:"+str(pcf8591.analogRead3()))  # read the Variable register

二、Onenet平台数据收发程序

参考:onenet_get_put

onenet平台 设置方法见上期:树莓派学习笔记(四)——温湿度检测(本地OLED显示、ONENET云平台显示)

onenetsub.py

将树莓派读取到的传感器数据,通过http协议发送至 平台的对应设备

import urllib.request
import json
import time
from time import sleep
#设备IDdeviceId = "你的设备ID"
APIKey = "你的设备APIKEY"#上传函数
def http_put_data(data):url = "http://api.heclouds.com/devices/" + deviceId + '/datapoints'd = time.strftime('%Y-%m-%dT%H:%M:%S')values = {"datastreams": [ {"id": "CO2", "datapoints": [{"value": data}]},{"id": "PM25", "datapoints": [{"value": data}]},{"id": "PM10", "datapoints": [{"value": data}]},{"id": "VOC", "datapoints": [{"value": data}]} ]}jdata = json.dumps(values).encode("utf-8")request = urllib.request.Request(url, jdata)request.add_header('api-key', APIKey)request.get_method = lambda: 'POST'request = urllib.request.urlopen(request)return request.read()if __name__ == '__main__':R = http_put_data(10)print(R)

onenetget.py

读取平台对应设备的数据流,可以在另一台设备,如PC、树莓派读取onenet平台对应设备的数据流,不过由于平台限制有3s延时

import requests
import json#设备ID
deviceId = "你的设备ID"
APIKey = "你的设备APIKEY"# 基本设置
url = "http://api.heclouds.com/devices/"+deviceId+"/datastreams"
headers = {'api-key': APIKey}# 获得结果并打印
r = requests.get(url, headers=headers)
t: str = r.text#print(t)
params = json.loads(t)
#上面这个语句是将我们获得的内容转成数据字典,这样转是因为我们接收到的内容具有数据字典的形式
#转换成数据字典利于我们后面的操作
#print params['error'][]#print(type(params))
#如果执行上面这条语句我们可以看到返回的结果是dict,也就是我们已经成功转换x = params['data']
#这个语句是从数据转换后的数据字典中获取我们需要的数据,从结果上看params是一个list
#在data前面的都只是一些描述内容,参考教程:https://blog.csdn.net/zhiaicq_r/article/details/79278530print('环境参数'+'\t\t\t\t'+'更新时间'+'\t\t\t\t\t'+'数值')
#接下来是获取不同的数据流
for index,values in enumerate(x):#只需要更新时间,id和值,所以这里对获得的数据字典做一下更改#print(values)#这里得到的values也是一个数据字典#因为在onenet那边对这些数据没有给出来,而且也没有意义,所以我们就不在这里显示,因此现将其删除# del values['unit']# del values['unit_symbol']# del values['create_time']#print(values.items())#print(values['update_at'])#这里不知道为什么直接使用values.get('update_at','')和values.get('current_value','')#或者用values['update_at']和values['current_value']报:KeyError错误,而且if里面的那条语句会执行#所以我们通过get方法解决,其中要注意的是,如果没有给定第二个参数,那么默认输出NONEa= str(values.get('update_at',''))b= str(values.get('current_value',''))#因为如果有更新时间就会有相应最新的值,所以这里只用其中一个作为判断条件if (a != ""):if (values['id'] == 'm_temp'  ):    #CPU_temp statusprint(str(values['id']) + '\t\t\t' + a + '\t\t\t' + b)T = belif(values['id'] == 'm_hum'):print(str(values['id']) + '\t\t\t\t' + a + '\t\t\t'+ b)H = belif(values['id'] == 'CPU_temp'):print(str(values['id']) + '\t\t\t\t' + a + '\t\t\t' + b)CT = belse:if(values['id'] == 'm_temp' or values['id'] == 'm_hum'):print(values['id']+ '\t\t\t' +'目前还没有收到任何数据')print(T,H,CT)

三、程序

树莓派开启iic功能

远程登陆树莓派后,输入sudo raspi-config 后,选择5.Interfacing Options 选择P5 I2C 选择

完整程序

带ad转换的版本,求出具体烟雾浓度

#! /usr/bin/env python3
import RPi.GPIO as GPIO  # 导入库,并进行别名的设置
import time
import wiringpi
import psutil
import numpy as np
#onenet
import urllib
import urllib.request
import json
import datetime
import requestsCHANNEL = 36  # 确定引脚口。按照真实的位置确定
GPIO.setmode(GPIO.BOARD)  # 选择引脚系统,这里我们选择了BOARD
GPIO.setup(CHANNEL, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
# 初始化引脚,将36号引脚设置为输入下拉电阻,因为在初始化的时候不确定的的引电平,因此这样设置是用来保证精准,(但是也可以不写“pull_up_down=GPIO.PUD_DOWN”)###### pin assign
#PCF8591 --------- Raspberry pi3
# SDA ------------ GPIO2/SDA1
# SCL ------------ GPIO3/SCL1
# VCC ------------- 3.3V
# GND ------------- GNDclass PCF8591:def __init__(self, addr):wiringpi.wiringPiSetup()  # setup wiringpiself.i2c = wiringpi.I2C()  # get I2Cself.dev = self.i2c.setup(addr)  # setup I2C devicedef LED_ON(self):self.i2c.writeReg8(self.dev, 0x40, 0xFF)def LED_OFF(self):self.i2c.writeReg8(self.dev, 0x40, 0x00)def DAoutput(self, value):self.i2c.writeReg8(self.dev, 0x40, value)def analogRead0(self) -> object:self.i2c.writeReg8(self.dev, 0x48, 0x40)self.i2c.readReg8(self.dev, 0x48)  # read dummyreturn self.i2c.readReg8(self.dev, 0x48)def analogRead1(self):self.i2c.writeReg8(self.dev, 0x48, 0x41)self.i2c.readReg8(self.dev, 0x48)  # read dummyreturn self.i2c.readReg8(self.dev, 0x48)def analogRead2(self):self.i2c.writeReg8(self.dev, 0x48, 0x42)self.i2c.readReg8(self.dev, 0x48)  # read dummyreturn self.i2c.readReg8(self.dev, 0x48)def analogRead3(self):self.i2c.writeReg8(self.dev, 0x48, 0x43)self.i2c.readReg8(self.dev, 0x48)  # read dummyreturn self.i2c.readReg8(self.dev, 0x48)def analogRead(self, pin):self.i2c.writeReg8(self.dev, 0x48, 0x40 + pin)self.i2c.readReg8(self.dev, 0x48)  # read dummyreturn self.i2c.readReg8(self.dev, 0x48)# if __name__ == "__main__":
#     pcf8591 = PCF8591(0x48)
#     while 1:
#         print("ADC_Value:" + str(pcf8591.analogRead0()))def getADC_Value():pcf8591 = PCF8591(0x48)AD_V=[]time.sleep(0.3)for i in range(10):v = pcf8591.analogRead0()AD_V.append(v)time.sleep(0.1)ADC_Value = np.sum(AD_V)/10          #采样10次,求平均值# print("ADC_Value:" + str(ADC_Value))return ADC_Valuedef SmokeConcentration(ADC_Value):#烟雾浓度计算# 浓度计算参数CAL_PPM = 20  # 校准环境中PPM值RL = 5  # RL阻值R0 = 6.00  	#元件在洁净空气中的阻值Vrl = 5 * ADC_Value / 255        #5V   ad 为8位RS = (5 - Vrl) / Vrl * RLRunTime =  psutil.boot_time()# print("Vrl:"+str(Vrl))# print("RS:" + str(RS))if RunTime >=3:# R0 = RS / pow(CAL_PPM / 613.9, 1 / -2.074)# print("R0:" + str(R0))# print("RS/R0:" + str(RS/R0))ppm = 613.9 * pow(RS / R0, -2.074)return ppmdef getSmokeAlarm():  #烟雾报警,DO引脚控制status = GPIO.input(CHANNEL)  # 检测36号引脚口的输入高低电平状态    1为正常#print(status)  # 实时打印此时的电平状态return statusdef getPPM():AD_V=getADC_Value()concentration = SmokeConcentration(AD_V)PPM = concentrationPercentage = PPM * pow(10,-6)print("smoke:"+str(PPM))return PPM#上传到onenet
#onenet
api_key = '你的密钥'  # 密钥
device_ID = '你的id'  # 设备ID
headers = {'api-key': api_key}
url_post = "https://api.heclouds.com/devices/" + device_ID + "/datapoints"  # 数据点
url_get = "https://api.heclouds.com/devices/" + device_ID + "/datastreams"  # 数据流def http_post_0():data = {'datastreams': [{"id": "status", "datapoints": [{"value": ' 检测到危险气体 ! ! ! '}]}]}  # id是你的数据流名称jdata = json.dumps(data).encode("utf-8")r = requests.post(url=url_post, headers=headers, data=jdata)print("发送成功:", r.text)def http_post_1():data = {'datastreams': [{"id": "status", "datapoints": [{"value": ' 空气状况正常\t无有害气体 '}]}]}  # id是你的数据流名称jdata = json.dumps(data).encode("utf-8")r = requests.post(url=url_post, headers=headers, data=jdata)print("发送成功:", r.text)def ppm_onenet_put():# 保留两位小数Smoke_ppm = round(getPPM(), 2)Smoke_Alarm = getSmokeAlarm()CurTime = datetime.datetime.now()# 这url只需把数字部分换成你在onenet中创建的设备号就行url = 'http://api.heclouds.com/devices/602743016/datapoints'values = {'datastreams': [{"id": "Smoke_ppm", "datapoints": [{"time": CurTime.isoformat(), "value": Smoke_ppm}]},{"id": "Smoke_Alarm", "datapoints": [{"time": CurTime.isoformat(), "value": Smoke_Alarm}]}]}jdata = json.dumps(values).encode("utf-8")  # 对数据进行JSON格式化编码  ##然后# 打印json内容print(jdata)##python2:request = urllib.request.Request(url, jdata)request.add_header('api-key', api_key)request.get_method = lambda: 'POST'  # 设置HTTP的访问方式request = urllib.request.urlopen(request)return request.read().decode("utf-8")def mq2_local():time.sleep(3)Smoke_ppm = round(getPPM(), 2)Smoke_Alarm = getSmokeAlarm()if Smoke_Alarm == True:  # 如果为高电平,说明MQ-2正常,并打印“OK”print(' 正常 ')else:  # 如果为低电平,说明MQ-2检测到有害气体,并打印“dangerous”print(' 检测到危险气体 ! ! ! ')print(Smoke_Alarm)  # 实时打印此时的电平状态print("Smoke_ppm=%0.2f" % Smoke_ppm)def mq2_onenet_put():   #上传   Smoke_ppm    status   Smoke_Alarmtime.sleep(3)Smoke_Alarm = getSmokeAlarm()resp = ppm_onenet_put()print("OneNET_result: %s" % resp)if Smoke_Alarm == True:http_post_1()else:http_post_0()if __name__ == '__main__':while (1):mq2_local()mq2_onenet_put()time.sleep(1)GPIO.cleanup()  # 清理运行完成后的残余

不带ad转换的程序,没有PCF8591的时候用,输出 是否有 危险气体的开关量

#! /usr/bin/env python3
import RPi.GPIO as GPIO  # 导入库,并进行别名的设置
import time
#onenet
import requests
import json#mq2
CHANNEL = 36  # 确定引脚口。按照真实的位置确定
GPIO.setmode(GPIO.BOARD)  # 选择引脚系统,这里我们选择了BOARDGPIO.setup(CHANNEL, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
# 初始化引脚,将36号引脚设置为输入下拉电阻,因为在初始化的时候不确定的的引电平,因此这样设置是用来保证精准,(但是也可以不写“pull_up_down=GPIO.PUD_DOWN”)#onenet
api_key = '密钥'  # 密钥
device_ID = '设备ID'  # 设备ID
headers = {'api-key': api_key}
url_post = "https://api.heclouds.com/devices/" + device_ID + "/datapoints"  # 数据点
url_get = "https://api.heclouds.com/devices/" + device_ID + "/datastreams"  # 数据流def http_post_0():data = {'datastreams': [{"id": "status", "datapoints": [{"value": ' 检测到危险气体 ! ! ! '}]}]}  # id是你的数据流名称jdata = json.dumps(data).encode("utf-8")r = requests.post(url=url_post, headers=headers, data=jdata)#print("发送成功:", r.text)def http_post_1():data = {'datastreams': [{"id": "status", "datapoints": [{"value": ' 空气状况正常\t无有害气体 '}]}]}  # id是你的数据流名称jdata = json.dumps(data).encode("utf-8")r = requests.post(url=url_post, headers=headers, data=jdata)#print("发送成功:", r.text)# 带有异常处理的主程序
try:while True:  # 执行一个while死循环status = GPIO.input(CHANNEL)  # 检测36号引脚口的输入高低电平状态# print(status) # 实时打印此时的电平状态if status == True:  # 如果为高电平,说明MQ-2正常,并打印“OK”print(' 空气状况正常 ')http_post_1()else:  # 如果为低电平,说明MQ-2检测到有害气体,并打印“dangerous”print(' 检测到危险气体 ! ! ! ')http_post_0()time.sleep(0.1)  # 睡眠0.1秒,以后再执行while循环
except KeyboardInterrupt:  # 异常处理,当检测按下键盘的Ctrl+C,就会退出这个>脚本GPIO.cleanup()  # 清理运行完成后的残余

OneNet界面展示

image-20210628160801639

这篇关于树莓派学习笔记(五)——烟雾浓度检测(ONENET云平台收发数据代码)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/686471

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

流媒体平台/视频监控/安防视频汇聚EasyCVR播放暂停后视频画面黑屏是什么原因?

视频智能分析/视频监控/安防监控综合管理系统EasyCVR视频汇聚融合平台,是TSINGSEE青犀视频垂直深耕音视频流媒体技术、AI智能技术领域的杰出成果。该平台以其强大的视频处理、汇聚与融合能力,在构建全栈视频监控系统中展现出了独特的优势。视频监控管理系统EasyCVR平台内置了强大的视频解码、转码、压缩等技术,能够处理多种视频流格式,并以多种格式(RTMP、RTSP、HTTP-FLV、WebS

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06