物联网数据库 IoTDB —— 从协议到数据

2024-03-13 01:30

本文主要是介绍物联网数据库 IoTDB —— 从协议到数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在这个系列之前的文章里,我们介绍了IoTDB的LSM,以及Iot中的最佳实践,这次我们看看如何将mqtt和IoTDB整合起来。下面我们开始:

IoTDB in docker

首先,做一个测试环境,我现在越发喜欢docker 和 WSL 了,除了吃点硬盘,内存和CPU资源以外,没有什么缺点了......

run in docker

直接把该开的端口都打开,只是测试环境,我就没再挂目录。

docker run -d -p 6667:6667 -p 31999:31999 -p 8181:8181 -p 5555:5555 -p 1883:1883 apache/iotdb

等待一会,执行 docker ps 查看是否成功了

➜ ~ docker ps
CONTAINER ID   IMAGE         COMMAND                 CREATED       STATUS       PORTS                                                                                                                                                                                                                       NAMES
ad9b18f8bff3   apache/iotdb   "/iotdb/sbin/start-s…"   2 hours ago   Up 2 hours    0.0.0.0:1883->1883/tcp, :::1883->1883/tcp, 0.0.0.0:5555->5555/tcp, :::5555->5555/tcp, 0.0.0.0:6667->6667/tcp, :::6667->6667/tcp, 0.0.0.0:8181->8181/tcp, :::

初步的IoTDB in docker 环境,我们就搞好了。接下来,开启mqtt服务。

开启 Mqtt 服务

进入IoTDB的docker docker exec -it ad9b18f8bff3 /bin/bash

编辑配置文件vi iotdb/conf/iotdb-engine.properties

开启服务,根据自己的需要,配置ip和端口等。

####################
### MQTT Broker Configuration
##################### whether to enable the mqtt service.
enable_mqtt_service=false   # 修改成 true , 代表开启 mqtt服务# the mqtt service binding host.
mqtt_host=0.0.0.0 # ip# the mqtt service binding port.
mqtt_port=1883  # 端口# the handler pool size for handing the mqtt messages.
mqtt_handler_pool_size=1# the mqtt message payload formatter.
mqtt_payload_formatter=json  # 数据格式# max length of mqtt message in byte
mqtt_max_message_size=1048576

重启服务,如果不会,就重启docker镜像。

IoTDB 基础操作

  • 启动服务: sbin/start-client.sh

    root@ad9b18f8bff3:/iotdb/sbin# ./start-cli.sh
    ---------------------
    Starting IoTDB Cli
    ---------------------
    _____       _________ ______   ______
    |_   _|     | _   _ ||_   _ `.|_   _ \
    | |   .--.|_/ | | \_| | | `. \ | |_) |
    | | / .'`\ \ | |     | | | | | __'.
    _| |_| \__. | _| |_   _| |_.' /_| |__) |
    |_____|'.__.' |_____| |______.'|_______/ version 0.11.1IoTDB> login successfully
  • 退出CLI: quitexit

  • 停止服务:$sbin/stop-server.sh

  • 设置一个存储组到IoTDB,名为root  :  IoTDB> SET STORAGE GROUP TO root

  • 查看当前IoTDB的存储组 : IoTDB> SHOW STORAGE GROUP

    IoTDB> SHOW STORAGE GROUP
    +-------------+
    |storage group|
    +-------------+
    |   root.test|
    +-------------+
    Total line number = 1
    It costs 0.127s
  • 查看系统中存在的所有时间序列 :IoTDB> SHOW TIMESERIES

    IoTDB> show timeseries
    +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
    |                     timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
    +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
    |root.test.wf01.wt01.temperature| null|   root.test|   FLOAT| GORILLA|     SNAPPY|null|     null|
    |     root.test.wf01.wt01.status| null|   root.test| BOOLEAN|     RLE|     SNAPPY|null|     null|
    |   root.test.wf01.wt01.hardware| null|   root.test|   TEXT|   PLAIN|     SNAPPY|null|     null|
    +-------------------------------+-----+-------------+--------+--------+-----------+----+----------+
    Total line number = 3
    It costs 0.009s
  • 查看系统中存在的特定时间序列: SHOW TIMESERIES root.test.wf01.wt01.status

    IoTDB> SHOW TIMESERIES root.test.wf01.wt01.status
    +--------------------------+-----+-------------+--------+--------+-----------+----+----------+
    |               timeseries|alias|storage group|dataType|encoding|compression|tags|attributes|
    +--------------------------+-----+-------------+--------+--------+-----------+----+----------+
    |root.test.wf01.wt01.status| null|   root.test| BOOLEAN|     RLE|     SNAPPY|null|     null|
    +--------------------------+-----+-------------+--------+--------+-----------+----+----------+
    Total line number = 1
    It costs 0.003s
  • 插入数据 INSERT INTO root.test.wf01.wt01(timestamp,status,temperature) values(200,false,20.71)

    IoTDB> INSERT INTO root.test.wf01.wt01(timestamp,status,temperature) values(200,false,20.71)
    Msg: The statement is executed successfully.
  • 查看数据: select * from root.test;

    IoTDB> select * from root.test;
    +------------------------+-------------------------------+--------------------------+----------------------------+
    |                   Time|root.test.wf01.wt01.temperature|root.test.wf01.wt01.status|root.test.wf01.wt01.hardware|
    +------------------------+-------------------------------+--------------------------+----------------------------+
    |2021-01-20T02:00:00.000Z|                           21.2|                     true|                       hello|
    +------------------------+-------------------------------+--------------------------+----------------------------+
    Total line number = 1
    It costs 0.077s
  • 查看设备:show devices

    IoTDB> show devices
    +-------------------+
    |           devices|
    +-------------------+
    |root.test.wf01.wt01|
    +-------------------+
    Total line number = 1
    It costs 0.002s

