Kafka 流式计算工具 ksqlDB 笔记:Pull Query 的用途及特性

2024-02-27 02:38

本文主要是介绍Kafka 流式计算工具 ksqlDB 笔记:Pull Query 的用途及特性,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ksqlDB 是学习和开发 kafka 流式计算的很方便的工具。它支持 Push Query 和 Pull Query。下面是一些 Pull Query 的测试。

测试对象

我建立了下面的 stream 作为测试对象:

CREATE OR REPLACE STREAM tagvalue (tagId INT, value DOUBLE)WITH (kafka_topic='tagvalue', value_format='json', partitions=1);

插入数据

INSERT INTO tagvalue (tagId, value) VALUES (1, 11000);
INSERT INTO tagvalue (tagId, value) VALUES (2, 10000);

执行 Pull Query

直接对 stream 执行 pull query

SELECT *
FROM tagvalue;

系统如下。系统要求必须有 where 语句。
在这里插入图片描述
增加 where 语句:

SELECT *
FROM tagvalue
WHERE tagId = 1;

系统提示如下。系统说我们的 stream 没有主键。
在这里插入图片描述
由于不可能修改 stream 的 schema,我们使用系统推荐的方法,改变如下配置为 true,再次执行查询得到如下提示:

在这里插入图片描述
系统的提示是 不能对 stream 执行 pull query.

对 table 执行 pull query

创建基于 tagvalue topic 的 table:

CREATE OR REPLACE TABLE tagvalueview (tagId INT PRIMARY KEY, value DOUBLE)WITH (kafka_topic='tagvalue', value_format='json', partitions=1);

执行 pull query:

select *
from tagvalueview;

得到如下结果。系统不能直接查询基于 topic 创建的 table
在这里插入图片描述
按照系统提示创建能查询的 table:

CREATE TABLE QUERYABLE_TAGVALUEVIEW AS SELECT * FROM TAGVALUEVIEW

这时候系统增加了一个新的topic:
在这里插入图片描述

select *
from QUERYABLE_TAGVALUEVIEW ;

以下是执行结果。我们可以看到,系统什么都没返回:

在这里插入图片描述
我们再创建一个实时统计 tag 数值数量的 table:

CREATE OR REPLACE TABLE tagvalueview ASSELECT tagId, count(*)FROM tagvalueGROUP BY tagIdEMIT CHANGES;

执行以下查询:

select *
from tagvalueview;

得到查询结果:
在这里插入图片描述

结论

Pull Query 只能在 table 上执行,而且是 queryable table. Pull Query 结合 table 可以帮助开发者统计已有数据的结果。

这篇关于Kafka 流式计算工具 ksqlDB 笔记:Pull Query 的用途及特性的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/750962

相关文章

Python pyinstaller实现图形化打包工具

《Pythonpyinstaller实现图形化打包工具》:本文主要介绍一个使用PythonPYQT5制作的关于pyinstaller打包工具,代替传统的cmd黑窗口模式打包页面,实现更快捷方便的... 目录1.简介2.运行效果3.相关源码1.简介一个使用python PYQT5制作的关于pyinstall

如何在一台服务器上使用docker运行kafka集群

《如何在一台服务器上使用docker运行kafka集群》文章详细介绍了如何在一台服务器上使用Docker运行Kafka集群,包括拉取镜像、创建网络、启动Kafka容器、检查运行状态、编写启动和关闭脚本... 目录1.拉取镜像2.创建集群之间通信的网络3.将zookeeper加入到网络中4.启动kafka集群

如何用Java结合经纬度位置计算目标点的日出日落时间详解

《如何用Java结合经纬度位置计算目标点的日出日落时间详解》这篇文章主详细讲解了如何基于目标点的经纬度计算日出日落时间,提供了在线API和Java库两种计算方法,并通过实际案例展示了其应用,需要的朋友... 目录前言一、应用示例1、天安门升旗时间2、湖南省日出日落信息二、Java日出日落计算1、在线API2

IDEA中的Kafka管理神器详解

《IDEA中的Kafka管理神器详解》这款基于IDEA插件实现的Kafka管理工具,能够在本地IDE环境中直接运行,简化了设置流程,为开发者提供了更加紧密集成、高效且直观的Kafka操作体验... 目录免安装:IDEA中的Kafka管理神器!简介安装必要的插件创建 Kafka 连接第一步:创建连接第二步:选

使用Python制作一个PDF批量加密工具

《使用Python制作一个PDF批量加密工具》PDF批量加密‌是一种保护PDF文件安全性的方法,通过为多个PDF文件设置相同的密码,防止未经授权的用户访问这些文件,下面我们来看看如何使用Python制... 目录1.简介2.运行效果3.相关源码1.简介一个python写的PDF批量加密工具。PDF批量加密

使用Java编写一个文件批量重命名工具

《使用Java编写一个文件批量重命名工具》这篇文章主要为大家详细介绍了如何使用Java编写一个文件批量重命名工具,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录背景处理1. 文件夹检查与遍历2. 批量重命名3. 输出配置代码片段完整代码背景在开发移动应用时,UI设计通常会提供不

Python按条件批量删除TXT文件行工具

《Python按条件批量删除TXT文件行工具》这篇文章主要为大家详细介绍了Python如何实现按条件批量删除TXT文件中行的工具,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.简介2.运行效果3.相关源码1.简介一个由python编写android的可根据TXT文件按条件批

详解Python中通用工具类与异常处理

《详解Python中通用工具类与异常处理》在Python开发中,编写可重用的工具类和通用的异常处理机制是提高代码质量和开发效率的关键,本文将介绍如何将特定的异常类改写为更通用的ValidationEx... 目录1. 通用异常类:ValidationException2. 通用工具类:Utils3. 示例文

高效录音转文字:2024年四大工具精选!

在快节奏的工作生活中,能够快速将录音转换成文字是一项非常实用的能力。特别是在需要记录会议纪要、讲座内容或者是采访素材的时候,一款优秀的在线录音转文字工具能派上大用场。以下推荐几个好用的录音转文字工具! 365在线转文字 直达链接:https://www.pdf365.cn/ 365在线转文字是一款提供在线录音转文字服务的工具,它以其高效、便捷的特点受到用户的青睐。用户无需下载安装任何软件,只

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3