袋鼠云的FlinkSQL插件开发

2023-10-25 07:36
文章标签 开发 插件 flinksql 袋鼠

本文主要是介绍袋鼠云的FlinkSQL插件开发,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

袋鼠云是什么

袋鼠云是一家大数据产品供应商。他开发了一个产品叫做 flinkStreamSQL。这东西是以 Flink 为基础开发的使用 SQL 来写流式计算逻辑的产品。

FlinkStreamSQL 的开源地址

什么是插件

这里所说的插件是可以理解为自定义的语法。例如下面的 SQL:

select fact.shop_id,shop.shop_name
from fact_stream as fact
left join dim_shop as shop
on fact.shop_id = shop.shopid

dim_shop 可能是一个 redis 为实体的 Table ,这袋鼠已经为我们实现了,现在我们可能从 HTTP 的接口拿到数据,此时的话,我们可以自定义一个 HTTP Table ,然后上面的代码不用修改。

整体的流程

编写、执行 FlinkStreamSQL 的流程如下所示:


CREATE TABLE source(colName colType,...function(colNameX) AS aliasName,WATERMARK FOR colName AS withOffset( colName , delayTime ))WITH(type ='kafka09',kafka.bootstrap.servers ='ip:port,ip:port...',kafka.zookeeper.quorum ='ip:port,ip:port/zkparent',kafka.auto.offset.reset ='latest',kafka.topic ='topicName',parallelism ='parllNum',--timezone='America/Los_Angeles',timezone='Asia/Shanghai',sourcedatatype ='json' #可不设置);CREATE TABLE sink(colName colType,...function(colNameX) AS aliasName,WATERMARK FOR colName AS withOffset( colName , delayTime ))WITH(type ='kafka09',kafka.bootstrap.servers ='ip:port,ip:port...',kafka.zookeeper.quorum ='ip:port,ip:port/zkparent',kafka.auto.offset.reset ='latest',kafka.topic ='topicName',parallelism ='parllNum',--timezone='America/Los_Angeles',timezone='Asia/Shanghai',sourcedatatype ='json' #可不设置);CREATE TABLE dim (columnFamily:columnName type as alias,...PRIMARY KEY(keyInfo),PERIOD FOR SYSTEM_TIME)WITH(type ='hbase',zookeeperQuorum ='ip:port',zookeeperParent ='/hbase',tableName ='tableNamae',cache ='LRU',cacheSize ='10000',cacheTTLMs ='60000',parallelism ='1',partitionedJoin='false');insert into sink(
...
)
select source.*,dim.*
from source 
left join dim 
where 

流程图
如上面画的,会将 DDL 变成 source、sink、side-put 的算子。简单的讲,执行的逻辑是
; 为分割符号,分割开 sql 语句,然后使用正则表达式识别 DDL 语句、DML 语句。
其中 DDL 语句中符合(?i)^PERIOD\s+FOR\s+SYSTEM_TIME$ 则认为是 side-input ,side-input 会别解析为异步I/O
算子。如果 DDL 语句中没有则解析为 source 算子,如果 DDL 表在 DML 中在 insert into 后面,则为 sink 表。

知道了这些之后,我们可以自己定义一种 DDL 语句,如下:

 CREATE TABLE dim (columnFamily:columnName type as alias,...PRIMARY KEY(keyInfo),PERIOD FOR SYSTEM_TIME)WITH(type ='http',url ='http://....',...);insert into sink(
...
)
select source.*,dim.*
from source 
left join dim 
where 

其他的都不变,我现在的认为是实现一些接口,让 FlinkStreamSql 能通过 SQL 找到对应算子的实现。

关键的接口

DDL 语句解析的相关接口

AbstractTableInfo|--AbstractSideTableInfo|--AbstractTargetTableInfo|--AbstractSourceTableInfoAbstractTableParser|--AbstractSideTableParser|--ClickhouseSideParser|--AbstractSourceParser

袋鼠平台是如何找到对应的 AbstractSideTableInfo 的呢?其实靠 class 的命名规则,例如,hbase side table
AbstractSideTableInfo 的实现类是 HbaseSideParser。with 中的属性 type = hbase ,然后 table 的 DDL 中有 side table 的关键配置,然后所以贫出来的 class 文件文字是 HbaseSideParser ,namespace 是 com.dtstack.flink.sql.{类型}.{type}.table, 所以全程也出来了。

转化为算子的接口

BaseAsyncReqRow 此接口是继承了 RichAsyncFunction ,重要的方法有 handleAsyncInvoke 里面实现了异步调用外表接口的东西。

