influxdb产生实时数据

2024-08-31 03:32
文章标签 数据 实时 influxdb 产生

本文主要是介绍influxdb产生实时数据,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

开发工具:idea 简历maven工程

<dependency><groupId>org.influxdb</groupId><artifactId>influxdb-java</artifactId><version>2.17</version>
</dependency>
<dependency><groupId>com.github.javafaker</groupId><artifactId>javafaker</artifactId><version>1.0.2</version>
</dependency>

 java工程钟创立两个java文件,目录结构如下

influxDBConnection文件中添加如下代码

package com.influx;import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.*;import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;public class InfluxDBConnection {// 用户名private String username;// 密码private String password;// 连接地址private String openurl;// 数据库private String database;// 保留策略private String retentionPolicy;private InfluxDB influxDB;public InfluxDBConnection(String username, String password, String openurl, String database,String retentionPolicy) {this.username = username;this.password = password;this.openurl = openurl;this.database = database;this.retentionPolicy = retentionPolicy == null || retentionPolicy.equals("") ? "autogen" : retentionPolicy;influxDbBuild();}/*** 创建数据库** @param dbName*/@SuppressWarnings("deprecation")public void createDB(String dbName) {influxDB.createDatabase(dbName);}/*** 删除数据库** @param dbName*/@SuppressWarnings("deprecation")public void deleteDB(String dbName) {influxDB.deleteDatabase(dbName);}/*** 测试连接是否正常** @return true 正常*/public boolean ping() {boolean isConnected = false;Pong pong;try {pong = influxDB.ping();if (pong != null) {isConnected = true;}} catch (Exception e) {e.printStackTrace();}return isConnected;}/*** 连接时序数据库 ,若不存在则创建** @return*/public InfluxDB influxDbBuild() {if (influxDB == null) {influxDB = InfluxDBFactory.connect(openurl, username, password);}try {// if (!influxDB.databaseExists(database)) {// influxDB.createDatabase(database);// }} catch (Exception e) {// 该数据库可能设置动态代理,不支持创建数据库// e.printStackTrace();} finally {influxDB.setRetentionPolicy(retentionPolicy);}influxDB.setLogLevel(InfluxDB.LogLevel.NONE);return influxDB;}/*** 创建自定义保留策略** @param policyName*            策略名* @param duration*            保存天数* @param replication*            保存副本数量* @param isDefault*            是否设为默认保留策略*/public void createRetentionPolicy(String policyName, String duration, int replication, Boolean isDefault) {String sql = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s ", policyName,database, duration, replication);if (isDefault) {sql = sql + " DEFAULT";}this.query(sql);}/*** 创建默认的保留策略** @param :default,保存天数:30天,保存副本数量:1*            设为默认保留策略*/public void createDefaultRetentionPolicy() {String command = String.format("CREATE RETENTION POLICY \"%s\" ON \"%s\" DURATION %s REPLICATION %s DEFAULT","default", database, "30d", 1);this.query(command);}/*** 查询** @param command*            查询语句* @return*/public QueryResult query(String command) {return influxDB.query(new Query(command, database));}/*** 插入** @param measurement*            表* @param tags*            标签* @param fields*            字段*/public void insert(String measurement, Map<String, String> tags, Map<String, Object> fields, long time,TimeUnit timeUnit) {Point.Builder builder = Point.measurement(measurement);builder.tag(tags);builder.fields(fields);if (0 != time) {builder.time(time, timeUnit);}influxDB.write(database, retentionPolicy, builder.build());}/*** 批量写入测点** @param batchPoints*/public void batchInsert(BatchPoints batchPoints) {influxDB.write(batchPoints);// influxDB.enableGzip();// influxDB.enableBatch(2000,100,TimeUnit.MILLISECONDS);// influxDB.disableGzip();// influxDB.disableBatch();}/*** 批量写入数据** @param database*            数据库* @param retentionPolicy*            保存策略* @param consistency*            一致性* @param records*            要保存的数据(调用BatchPoints.lineProtocol()可得到一条record)*/public void batchInsert(final String database, final String retentionPolicy, final InfluxDB.ConsistencyLevel consistency,final List<String> records) {influxDB.write(database, retentionPolicy, consistency, records);}/*** 删除** @param command*            删除语句* @return 返回错误信息*/public String deleteMeasurementData(String command) {QueryResult result = influxDB.query(new Query(command, database));return result.getError();}/*** 关闭数据库*/public void close() {influxDB.close();}/*** 构建Point** @param measurement* @param time* @param fields* @return*/public Point pointBuilder(String measurement, long time, Map<String, String> tags, Map<String, Object> fields) {Point point = Point.measurement(measurement).time(time, TimeUnit.MILLISECONDS).tag(tags).fields(fields).build();return point;}
}

 InfluxDBManager中编写数据写入代码

