53、Flink Interval Join 代码示例

2024-06-24 11:20

本文主要是介绍53、Flink Interval Join 代码示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、概述

interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join;

interval Join 算子的水位线会取两条流中水位线的最小值;

interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准;

interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,sideOutputRightLateData];

2、代码示例

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.time.Duration;/*** interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join* interval Join 算子的水位线会取两条流中水位线的最小值;* interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准;* interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,sideOutputRightLateData];*/
public class _04_IntervalInnerJoin {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);env.disableOperatorChaining();DataStreamSource<String> inputLeft = env.socketTextStream("localhost", 8888);// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapLeft = inputLeft.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkLeft = mapLeft.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));DataStreamSource<String> inputRight = env.socketTextStream("localhost", 9999);OutputTag<Tuple2<String, Long>> leftLateTag = new OutputTag<Tuple2<String, Long>>("left-late") {};OutputTag<Tuple2<String, Long>> rightLateTag = new OutputTag<Tuple2<String, Long>>("right-late") {};// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapRight = inputRight.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkRight = mapRight.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));/*** left-1** a,1718089200000* b,1718089200000* c,1718089200000** interval_join_watermark=No Watermark** right-2** a,1718089201000* b,1718089201000* c,1718089201000** interval_join_watermark=1718089199999** res=:2> (a,1718089200000,1718089201000)* res=:1> (b,1718089200000,1718089201000)* res=:1> (c,1718089200000,1718089201000)** left-3** a,1718089203000* b,1718089203000* c,1718089203000** interval_join_watermark=1718089200999** right-4** a,1718089204000* b,1718089204000* c,1718089204000** interval_join_watermark=1718089202999** res=:2> (a,1718089203000,1718089204000)* res=:1> (b,1718089203000,1718089204000)* res=:1> (c,1718089203000,1718089204000)** left-right-5** a,1718089202000* b,1718089202000* c,1718089202000** left-late=:1> (b,1718089202000)* left-late=:2> (a,1718089202000)* left-late=:1> (c,1718089202000)* right-late=:1> (b,1718089202000)* right-late=:2> (a,1718089202000)* right-late=:1> (c,1718089202000)*/SingleOutputStreamOperator<Tuple3<String, Long, Long>> resStream = watermarkLeft.keyBy(e -> e.f0).intervalJoin(watermarkRight.keyBy(e -> e.f0)).between(Duration.ofSeconds(-1), Duration.ofSeconds(1)).sideOutputLeftLateData(leftLateTag).sideOutputRightLateData(rightLateTag).process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {@Overridepublic void processElement(Tuple2<String, Long> t1, Tuple2<String, Long> t2, ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>.Context context, Collector<Tuple3<String, Long, Long>> collector) throws Exception {collector.collect(new Tuple3<>(t1.f0, t1.f1, t2.f1));}});resStream.print("res=");resStream.getSideOutput(leftLateTag).print("left-late=");resStream.getSideOutput(rightLateTag).print("right-late=");env.execute();}
}

3、测试用例

		  left-1a,1718089200000b,1718089200000c,1718089200000interval_join_watermark=No Watermarkright-2a,1718089201000b,1718089201000c,1718089201000interval_join_watermark=1718089199999res=:2> (a,1718089200000,1718089201000)res=:1> (b,1718089200000,1718089201000)res=:1> (c,1718089200000,1718089201000)left-3a,1718089203000b,1718089203000c,1718089203000interval_join_watermark=1718089200999right-4a,1718089204000b,1718089204000c,1718089204000interval_join_watermark=1718089202999res=:2> (a,1718089203000,1718089204000)res=:1> (b,1718089203000,1718089204000)res=:1> (c,1718089203000,1718089204000)left-right-5a,1718089202000b,1718089202000c,1718089202000left-late=:1> (b,1718089202000)left-late=:2> (a,1718089202000)left-late=:1> (c,1718089202000)right-late=:1> (b,1718089202000)right-late=:2> (a,1718089202000)right-late=:1> (c,1718089202000)

这篇关于53、Flink Interval Join 代码示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1090005

相关文章

MySQL 8 中的一个强大功能 JSON_TABLE示例详解

《MySQL8中的一个强大功能JSON_TABLE示例详解》JSON_TABLE是MySQL8中引入的一个强大功能,它允许用户将JSON数据转换为关系表格式,从而可以更方便地在SQL查询中处理J... 目录基本语法示例示例查询解释应用场景不适用场景1. ‌jsON 数据结构过于复杂或动态变化‌2. ‌性能要

Python实现MQTT通信的示例代码

《Python实现MQTT通信的示例代码》本文主要介绍了Python实现MQTT通信的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 安装paho-mqtt库‌2. 搭建MQTT代理服务器(Broker)‌‌3. pytho

Java中Arrays类和Collections类常用方法示例详解

《Java中Arrays类和Collections类常用方法示例详解》本文总结了Java中Arrays和Collections类的常用方法,涵盖数组填充、排序、搜索、复制、列表转换等操作,帮助开发者高... 目录Arrays.fill()相关用法Arrays.toString()Arrays.sort()A

MySQL进行数据库审计的详细步骤和示例代码

《MySQL进行数据库审计的详细步骤和示例代码》数据库审计通过触发器、内置功能及第三方工具记录和监控数据库活动,确保安全、完整与合规,Java代码实现自动化日志记录,整合分析系统提升监控效率,本文给大... 目录一、数据库审计的基本概念二、使用触发器进行数据库审计1. 创建审计表2. 创建触发器三、Java

MySQL 主从复制部署及验证(示例详解)

《MySQL主从复制部署及验证(示例详解)》本文介绍MySQL主从复制部署步骤及学校管理数据库创建脚本,包含表结构设计、示例数据插入和查询语句,用于验证主从同步功能,感兴趣的朋友一起看看吧... 目录mysql 主从复制部署指南部署步骤1.环境准备2. 主服务器配置3. 创建复制用户4. 获取主服务器状态5

Spring Boot中的路径变量示例详解

《SpringBoot中的路径变量示例详解》SpringBoot中PathVariable通过@PathVariable注解实现URL参数与方法参数绑定,支持多参数接收、类型转换、可选参数、默认值及... 目录一. 基本用法与参数映射1.路径定义2.参数绑定&nhttp://www.chinasem.cnbs

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

PostgreSQL中rank()窗口函数实用指南与示例

《PostgreSQL中rank()窗口函数实用指南与示例》在数据分析和数据库管理中,经常需要对数据进行排名操作,PostgreSQL提供了强大的窗口函数rank(),可以方便地对结果集中的行进行排名... 目录一、rank()函数简介二、基础示例:部门内员工薪资排名示例数据排名查询三、高级应用示例1. 每

使用Python删除Excel中的行列和单元格示例详解

《使用Python删除Excel中的行列和单元格示例详解》在处理Excel数据时,删除不需要的行、列或单元格是一项常见且必要的操作,本文将使用Python脚本实现对Excel表格的高效自动化处理,感兴... 目录开发环境准备使用 python 删除 Excphpel 表格中的行删除特定行删除空白行删除含指定

SpringBoot线程池配置使用示例详解

《SpringBoot线程池配置使用示例详解》SpringBoot集成@Async注解,支持线程池参数配置(核心数、队列容量、拒绝策略等)及生命周期管理,结合监控与任务装饰器,提升异步处理效率与系统... 目录一、核心特性二、添加依赖三、参数详解四、配置线程池五、应用实践代码说明拒绝策略(Rejected