从Apache Kafka读数据写入TimescaleDB的案例

2024-03-22 14:50

本文主要是介绍从Apache Kafka读数据写入TimescaleDB的案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文链接:https://streamsets.com/blog/ingesting-data-apache-kafka-timescaledb/

  • 作者:Pat Patterson
  • 2019年5月28日
  • StreamSets新闻

时间序列数据库

时间序列数据库经过优化,可以处理按时间索引的数据,有效地处理特定时间范围内的数据查询。市场上有几个时间序列数据库,事实上,Data Collector长期以来一直有能力写入InfluxDB,但是我对TimescaleDB感兴趣的是它建立在PostgreSQL之上。完全披露:我作为Salesforce的开发人员传播者花了五年半的时间,而PostgreSQL曾经是,现在仍然是Heroku平台的核心部分,但我也喜欢PostgreSQL作为MySQL的强大替代品。

TimescaleDB入门

在听Diana的演示时,我运行了TimescaleDB Docker镜像,将笔记本电脑上的端口54321映射到Docker容器中的5432,这样就不会与我现有的PostgreSQL部署发生冲突。一旦戴安娜离开舞台,我就会浏览TimescaleDB快速入门的“创建Hypertables”部分,创建一个PostgreSQL数据库,为TimescaleDB启用它,并为其写入一行数据:

tutorial=# INSERT INTO conditions(time, location, temperature, humidity)
tutorial-#   VALUES (NOW(), 'office', 70.0, 50.0);
INSERT 0 1
tutorial=# SELECT * FROM conditions ORDER BY time DESC LIMIT 10;time              | location | temperature | humidity 
-------------------------------+----------+-------------+----------2019-05-25 00:37:11.288536+00 | office   |          70 |       50
(1 row)

第一个TimescaleDB Pipeline

由于TimescaleDB是基于PostgreSQL构建的,因此标准的PostgreSQL JDBC驱动程序可以直接使用它。由于我已经在Data Collector中安装了驱动程序,因此我花了大约两分钟构建一个简单的测试管道来将第二行数据写入闪亮的新TimescaleDB服务器:

æ¶é´å°ºåº¦æµè¯ç®¡é

生成的测试结果数据

tutorial=# SELECT * FROM conditions ORDER BY time DESC LIMIT 10;time            |         location          |    temperature     |      humidity      
----------------------------+---------------------------+--------------------+--------------------2020-12-25 23:35:43.889+00 | Grocery                   |  0.806543707847595 | 0.08446377515792852020-10-27 02:20:47.905+00 | Shoes                     | 0.0802439451217651 |  0.3988062143325812020-10-24 01:15:15.903+00 | Games & Industrial        |  0.577536821365356 |  0.4052745103836062020-10-22 02:32:21.916+00 | Baby                      | 0.0524919033050537 |  0.4990888833999632020-09-12 10:30:53.905+00 | Electronics & Garden      |  0.679168224334717 |  0.4276011586189272020-08-25 19:39:50.895+00 | Baby & Electronics        |  0.265614211559296 |  0.2746958136558532020-08-15 15:53:02.906+00 | Home                      | 0.0492082238197327 |  0.0466884374618532020-08-10 08:56:03.889+00 | Electronics, Home & Tools |  0.336894452571869 |  0.8480106592178342020-08-02 09:48:58.918+00 | Books & Jewelry           |  0.217794299125671 |  0.7347096204757692020-08-02 08:52:31.915+00 | Home                      |  0.931948065757751 |  0.499135136604309
(10 rows)

从Kafka到数据到TimescaleDB

时间序列数据库的主要用例之一是存储来自物联网的数据。我花了几分钟来编写一个简单的Python Kafka客户端,它可以模拟一组传感器,产生比我的测试管道更真实的温度和湿度数据:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random# Create a producer that JSON-encodes the message
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))# Send a quarter million data points (asynchronous)
for _ in range(250000):location = random.randint(1, 4)temperature = 95.0 + random.uniform(0, 10) + locationhumidity = 45.0 + random.uniform(0, 10) - locationproducer.send('timescale', {'location': location, 'temperature': temperature, 'humidity': humidity})# Block until all the messages have been sent
producer.flush()

请注意,模拟器会为位置发出一个整数值,而不会为数据添加时间戳。正如您所看到的,只是为了好玩,我让模拟器生成了25万个数据点。这足以使TimescaleDB运行一点,而不需要花费大量时间来生成。

我用Kafka Consumer替换了我的管道的Dev Data Generator源,并在管道中添加了几个处理器:

Kafka Timescale管é

Expression Evaluator只是为每条记录添加一个时间戳,使用一些表达式语言来创建正确的格式:

${time:extractStringFromDate(time:now(), 'yyyy-MM-dd HH:mm:ss.SSSZZ')}

静态查找处理器使用字符串替换整数位置字段以匹配TimescaleDB表模式:

éææ¥æ¾éç½®

结论

TimescaleDB给我留下了深刻的印象。拆箱经验快速无痛。虽然我只给了它最简单的轮胎踢,但一切都是第一次。TimescaleDB是基于PostgreSQL构建的,这使我可以轻松地使用现成的工具编写数据,并且我能够使用熟悉的SQL命令在数据处于超级状态时使用它们。

如果您正在使用TimescaleDB,请下载StreamSets Data Collector并尝试满足您的数据集成需求。与TimescaleDB的核心一样,它在Apache 2.0许可下以开源形式提供,可免费用于测试,开发和生产。

这篇关于从Apache Kafka读数据写入TimescaleDB的案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/835510

相关文章

Java中Springboot集成Kafka实现消息发送和接收功能

《Java中Springboot集成Kafka实现消息发送和接收功能》Kafka是一个高吞吐量的分布式发布-订阅消息系统,主要用于处理大规模数据流,它由生产者、消费者、主题、分区和代理等组件构成,Ka... 目录一、Kafka 简介二、Kafka 功能三、POM依赖四、配置文件五、生产者六、消费者一、Kaf

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

SpringBoot使用Apache Tika检测敏感信息

《SpringBoot使用ApacheTika检测敏感信息》ApacheTika是一个功能强大的内容分析工具,它能够从多种文件格式中提取文本、元数据以及其他结构化信息,下面我们来看看如何使用Ap... 目录Tika 主要特性1. 多格式支持2. 自动文件类型检测3. 文本和元数据提取4. 支持 OCR(光学

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

MySQL不使用子查询的原因及优化案例

《MySQL不使用子查询的原因及优化案例》对于mysql,不推荐使用子查询,效率太差,执行子查询时,MYSQL需要创建临时表,查询完毕后再删除这些临时表,所以,子查询的速度会受到一定的影响,本文给大家... 目录不推荐使用子查询和JOIN的原因解决方案优化案例案例1:查询所有有库存的商品信息案例2:使用EX

Apache Tomcat服务器版本号隐藏的几种方法

《ApacheTomcat服务器版本号隐藏的几种方法》本文主要介绍了ApacheTomcat服务器版本号隐藏的几种方法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1. 隐藏HTTP响应头中的Server信息编辑 server.XML 文件2. 修China编程改错误

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

SpringBoot使用Apache POI库读取Excel文件的操作详解

《SpringBoot使用ApachePOI库读取Excel文件的操作详解》在日常开发中,我们经常需要处理Excel文件中的数据,无论是从数据库导入数据、处理数据报表,还是批量生成数据,都可能会遇到... 目录项目背景依赖导入读取Excel模板的实现代码实现代码解析ExcelDemoInfoDTO 数据传输

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD