【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)

2024-01-02 05:04

本文主要是介绍【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引


文章目录

  • Flink 系列文章
  • 一、maven依赖
  • 二、示例:时态表的join(scala版本)
      • 1)、统计需求对应的SQL
      • 2)、Without connnector 实现代码
      • 3)、With CSVConnector 实现代码


本文给以scala的语言给出来Table API 针对时态表的join操作。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本文需要有kafka的运行环境。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

一、maven依赖

本文maven依赖参考文章:【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表 中的依赖,为节省篇幅不再赘述。

二、示例:时态表的join(scala版本)

该示例来源于:https://developer.aliyun.com/article/679659
假设有一张订单表Orders和一张汇率表Rates,那么订单来自于不同的地区,所以支付的币种各不一样,那么假设需要统计每个订单在下单时候Yen币种对应的金额。
在这里插入图片描述

1)、统计需求对应的SQL

SELECT o.currency, o.amount, r.rateo.amount * r.rate AS yen_amount
FROMOrders AS o,LATERAL TABLE (Rates(o.rowtime)) AS r
WHERE r.currency = o.currency

2)、Without connnector 实现代码

object TemporalTableJoinTest {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = TableEnvironment.getTableEnvironment(env)env.setParallelism(1)
// 设置时间类型是 event-time  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)// 构造订单数据val ordersData = new mutable.MutableList[(Long, String, Timestamp)]ordersData.+=((2L, "Euro", new Timestamp(2L)))ordersData.+=((1L, "US Dollar", new Timestamp(3L)))ordersData.+=((50L, "Yen", new Timestamp(4L)))ordersData.+=((3L, "Euro", new Timestamp(5L)))//构造汇率数据val ratesHistoryData = new mutable.MutableList[(String, Long, Timestamp)]ratesHistoryData.+=(("US Dollar", 102L, new Timestamp(1L)))ratesHistoryData.+=(("Euro", 114L, new Timestamp(1L)))ratesHistoryData.+=(("Yen", 1L, new Timestamp(1L)))ratesHistoryData.+=(("Euro", 116L, new Timestamp(5L)))ratesHistoryData.+=(("Euro", 119L, new Timestamp(7L)))// 进行订单表 event-time 的提取val orders = env.fromCollection(ordersData).assignTimestampsAndWatermarks(new OrderTimestampExtractor[Long, String]()).toTable(tEnv, 'amount, 'currency, 'rowtime.rowtime)// 进行汇率表 event-time 的提取val ratesHistory = env.fromCollection(ratesHistoryData).assignTimestampsAndWatermarks(new OrderTimestampExtractor[String, Long]()).toTable(tEnv, 'currency, 'rate, 'rowtime.rowtime)// 注册订单表和汇率表tEnv.registerTable("Orders", orders)tEnv.registerTable("RatesHistory", ratesHistory)val tab = tEnv.scan("RatesHistory");
// 创建TemporalTableFunctionval temporalTableFunction = tab.createTemporalTableFunction('rowtime, 'currency)
//注册TemporalTableFunction
tEnv.registerFunction("Rates",temporalTableFunction)val SQLQuery ="""|SELECT o.currency, o.amount, r.rate,|  o.amount * r.rate AS yen_amount|FROM|  Orders AS o,|  LATERAL TABLE (Rates(o.rowtime)) AS r|WHERE r.currency = o.currency|""".stripMargintEnv.registerTable("TemporalJoinResult", tEnv.SQLQuery(SQLQuery))val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]// 打印查询结果result.print()env.execute()}}
  • OrderTimestampExtractor 实现如下
import java.SQL.Timestampimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Timeclass OrderTimestampExtractor[T1, T2]extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {element._3.getTime}
}

3)、With CSVConnector 实现代码

在实际的生产开发中,都需要实际的Connector的定义,下面我们以CSV格式的Connector定义来开发Temporal Table JOIN Demo。

1、genEventRatesHistorySource

