Kafka KSQL实战

2024-09-06 20:48
文章标签 实战 kafka ksql

本文主要是介绍Kafka KSQL实战,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

暴走大数据

点击右侧关注,暴走大数据!

背景

Kafka早期作为一个日志消息系统,很受运维欢迎的,配合ELK玩起来很happy,在kafka慢慢的转向流式平台的过程中,开发也慢慢介入了,一些业务系统也开始和kafka对接起来了,也还是很受大家欢迎的,由于业务需要,一部分小白也就免不了接触kafka了,这些小白总是会安奈不住好奇心,要精确的查看kafka中的某一条数据,作为服务提供方,我也很方啊,该怎么怼?业务方不敢得罪啊,只能写consumer去消费,然后人肉查询。

需求

有什么方法能直接查询kafka中已有的数据呢?那时候presto就映入眼帘了,初步探索后发现presto确实强大,和我们在用的impala有的一拼,支持的数据源也更多,什么redis、mongo、kafka都可以用sql来查询,真是救星啊,这样那群小白就可以直接使用presto来查询里面的数据了。不过presto在不开发插件的情况下,对kafka的数据有格式要求,支持json、avro。但是我只是想用sql查询kafka,而presto功能过于强大,必然整个框架就显得比较厚重了,功能多嘛。有什么轻量级的工具呢?

介绍

某一天,kafka的亲儿子KSQL就诞生了,KSQL是一个用于Apache kafka的流式SQL引擎,KSQL降低了进入流处理的门槛,提供了一个简单的、完全交互式的SQL接口,用于处理Kafka的数据,可以让我们在流数据上持续执行 SQL 查询,KSQL支持广泛的强大的流处理操作,包括聚合、连接、窗口、会话等等。

KSQL在内部使用Kafka的Streams API,并且它们共享与Kafka流处理相同的核心抽象,KSQL有两个核心抽象,它们对应于到Kafka Streams中的两个核心抽象,让你可以处理kafka的topic数据。关于这两个核心抽象下章节解读。

架构

部署架构

由一个KSQL服务器进程执行查询。一组KSQL进程可以作为集群运行。可以通过启动更多的KSQL实例来动态添加更多的处理能力。这些KSQL实例是容错的,如果一个实例失败了,其他的就会接管它的工作。查询是使用交互式的KSQL命令行客户端启动的,该客户端通过REST API向集群发送命令。命令行允许检查可用的stream和table,发出新的查询,检查状态并终止正在运行的查询。KSQL内部是使用Kafka的stream API构建的,它继承了它的弹性可伸缩性、先进的状态管理和容错功能,并支持Kafka最近引入的一次性处理语义。KSQL服务器将此嵌入到一个分布式SQL引擎中(包括一些用于查询性能的自动字节代码生成)和一个用于查询和控制的REST API。

处理架构

抽象概念

KSQL简化了流应用程序,它集成了stream和table的概念,允许使用表示现在发生的事件的stream来连接表示当前状态的table。Apache Kafka中的一个topic可以表示为KSQL中的STREAM或TABLE,具体取决于topic处理的预期语义。下面看看两个核心的解读。

stream:流是无限制的结构化数据序列,stream中的fact是不可变的,这意味着可以将新fact插入到stream中,但是现有fact永远不会被更新或删除。stream可以从Kafka topic创建,或者从现有的stream和table中派生。

table:一个table是一个stream或另一个table的视图,它代表了一个不断变化的fact的集合,它相当于传统的数据库表,但通过流化等流语义来丰富。表中的事实是可变的,这意味着可以将新的事实插入到表中,现有的事实可以被更新或删除。可以从Kafka主题中创建表,也可以从现有的流和表中派生表。

部署

ksql支持kafka0.11之后的版本,在confluent的V3和V4版本中默认并没有加入ksql server程序,当然V3和V4是支持ksql的,在V5版本中已经默认加入ksql了,为了方便演示,我们使用confluent kafka V5版本演示,zk和kafka也是单实例启动。

下载

wget https://packages.confluent.io/archive/5.0/confluent-oss-5.0.0-2.11.tar.gz
tar zxvf confluent-oss-5.0.0-2.11.tar.gz -C /opt/programs/confluent_5.0.0

启动zk

cd /opt/programs/confluent_5.0.0
bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties

启动kafka

cd /opt/programs/confluent_5.0.0
bin/kafka-server-start -daemon etc/kafka/server.properties

创建topic和data