我实现的是的 Http side table ,使用的是 http 异步的请求接口:

<dependency><groupId>org.asynchttpclient</groupId><artifactId>async-http-client</artifactId><version>2.12.3</version>
</dependency>

使用的接口是:

AsyncHttpClient client = Dsl.asyncHttpClient();
BoundRequestBuilder builder = client.preparePost(url)
.setHeader("Content-type","application/json")
.setBody(json.getBytes());Request r = builder.build();
ListenableFuture whenResponse = client.executeRequest(build);
whenResponse.addListener(new Runnable(){public void run(){// 编写异步的回调函数}}
)

这篇关于袋鼠云的FlinkSQL插件开发的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/281002

相关文章

Python+wxPython开发一个文件属性比对工具

《Python+wxPython开发一个文件属性比对工具》在日常的文件管理工作中,我们经常会遇到同一个文件存在多个版本,或者需要验证备份文件与源文件是否一致,下面我们就来看看如何使用wxPython模... 目录引言项目背景与需求应用场景核心需求运行结果技术选型程序设计界面布局核心功能模块关键代码解析文件大

C++多线程开发环境配置方法

《C++多线程开发环境配置方法》文章详细介绍了如何在Windows上安装MinGW-w64和VSCode,并配置环境变量和编译任务,使用VSCode创建一个C++多线程测试项目,并通过配置tasks.... 目录下载安装 MinGW-w64下载安装VS code创建测试项目配置编译任务创建 tasks.js

VS Code中的Python代码格式化插件示例讲解

《VSCode中的Python代码格式化插件示例讲解》在Java开发过程中,代码的规范性和可读性至关重要,一个团队中如果每个开发者的代码风格各异,会给代码的维护、审查和协作带来极大的困难,这篇文章主... 目录前言如何安装与配置使用建议与技巧如何选择总结前言在 VS Code 中,有几款非常出色的 pyt

一文详解Python如何开发游戏

《一文详解Python如何开发游戏》Python是一种非常流行的编程语言,也可以用来开发游戏模组,:本文主要介绍Python如何开发游戏的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录一、python简介二、Python 开发 2D 游戏的优劣势优势缺点三、Python 开发 3D

基于Python开发Windows自动更新控制工具

《基于Python开发Windows自动更新控制工具》在当今数字化时代,操作系统更新已成为计算机维护的重要组成部分,本文介绍一款基于Python和PyQt5的Windows自动更新控制工具,有需要的可... 目录设计原理与技术实现系统架构概述数学建模工具界面完整代码实现技术深度分析多层级控制理论服务层控制注

Java中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例解析

《Java中的分布式系统开发基于Zookeeper与Dubbo的应用案例解析》本文将通过实际案例,带你走进基于Zookeeper与Dubbo的分布式系统开发,本文通过实例代码给大家介绍的非常详... 目录Java 中的分布式系统开发基于 Zookeeper 与 Dubbo 的应用案例一、分布式系统中的挑战二

基于Go语言开发一个 IP 归属地查询接口工具

《基于Go语言开发一个IP归属地查询接口工具》在日常开发中,IP地址归属地查询是一个常见需求,本文将带大家使用Go语言快速开发一个IP归属地查询接口服务,有需要的小伙伴可以了解下... 目录功能目标技术栈项目结构核心代码(main.go)使用方法扩展功能总结在日常开发中,IP 地址归属地查询是一个常见需求:

基于 Cursor 开发 Spring Boot 项目详细攻略

《基于Cursor开发SpringBoot项目详细攻略》Cursor是集成GPT4、Claude3.5等LLM的VSCode类AI编程工具,支持SpringBoot项目开发全流程,涵盖环境配... 目录cursor是什么?基于 Cursor 开发 Spring Boot 项目完整指南1. 环境准备2. 创建

SpringBoot 多环境开发实战(从配置、管理与控制)

《SpringBoot多环境开发实战(从配置、管理与控制)》本文详解SpringBoot多环境配置,涵盖单文件YAML、多文件模式、MavenProfile分组及激活策略,通过优先级控制灵活切换环境... 目录一、多环境开发基础(单文件 YAML 版)(一)配置原理与优势(二)实操示例二、多环境开发多文件版

使用docker搭建嵌入式Linux开发环境

《使用docker搭建嵌入式Linux开发环境》本文主要介绍了使用docker搭建嵌入式Linux开发环境,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面... 目录1、前言2、安装docker3、编写容器管理脚本4、创建容器1、前言在日常开发全志、rk等不同