mqtt to IoTDB

代码

构建一个实体对象,用于存储

package wang.datahub.iotdb;import com.google.gson.Gson;
import java.util.List;public class IotdbVO {private String device;private long timestamp = System.currentTimeMillis();private List<String> measurements;private List<Object> values;public String getDevice() {return device;}public void setDevice(String device) {this.device = device;}public long getTimestamp() {return timestamp;}public void setTimestamp(long timestamp) {this.timestamp = timestamp;}public List<String> getMeasurements() {return measurements;}public void setMeasurements(List<String> measurements) {this.measurements = measurements;}public List<Object> getValues() {return values;}public void setValues(List<Object> values) {this.values = values;}public String toJson(){Gson g = new Gson();String jsonData = g.toJson(this);return jsonData;}@Overridepublic String toString() {return "IotdbVO{" +"device='" + device + '\'' +", timestamp=" + timestamp +", measurements=" + measurements +", values=" + values +'}';}
}

使用祖传的代码来模拟数据发射到IoTDB,这里直接将mqtt的主机和端口,配置到前文所修改的IoTDB的mqtt服务上,就大功告成了。

package wang.datahub.iotdb;import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;import java.util.ArrayList;
import java.util.List;
import java.util.Random;public class EmmitToIotdb {public static void main(String[] args) {String[] hardwares = new String[]{"a1","b1","b2","c3","d1","f5"};int count = 1000;for(int i = 0; i < count ;i++){IotdbVO iotdbVO = new IotdbVO();iotdbVO.setDevice("root.test.wf01.wt01");List<String> measurements = new ArrayList<>();List<Object> values = new ArrayList<>();measurements.add("temperature");measurements.add("status");measurements.add("hardware");Random r = new Random();values.add(r.nextInt(40));values.add(r.nextBoolean());values.add(hardwares[r.nextInt(hardwares.length)]);iotdbVO.setMeasurements(measurements);iotdbVO.setValues(values);emmitToIotdb(iotdbVO);}}public static void emmitToIotdb(IotdbVO content){try {MQTT mqtt = new MQTT();mqtt.setHost("127.0.0.1", 1883);mqtt.setUserName("root");mqtt.setPassword("root");BlockingConnection connection = mqtt.blockingConnection();connection.connect();String payload = content.toJson();connection.publish(content.getDevice(),payload.getBytes(), QoS.AT_LEAST_ONCE,false);connection.disconnect();} catch (Exception e){e.printStackTrace();}}}

执行结果

这篇关于物联网数据库 IoTDB —— 从协议到数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/803234

相关文章

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

烟火目标检测数据集 7800张 烟火检测 带标注 voc yolo

一个包含7800张带标注图像的数据集,专门用于烟火目标检测,是一个非常有价值的资源,尤其对于那些致力于公共安全、事件管理和烟花表演监控等领域的人士而言。下面是对此数据集的一个详细介绍: 数据集名称:烟火目标检测数据集 数据集规模: 图片数量:7800张类别:主要包含烟火类目标,可能还包括其他相关类别,如烟火发射装置、背景等。格式:图像文件通常为JPEG或PNG格式;标注文件可能为X