从Apache Kafka读数据写入TimescaleDB的案例

2024-03-22 14:50

本文主要是介绍从Apache Kafka读数据写入TimescaleDB的案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文链接:https://streamsets.com/blog/ingesting-data-apache-kafka-timescaledb/

  • 作者:Pat Patterson
  • 2019年5月28日
  • StreamSets新闻

时间序列数据库

时间序列数据库经过优化,可以处理按时间索引的数据,有效地处理特定时间范围内的数据查询。市场上有几个时间序列数据库,事实上,Data Collector长期以来一直有能力写入InfluxDB,但是我对TimescaleDB感兴趣的是它建立在PostgreSQL之上。完全披露:我作为Salesforce的开发人员传播者花了五年半的时间,而PostgreSQL曾经是,现在仍然是Heroku平台的核心部分,但我也喜欢PostgreSQL作为MySQL的强大替代品。

TimescaleDB入门

在听Diana的演示时,我运行了TimescaleDB Docker镜像,将笔记本电脑上的端口54321映射到Docker容器中的5432,这样就不会与我现有的PostgreSQL部署发生冲突。一旦戴安娜离开舞台,我就会浏览TimescaleDB快速入门的“创建Hypertables”部分,创建一个PostgreSQL数据库,为TimescaleDB启用它,并为其写入一行数据:

tutorial=# INSERT INTO conditions(time, location, temperature, humidity)
tutorial-#   VALUES (NOW(), 'office', 70.0, 50.0);
INSERT 0 1
tutorial=# SELECT * FROM conditions ORDER BY time DESC LIMIT 10;time              | location | temperature | humidity 
-------------------------------+----------+-------------+----------2019-05-25 00:37:11.288536+00 | office   |          70 |       50
(1 row)

第一个TimescaleDB Pipeline

由于TimescaleDB是基于PostgreSQL构建的,因此标准的PostgreSQL JDBC驱动程序可以直接使用它。由于我已经在Data Collector中安装了驱动程序,因此我花了大约两分钟构建一个简单的测试管道来将第二行数据写入闪亮的新TimescaleDB服务器:

æ¶é´å°ºåº¦æµè¯ç®¡é

生成的测试结果数据

tutorial=# SELECT * FROM conditions ORDER BY time DESC LIMIT 10;time            |         location          |    temperature     |      humidity      
----------------------------+---------------------------+--------------------+--------------------2020-12-25 23:35:43.889+00 | Grocery                   |  0.806543707847595 | 0.08446377515792852020-10-27 02:20:47.905+00 | Shoes                     | 0.0802439451217651 |  0.3988062143325812020-10-24 01:15:15.903+00 | Games & Industrial        |  0.577536821365356 |  0.4052745103836062020-10-22 02:32:21.916+00 | Baby                      | 0.0524919033050537 |  0.4990888833999632020-09-12 10:30:53.905+00 | Electronics & Garden      |  0.679168224334717 |  0.4276011586189272020-08-25 19:39:50.895+00 | Baby & Electronics        |  0.265614211559296 |  0.2746958136558532020-08-15 15:53:02.906+00 | Home                      | 0.0492082238197327 |  0.0466884374618532020-08-10 08:56:03.889+00 | Electronics, Home & Tools |  0.336894452571869 |  0.8480106592178342020-08-02 09:48:58.918+00 | Books & Jewelry           |  0.217794299125671 |  0.7347096204757692020-08-02 08:52:31.915+00 | Home                      |  0.931948065757751 |  0.499135136604309
(10 rows)

从Kafka到数据到TimescaleDB

时间序列数据库的主要用例之一是存储来自物联网的数据。我花了几分钟来编写一个简单的Python Kafka客户端,它可以模拟一组传感器,产生比我的测试管道更真实的温度和湿度数据:

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import random# Create a producer that JSON-encodes the message
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))# Send a quarter million data points (asynchronous)
for _ in range(250000):location = random.randint(1, 4)temperature = 95.0 + random.uniform(0, 10) + locationhumidity = 45.0 + random.uniform(0, 10) - locationproducer.send('timescale', {'location': location, 'temperature': temperature, 'humidity': humidity})# Block until all the messages have been sent
producer.flush()

请注意,模拟器会为位置发出一个整数值,而不会为数据添加时间戳。正如您所看到的,只是为了好玩,我让模拟器生成了25万个数据点。这足以使TimescaleDB运行一点,而不需要花费大量时间来生成。