confluent自带了一个ksql-datagen工具,可以创建和产生相关的topic和数据,ksql-datagen可以指定的参数如下:

[bootstrap-server=<kafka bootstrap server(s)> (defaults to localhost:9092)]
[quickstart=<quickstart preset> (case-insensitive; one of 'orders', 'users', or 'pageviews')]
schema=<avro schema file>
[schemaRegistryUrl=<url for Confluent Schema Registry> (defaults to http://localhost:8081)]
format=<message format> (case-insensitive; one of 'avro', 'json', or 'delimited')
topic=<kafka topic name>
key=<name of key column>
[iterations=<number of rows> (defaults to 1,000,000)]
[maxInterval=<Max time in ms between rows> (defaults to 500)]
[propertiesFile=<file specifying Kafka client properties>]

创建pageviews,数据格式为delimited

cd /opt/programs/confluent_5.0.0/bin
./ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500

ps:以上命令会源源不断在stdin上输出数据,就是工具自己产生的数据,如下样例

8001 --> ([ 1539063767860 | 'User_6' | 'Page_77' ]) ts:1539063767860
8011 --> ([ 1539063767981 | 'User_9' | 'Page_75' ]) ts:1539063767981
8021 --> ([ 1539063768086 | 'User_5' | 'Page_16' ]) ts:1539063768086

不过使用consumer消费出来的数据是如下样式

1539066430530,User_5,Page_29
1539066430915,User_6,Page_74
1539066431192,User_4,Page_28
1539066431621,User_6,Page_38
1539066431772,User_7,Page_29
1539066432122,User_8,Page_34

创建users,数据格式为json

cd /opt/programs/confluent_5.0.0/bin
./ksql-datagen quickstart=users format=json topic=users maxInterval=100

ps:以上命令会源源不断在stdin上输出数据,就是工具自己产生的数据,如下样例

User_5 --> ([ 1517896551436 | 'User_5' | 'Region_5' | 'MALE' ]) ts:1539063787413
User_7 --> ([ 1513998830510 | 'User_7' | 'Region_4' | 'MALE' ]) ts:1539063787430
User_6 --> ([ 1514865642822 | 'User_6' | 'Region_2' | 'MALE' ]) ts:1539063787481

不过使用consumer消费出来的数据是如下样式

{"registertime":1507118206666,"userid":"User_6","regionid":"Region_7","gender":"OTHER"}
{"registertime":1506192314325,"userid":"User_1","regionid":"Region_1","gender":"MALE"}
{"registertime":1489277749526,"userid":"User_6","regionid":"Region_4","gender":"FEMALE"}
{"registertime":1497188917765,"userid":"User_9","regionid":"Region_3","gender":"OTHER"}
{"registertime":1493121964253,"userid":"User_4","regionid":"Region_3","gender":"MALE"}
{"registertime":1515609444511,"userid":"User_5","regionid":"Region_9","gender":"FEMALE"}

启动ksql

cd /opt/programs/confluent_5.0.0
bin/ksql-server-start -daemon etc/ksql/ksql-server.properties

连接ksql

cd /opt/programs/confluent_5.0.0
bin/ksql http://10.205.151.145:8088

    

创建stream和table

stream

根据topic pageviews创建一个stream pageviews_original,value_format为DELIMITED

ksql>CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH \
(kafka_topic='pageviews', value_format='DELIMITED');

table

根据topic users创建一个table users_original,value_format为json

ksql>CREATE TABLE users_original (registertime BIGINT, gender VARCHAR, regionid VARCHAR, userid VARCHAR) WITH \
(kafka_topic='users', value_format='JSON', key = 'userid');

查询数据

ksql> SELECT * FROM USERS_ORIGINAL LIMIT 3;
ksql> SELECT * FROM pageviews_original LIMIT 3;

ps:ksql默认是从kafka最新的数据查询消费的,如果你想从开头查询,则需要在会话上进行设置:SET 'auto.offset.reset' = 'earliest';

持久化查询

持久化查询可以源源不断的把查询出的数据发送到你指定的topic中去,查询的时候在select前面添加create stream关键字即可创建持久化查询。

创建查询

ksql> CREATE STREAM pageviews2 AS SELECT userid FROM pageviews_original;

查询新stream

ksql> SHOW STREAMS;

ps:可以看到新创建了stream PAGEVIEWS2,并且创建了topic PAGEVIEWS2

查询执行任务

ksql> SHOW QUERIES;

ps:可以看到ID为CSAS_PAGEVIEWS2_0的任务在执行,并且有显示执行的语句

消费新数据

cd /opt/programs/confluent_5.0.0/bin
./kafka-console-consumer --bootstrap-server 10.205.151.145:9092 --from-beginning --topic
PAGEVIEWS2

ps:可以看到PAGEVIEWS2 topic里面正是我们通过select筛选出来的数据

终止查询任务

ksql> TERMINATE CSAS_PAGEVIEWS2_0;

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于Kafka KSQL实战的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143079

相关文章

Python实战之屏幕录制功能的实现

《Python实战之屏幕录制功能的实现》屏幕录制,即屏幕捕获,是指将计算机屏幕上的活动记录下来,生成视频文件,本文主要为大家介绍了如何使用Python实现这一功能,希望对大家有所帮助... 目录屏幕录制原理图像捕获音频捕获编码压缩输出保存完整的屏幕录制工具高级功能实时预览增加水印多平台支持屏幕录制原理屏幕

最新Spring Security实战教程之Spring Security安全框架指南

《最新SpringSecurity实战教程之SpringSecurity安全框架指南》SpringSecurity是Spring生态系统中的核心组件,提供认证、授权和防护机制,以保护应用免受各种安... 目录前言什么是Spring Security?同类框架对比Spring Security典型应用场景传统

最新Spring Security实战教程之表单登录定制到处理逻辑的深度改造(最新推荐)

《最新SpringSecurity实战教程之表单登录定制到处理逻辑的深度改造(最新推荐)》本章节介绍了如何通过SpringSecurity实现从配置自定义登录页面、表单登录处理逻辑的配置,并简单模拟... 目录前言改造准备开始登录页改造自定义用户名密码登陆成功失败跳转问题自定义登出前后端分离适配方案结语前言

OpenManus本地部署实战亲测有效完全免费(最新推荐)

《OpenManus本地部署实战亲测有效完全免费(最新推荐)》文章介绍了如何在本地部署OpenManus大语言模型,包括环境搭建、LLM编程接口配置和测试步骤,本文给大家讲解的非常详细,感兴趣的朋友一... 目录1.概况2.环境搭建2.1安装miniconda或者anaconda2.2 LLM编程接口配置2

一文详解kafka开启kerberos认证的完整步骤

《一文详解kafka开启kerberos认证的完整步骤》这篇文章主要为大家详细介绍了kafka开启kerberos认证的完整步骤,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、kerberos安装部署二、准备机器三、Kerberos Server 安装1、配置krb5.con

基于Canvas的Html5多时区动态时钟实战代码

《基于Canvas的Html5多时区动态时钟实战代码》:本文主要介绍了如何使用Canvas在HTML5上实现一个多时区动态时钟的web展示,通过Canvas的API,可以绘制出6个不同城市的时钟,并且这些时钟可以动态转动,每个时钟上都会标注出对应的24小时制时间,详细内容请阅读本文,希望能对你有所帮助...

Spring AI与DeepSeek实战一之快速打造智能对话应用

《SpringAI与DeepSeek实战一之快速打造智能对话应用》本文详细介绍了如何通过SpringAI框架集成DeepSeek大模型,实现普通对话和流式对话功能,步骤包括申请API-KEY、项目搭... 目录一、概述二、申请DeepSeek的API-KEY三、项目搭建3.1. 开发环境要求3.2. mav

Python与DeepSeek的深度融合实战

《Python与DeepSeek的深度融合实战》Python作为最受欢迎的编程语言之一,以其简洁易读的语法、丰富的库和广泛的应用场景,成为了无数开发者的首选,而DeepSeek,作为人工智能领域的新星... 目录一、python与DeepSeek的结合优势二、模型训练1. 数据准备2. 模型架构与参数设置3

Java实战之利用POI生成Excel图表

《Java实战之利用POI生成Excel图表》ApachePOI是Java生态中处理Office文档的核心工具,这篇文章主要为大家详细介绍了如何在Excel中创建折线图,柱状图,饼图等常见图表,需要的... 目录一、环境配置与依赖管理二、数据源准备与工作表构建三、图表生成核心步骤1. 折线图(Line Ch

Java使用Tesseract-OCR实战教程

《Java使用Tesseract-OCR实战教程》本文介绍了如何在Java中使用Tesseract-OCR进行文本提取,包括Tesseract-OCR的安装、中文训练库的配置、依赖库的引入以及具体的代... 目录Java使用Tesseract-OCRTesseract-OCR安装配置中文训练库引入依赖代码实