package com.influx;import com.github.javafaker.Faker;
import com.mysql.cj.x.protobuf.MysqlxDatatypes;
import org.influxdb.InfluxDB;
import org.influxdb.dto.QueryResult;import java.util.HashMap;
import java.util.Locale;
import java.util.concurrent.TimeUnit;public class InfluxDBManager {public static void main(String[] args) throws InterruptedException {//这一行代码数据库名,表名,ip地址,还有持久策略要根据每个人实际情况填写InfluxDBConnection influxDBConnection = new InfluxDBConnection("influx", "influx", "http://192.168.224.129:8086", "influxdb", "influx_retention");Faker faker = new Faker(new Locale("zh-CN"));String[] areas = {"南","北","东","西"};Integer[] altitudes = new Integer[]{500,800,1000,1500};int k = 1;while(true){System.out.println("当前是第轮插入数据:"+ k);int al_intdex = (int)Math.floor(Math.random() * altitudes.length);int ar_index = (int)Math.floor(Math.random() * areas.length);HashMap<String, String> hashMap = new HashMap<String, String>();hashMap.put("altitude", altitudes[al_intdex].toString());hashMap.put("area", areas[ar_index].toString());HashMap<String, Object> stringObjectHashMap = new HashMap<String, Object>();stringObjectHashMap.put("temperature", faker.number().numberBetween(10,30));stringObjectHashMap.put("humidity", faker.number().numberBetween(10,30));influxDBConnection.insert("weather", hashMap, stringObjectHashMap, System.currentTimeMillis(), TimeUnit.MILLISECONDS);Thread.sleep(faker.number().numberBetween(500,1000));k++;}}
}

这篇关于influxdb产生实时数据的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1122763

相关文章

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1

Redis 中的热点键和数据倾斜示例详解

《Redis中的热点键和数据倾斜示例详解》热点键是指在Redis中被频繁访问的特定键,这些键由于其高访问频率,可能导致Redis服务器的性能问题,尤其是在高并发场景下,本文给大家介绍Redis中的热... 目录Redis 中的热点键和数据倾斜热点键(Hot Key)定义特点应对策略示例数据倾斜(Data S

Python实现将MySQL中所有表的数据都导出为CSV文件并压缩

《Python实现将MySQL中所有表的数据都导出为CSV文件并压缩》这篇文章主要为大家详细介绍了如何使用Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到... python将mysql数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个

SpringBoot整合jasypt实现重要数据加密

《SpringBoot整合jasypt实现重要数据加密》Jasypt是一个专注于简化Java加密操作的开源工具,:本文主要介绍详细介绍了如何使用jasypt实现重要数据加密,感兴趣的小伙伴可... 目录jasypt简介 jasypt的优点SpringBoot使用jasypt创建mapper接口配置文件加密

使用Python高效获取网络数据的操作指南

《使用Python高效获取网络数据的操作指南》网络爬虫是一种自动化程序,用于访问和提取网站上的数据,Python是进行网络爬虫开发的理想语言,拥有丰富的库和工具,使得编写和维护爬虫变得简单高效,本文将... 目录网络爬虫的基本概念常用库介绍安装库Requests和BeautifulSoup爬虫开发发送请求解

Oracle存储过程里操作BLOB的字节数据的办法

《Oracle存储过程里操作BLOB的字节数据的办法》该篇文章介绍了如何在Oracle存储过程中操作BLOB的字节数据,作者研究了如何获取BLOB的字节长度、如何使用DBMS_LOB包进行BLOB操作... 目录一、缘由二、办法2.1 基本操作2.2 DBMS_LOB包2.3 字节级操作与RAW数据类型2.

MySQL使用binlog2sql工具实现在线恢复数据功能

《MySQL使用binlog2sql工具实现在线恢复数据功能》binlog2sql是大众点评开源的一款用于解析MySQLbinlog的工具,根据不同选项,可以得到原始SQL、回滚SQL等,下面我们就来... 目录背景目标步骤准备工作恢复数据结果验证结论背景生产数据库执行 SQL 脚本,一般会经过正规的审批

kotlin中的数据转换方法(示例详解)

《kotlin中的数据转换方法(示例详解)》这篇文章介绍了Kotlin中将数字转换为字符串和字符串转换为数字的多种方法,包括使用`toString()`、字符串模板、格式化字符串、处理可空类型等,同时... 目录1. 直接使用 toString() 方法2. 字符串模板(自动转换)3. 格式化字符串(控制输