influxdb产生实时数据

2024-08-31 03:32
文章标签 数据 实时 influxdb 产生

本文主要是介绍influxdb产生实时数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

开发工具:idea 简历maven工程

<dependency><groupId>org.influxdb</groupId><artifactId>influxdb-java</artifactId><version>2.17</version>
</dependency>
<dependency><groupId>com.github.javafaker</groupId><artifactId>javafaker</artifactId><version>1.0.2</version>
</dependency>

 java工程钟创立两个java文件,目录结构如下

influxDBConnection文件中添加如下代码

package com.influx;import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.*;import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;public class InfluxDBConnection {// 用户名private String username;// 密码private String password;// 连接地址private String openurl;// 数据库private String database;// 保留策略private String retentionPolicy;private InfluxDB influxDB;public InfluxDBConnection(String username, String password, String openurl, String database,String retentionPolicy) {this.username = username;this.password = password;this.openurl = openurl;this.database = database;this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;influxDbBuild();}/*** 创建数据库** @param dbName*/@SuppressWarnings("deprecation")public void createDB(String dbName) {influxDB.createDatabase(dbName);}/*** 删除数据库** @param dbName*/@SuppressWarnings("deprecation")public void deleteDB(String dbName) {influxDB.deleteDatabase(dbName);}/*** 测试连接是否正常** @return true 正常*/public boolean ping() {boolean isConnected = false;Pong pong;try {pong = influxDB.ping();if (pong != null) {isConnected = true;}} catch (Exception e) {e.printStackTrace();}return isConnected;}/*** 连接时序数据库 ,若不存在则创建** @return*/public InfluxDB influxDbBuild() {if (influxDB == null) {influxDB = InfluxDBFactory.connect(openurl, username, password);}try {// if (!influxDB.databaseExists(database)) {// influxDB.createDatabase(database);// }} catch (Exception e) {// 该数据库可能设置动态代理,不支持创建数据库// e.printStackTrace();} finally {influxDB.setRetentionPolicy(retentionPolicy);}influxDB.setLogLevel(InfluxDB.LogLevel.NONE);return influxDB;}/*** 创建自定义保留策略** @param policyName*            策略名* @param duration*            保存天数* @param replication*            保存副本数量* @param isDefault*            是否设为默认保留策略*/public void createRetentionPolicy(String policyName, String duration, int replication, Boolean isDefault) {String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,database, duration, replication);if (isDefault) {sql = sql + " DEFAULT";}this.query(sql);}/*** 创建默认的保留策略** @param :default,保存天数:30天,保存副本数量:1*            设为默认保留策略*/public void createDefaultRetentionPolicy() {String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT","default", database, "30d", 1);this.query(command);}/*** 查询** @param command*            查询语句* @return*/public QueryResult query(String command) {return influxDB.query(new Query(command, database));}/*** 插入** @param measurement*            表* @param tags*            标签* @param fields*            字段*/public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,TimeUnit timeUnit) {Point.Builder builder = Point.measurement(measurement);builder.tag(tags);builder.fields(fields);if (0 != time) {builder.time(time, timeUnit);}influxDB.write(database, retentionPolicy, builder.build());}/*** 批量写入测点** @param batchPoints*/public void batchInsert(BatchPoints batchPoints) {influxDB.write(batchPoints);// influxDB.enableGzip();// influxDB.enableBatch(2000,100,TimeUnit.MILLISECONDS);// influxDB.disableGzip();// influxDB.disableBatch();}/*** 批量写入数据** @param database*            数据库* @param retentionPolicy*            保存策略* @param consistency*            一致性* @param records*            要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)*/public void batchInsert(final String database, final String retentionPolicy, final InfluxDB.ConsistencyLevel consistency,final List<String> records) {influxDB.write(database, retentionPolicy, consistency, records);}/*** 删除** @param command*            删除语句* @return 返回错误信息*/public String deleteMeasurementData(String command) {QueryResult result = influxDB.query(new Query(command, database));return result.getError();}/*** 关闭数据库*/public void close() {influxDB.close();}/*** 构建Point** @param measurement* @param time* @param fields* @return*/public Point pointBuilder(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {Point point = Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS).tag(tags).fields(fields).build();return point;}
}

 InfluxDBManager中编写数据写入代码

package com.influx;import com.github.javafaker.Faker;
import com.mysql.cj.x.protobuf.MysqlxDatatypes;
import org.influxdb.InfluxDB;
import org.influxdb.dto.QueryResult;import java.util.HashMap;
import java.util.Locale;
import java.util.concurrent.TimeUnit;public class InfluxDBManager {public static void main(String[] args) throws InterruptedException {//这一行代码数据库名,表名,ip地址,还有持久策略要根据每个人实际情况填写InfluxDBConnection influxDBConnection = new InfluxDBConnection("influx", "influx", "http://192.168.224.129:8086", "influxdb", "influx_retention");Faker faker = new Faker(new Locale("zh-CN"));String[] areas = {"南","北","东","西"};Integer[] altitudes = new Integer[]{500,800,1000,1500};int k = 1;while(true){System.out.println("当前是第轮插入数据:"+ k);int al_intdex = (int)Math.floor(Math.random() * altitudes.length);int ar_index = (int)Math.floor(Math.random() * areas.length);HashMap<String, String> hashMap = new HashMap<String, String>();hashMap.put("altitude", altitudes[al_intdex].toString());hashMap.put("area", areas[ar_index].toString());HashMap<String, Object> stringObjectHashMap = new HashMap<String, Object>();stringObjectHashMap.put("temperature", faker.number().numberBetween(10,30));stringObjectHashMap.put("humidity", faker.number().numberBetween(10,30));influxDBConnection.insert("weather", hashMap, stringObjectHashMap, System.currentTimeMillis(), TimeUnit.MILLISECONDS);Thread.sleep(faker.number().numberBetween(500,1000));k++;}}
}

这篇关于influxdb产生实时数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1122763

相关文章

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate

Java实现Elasticsearch查询当前索引全部数据的完整代码

《Java实现Elasticsearch查询当前索引全部数据的完整代码》:本文主要介绍如何在Java中实现查询Elasticsearch索引中指定条件下的全部数据,通过设置滚动查询参数(scrol... 目录需求背景通常情况Java 实现查询 Elasticsearch 全部数据写在最后需求背景通常情况下

Java中注解与元数据示例详解

《Java中注解与元数据示例详解》Java注解和元数据是编程中重要的概念,用于描述程序元素的属性和用途,:本文主要介绍Java中注解与元数据的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参... 目录一、引言二、元数据的概念2.1 定义2.2 作用三、Java 注解的基础3.1 注解的定义3.2 内

将sqlserver数据迁移到mysql的详细步骤记录

《将sqlserver数据迁移到mysql的详细步骤记录》:本文主要介绍将SQLServer数据迁移到MySQL的步骤,包括导出数据、转换数据格式和导入数据,通过示例和工具说明,帮助大家顺利完成... 目录前言一、导出SQL Server 数据二、转换数据格式为mysql兼容格式三、导入数据到MySQL数据