def genEventRatesHistorySource: CsvTableSource = {val csvRecords = Seq("ts#currency#rate","1#US Dollar#102","1#Euro#114","1#Yen#1","3#Euro#116","5#Euro#119","7#Pounds#108")// 测试数据写入临时文件val tempFilePath =FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")// 创建Source connectornew CsvTableSource(tempFilePath,Array("ts","currency","rate"),Array(Types.LONG,Types.STRING,Types.LONG),fieldDelim = "#",rowDelim = CommonUtils.line,ignoreFirstLine = true,ignoreComments = "%")}

2、genRatesOrderSource


def genRatesOrderSource: CsvTableSource = {val csvRecords = Seq("ts#currency#amount","2#Euro#10","4#Euro#10")// 测试数据写入临时文件val tempFilePath =FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")// 创建Source connectornew CsvTableSource(tempFilePath,Array("ts","currency", "amount"),Array(Types.LONG,Types.STRING,Types.LONG),fieldDelim = "#",rowDelim = CommonUtils.line,ignoreFirstLine = true,ignoreComments = "%")}

3、主程序

import java.io.Fileimport org.apache.flink.api.common.typeinfo.{TypeInformation, Types}
import org.apache.flink.book.utils.{CommonUtils, FileUtils}
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.table.sources.CsvTableSource
import org.apache.flink.types.Rowobject CsvTableSourceUtils {def genWordCountSource: CsvTableSource = {val csvRecords = Seq("words","Hello Flink","Hi, Apache Flink","Apache FlinkBook")// 测试数据写入临时文件val tempFilePath =FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")// 创建Source connectornew CsvTableSource(tempFilePath,Array("words"),Array(Types.STRING),fieldDelim = "#",rowDelim = "$",ignoreFirstLine = true,ignoreComments = "%")}def genRatesHistorySource: CsvTableSource = {val csvRecords = Seq("rowtime ,currency   ,rate","09:00:00   ,US Dollar  , 102","09:00:00   ,Euro       , 114","09:00:00  ,Yen        ,   1","10:45:00   ,Euro       , 116","11:15:00   ,Euro       , 119","11:49:00   ,Pounds     , 108")// 测试数据写入临时文件val tempFilePath =FileUtils.writeToTempFile(csvRecords.mkString("$"), "csv_source_", "tmp")// 创建Source connectornew CsvTableSource(tempFilePath,Array("rowtime","currency","rate"),Array(Types.STRING,Types.STRING,Types.STRING),fieldDelim = ",",rowDelim = "$",ignoreFirstLine = true,ignoreComments = "%")}def genEventRatesHistorySource: CsvTableSource = {val csvRecords = Seq("ts#currency#rate","1#US Dollar#102","1#Euro#114","1#Yen#1","3#Euro#116","5#Euro#119","7#Pounds#108")// 测试数据写入临时文件val tempFilePath =FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_rate", "tmp")// 创建Source connectornew CsvTableSource(tempFilePath,Array("ts","currency","rate"),Array(Types.LONG,Types.STRING,Types.LONG),fieldDelim = "#",rowDelim = CommonUtils.line,ignoreFirstLine = true,ignoreComments = "%")}def genRatesOrderSource: CsvTableSource = {val csvRecords = Seq("ts#currency#amount","2#Euro#10","4#Euro#10")// 测试数据写入临时文件val tempFilePath =FileUtils.writeToTempFile(csvRecords.mkString(CommonUtils.line), "csv_source_order", "tmp")// 创建Source connectornew CsvTableSource(tempFilePath,Array("ts","currency", "amount"),Array(Types.LONG,Types.STRING,Types.LONG),fieldDelim = "#",rowDelim = CommonUtils.line,ignoreFirstLine = true,ignoreComments = "%")}/*** Example:* genCsvSink(*   Array[String]("word", "count"),*   Array[TypeInformation[_] ](Types.STRING, Types.LONG))*/def genCsvSink(fieldNames: Array[String], fieldTypes: Array[TypeInformation[_]]): TableSink[Row] = {val tempFile = File.createTempFile("csv_sink_", "tem")if (tempFile.exists()) {tempFile.delete()}new CsvTableSink(tempFile.getAbsolutePath).configure(fieldNames, fieldTypes)}}

4、运行结果
在这里插入图片描述

以上,本文给以scala的语言给出来Table API 针对时态表的join操作。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文更详细的内容可参考文章:

17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

本专题分为以下几篇文章:
【flink番外篇】9、Flink Table API 支持的操作示例(1)-通过Table API和SQL创建表
【flink番外篇】9、Flink Table API 支持的操作示例(2)- 通过Table API 和 SQL 创建视图
【flink番外篇】9、Flink Table API 支持的操作示例(3)- 通过API查询表和使用窗口函数的查询
【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 对表的查询、过滤操作
【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作
【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作
【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(内联接、外联接以及联接自定义函数等)
【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)
【flink番外篇】9、Flink Table API 支持的操作示例(9)- 表的union、unionall、intersect、intersectall、minus、minusall和in的操作
【flink番外篇】9、Flink Table API 支持的操作示例(10)- 表的OrderBy、Offset 和 Fetch、insert操作
【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作
【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和无界的over window)操作
【flink番外篇】9、Flink Table API 支持的操作示例(13)- Row-based(map、flatmap、aggregate、group window aggregate等)操作
【flink番外篇】9、Flink Table API 支持的操作示例(14)- 时态表的join(java版本)
【flink番外篇】9、Flink Table API 支持的操作示例(1)-完整版
【flink番外篇】9、Flink Table API 支持的操作示例(2)-完整版

这篇关于【flink番外篇】9、Flink Table API 支持的操作示例(8)- 时态表的join(scala版本)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/561410

相关文章

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

zeroclipboard 粘贴板的应用示例, 兼容 Chrome、IE等多浏览器

zeroclipboard单个复制按钮和多个复制按钮的实现方法 最近网站改版想让复制代码功能在多个浏览器上都可以实现,最近看网上不少说我们的代码复制功能不好用的,我们最近将会增加代码高亮等功能,希望大家多多支持我们 zeroclipboard是一个跨浏览器的库类 它利用 Flash 进行复制,所以只要浏览器装有 Flash 就可以运行,而且比 IE 的

【LabVIEW学习篇 - 21】:DLL与API的调用

文章目录 DLL与API调用DLLAPIDLL的调用 DLL与API调用 LabVIEW虽然已经足够强大,但不同的语言在不同领域都有着自己的优势,为了强强联合,LabVIEW提供了强大的外部程序接口能力,包括DLL、CIN(C语言接口)、ActiveX、.NET、MATLAB等等。通过DLL可以使用户很方便地调用C、C++、C#、VB等编程语言写的程序以及windows自带的大

动手学深度学习【数据操作+数据预处理】

import osos.makedirs(os.path.join('.', 'data'), exist_ok=True)data_file = os.path.join('.', 'data', 'house_tiny.csv')with open(data_file, 'w') as f:f.write('NumRooms,Alley,Price\n') # 列名f.write('NA

如何更优雅地对接第三方API

如何更优雅地对接第三方API 本文所有示例完整代码地址:https://github.com/yu-linfeng/BlogRepositories/tree/master/repositories/third 我们在日常开发过程中,有不少场景会对接第三方的API,例如第三方账号登录,第三方服务等等。第三方服务会提供API或者SDK,我依稀记得早些年Maven还没那么广泛使用,通常要对接第三方

线程的四种操作

所属专栏:Java学习        1. 线程的开启 start和run的区别: run:描述了线程要执行的任务,也可以称为线程的入口 start:调用系统函数,真正的在系统内核中创建线程(创建PCB,加入到链表中),此处的start会根据不同的系统,分别调用不同的api,创建好之后的线程,再单独去执行run(所以说,start的本质是调用系统api,系统的api

Java IO 操作——个人理解

之前一直Java的IO操作一知半解。今天看到一个便文章觉得很有道理( 原文章),记录一下。 首先,理解Java的IO操作到底操作的什么内容,过程又是怎么样子。          数据来源的操作: 来源有文件,网络数据。使用File类和Sockets等。这里操作的是数据本身,1,0结构。    File file = new File("path");   字

MySQL——表操作

目录 一、创建表 二、查看表 2.1 查看表中某成员的数据 2.2 查看整个表中的表成员 2.3 查看创建表时的句柄 三、修改表 alter 3.1 重命名 rename 3.2 新增一列 add 3.3 更改列属性 modify 3.4 更改列名称 change 3.5 删除某列 上一篇博客介绍了库的操作,接下来来看一下表的相关操作。 一、创建表 create

基于SpringBoot的宠物服务系统+uniapp小程序+LW参考示例

系列文章目录 1.基于SSM的洗衣房管理系统+原生微信小程序+LW参考示例 2.基于SpringBoot的宠物摄影网站管理系统+LW参考示例 3.基于SpringBoot+Vue的企业人事管理系统+LW参考示例 4.基于SSM的高校实验室管理系统+LW参考示例 5.基于SpringBoot的二手数码回收系统+原生微信小程序+LW参考示例 6.基于SSM的民宿预订管理系统+LW参考示例 7.基于

Spring Roo 实站( 一 )部署安装 第一个示例程序

转自:http://blog.csdn.net/jun55xiu/article/details/9380213 一:安装 注:可以参与官网spring-roo: static.springsource.org/spring-roo/reference/html/intro.html#intro-exploring-sampleROO_OPTS http://stati