我用Kafka Consumer替换了我的管道的Dev Data Generator源,并在管道中添加了几个处理器:

Kafka Timescale管é

Expression Evaluator只是为每条记录添加一个时间戳,使用一些表达式语言来创建正确的格式:

${time:extractStringFromDate(time:now(), 'yyyy-MM-dd HH:mm:ss.SSSZZ')}

静态查找处理器使用字符串替换整数位置字段以匹配TimescaleDB表模式:

éææ¥æ¾éç½®

结论

TimescaleDB给我留下了深刻的印象。拆箱经验快速无痛。虽然我只给了它最简单的轮胎踢,但一切都是第一次。TimescaleDB是基于PostgreSQL构建的,这使我可以轻松地使用现成的工具编写数据,并且我能够使用熟悉的SQL命令在数据处于超级状态时使用它们。

如果您正在使用TimescaleDB,请下载StreamSets Data Collector并尝试满足您的数据集成需求。与TimescaleDB的核心一样,它在Apache 2.0许可下以开源形式提供,可免费用于测试,开发和生产。

这篇关于从Apache Kafka读数据写入TimescaleDB的案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/835510

相关文章

Python Transformers库(NLP处理库)案例代码讲解

《PythonTransformers库(NLP处理库)案例代码讲解》本文介绍transformers库的全面讲解,包含基础知识、高级用法、案例代码及学习路径,内容经过组织,适合不同阶段的学习者,对... 目录一、基础知识1. Transformers 库简介2. 安装与环境配置3. 快速上手示例二、核心模

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

Python中使用正则表达式精准匹配IP地址的案例

《Python中使用正则表达式精准匹配IP地址的案例》Python的正则表达式(re模块)是完成这个任务的利器,但你知道怎么写才能准确匹配各种合法的IP地址吗,今天我们就来详细探讨这个问题,感兴趣的朋... 目录为什么需要IP正则表达式?IP地址的基本结构基础正则表达式写法精确匹配0-255的数字验证IP地

MySQL高级查询之JOIN、子查询、窗口函数实际案例

《MySQL高级查询之JOIN、子查询、窗口函数实际案例》:本文主要介绍MySQL高级查询之JOIN、子查询、窗口函数实际案例的相关资料,JOIN用于多表关联查询,子查询用于数据筛选和过滤,窗口函... 目录前言1. JOIN(连接查询)1.1 内连接(INNER JOIN)1.2 左连接(LEFT JOI

SpringBatch数据写入实现

《SpringBatch数据写入实现》SpringBatch通过ItemWriter接口及其丰富的实现,提供了强大的数据写入能力,本文主要介绍了SpringBatch数据写入实现,具有一定的参考价值,... 目录python引言一、ItemWriter核心概念二、数据库写入实现三、文件写入实现四、多目标写入

使用Python将JSON,XML和YAML数据写入Excel文件

《使用Python将JSON,XML和YAML数据写入Excel文件》JSON、XML和YAML作为主流结构化数据格式,因其层次化表达能力和跨平台兼容性,已成为系统间数据交换的通用载体,本文将介绍如何... 目录如何使用python写入数据到Excel工作表用Python导入jsON数据到Excel工作表用

springboot循环依赖问题案例代码及解决办法

《springboot循环依赖问题案例代码及解决办法》在SpringBoot中,如果两个或多个Bean之间存在循环依赖(即BeanA依赖BeanB,而BeanB又依赖BeanA),会导致Spring的... 目录1. 什么是循环依赖?2. 循环依赖的场景案例3. 解决循环依赖的常见方法方法 1:使用 @La

MySQL中实现多表查询的操作方法(配sql+实操图+案例巩固 通俗易懂版)

《MySQL中实现多表查询的操作方法(配sql+实操图+案例巩固通俗易懂版)》本文主要讲解了MySQL中的多表查询,包括子查询、笛卡尔积、自连接、多表查询的实现方法以及多列子查询等,通过实际例子和操... 目录复合查询1. 回顾查询基本操作group by 分组having1. 显示部门号为10的部门名,员

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con

使用Apache POI在Java中实现Excel单元格的合并

《使用ApachePOI在Java中实现Excel单元格的合并》在日常工作中,Excel是一个不可或缺的工具,尤其是在处理大量数据时,本文将介绍如何使用ApachePOI库在Java中实现Excel... 目录工具类介绍工具类代码调用示例依赖配置总结在日常工作中,Excel 是一个不可或缺的工http://