53、Flink Interval Join 代码示例

2024-06-24 11:20

本文主要是介绍53、Flink Interval Join 代码示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、概述

interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join;

interval Join 算子的水位线会取两条流中水位线的最小值;

interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准;

interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,sideOutputRightLateData];

2、代码示例

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.time.Duration;/*** interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join* interval Join 算子的水位线会取两条流中水位线的最小值;* interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准;* interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,sideOutputRightLateData];*/
public class _04_IntervalInnerJoin {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);env.disableOperatorChaining();DataStreamSource<String> inputLeft = env.socketTextStream("localhost", 8888);// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapLeft = inputLeft.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkLeft = mapLeft.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));DataStreamSource<String> inputRight = env.socketTextStream("localhost", 9999);OutputTag<Tuple2<String, Long>> leftLateTag = new OutputTag<Tuple2<String, Long>>("left-late") {};OutputTag<Tuple2<String, Long>> rightLateTag = new OutputTag<Tuple2<String, Long>>("right-late") {};// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapRight = inputRight.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkRight = mapRight.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));/*** left-1** a,1718089200000* b,1718089200000* c,1718089200000** interval_join_watermark=No Watermark** right-2** a,1718089201000* b,1718089201000* c,1718089201000** interval_join_watermark=1718089199999** res=:2> (a,1718089200000,1718089201000)* res=:1> (b,1718089200000,1718089201000)* res=:1> (c,1718089200000,1718089201000)** left-3** a,1718089203000* b,1718089203000* c,1718089203000** interval_join_watermark=1718089200999** right-4** a,1718089204000* b,1718089204000* c,1718089204000** interval_join_watermark=1718089202999** res=:2> (a,1718089203000,1718089204000)* res=:1> (b,1718089203000,1718089204000)* res=:1> (c,1718089203000,1718089204000)** left-right-5** a,1718089202000* b,1718089202000* c,1718089202000** left-late=:1> (b,1718089202000)* left-late=:2> (a,1718089202000)* left-late=:1> (c,1718089202000)* right-late=:1> (b,1718089202000)* right-late=:2> (a,1718089202000)* right-late=:1> (c,1718089202000)*/SingleOutputStreamOperator<Tuple3<String, Long, Long>> resStream = watermarkLeft.keyBy(e -> e.f0).intervalJoin(watermarkRight.keyBy(e -> e.f0)).between(Duration.ofSeconds(-1), Duration.ofSeconds(1)).sideOutputLeftLateData(leftLateTag).sideOutputRightLateData(rightLateTag).process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {@Overridepublic void processElement(Tuple2<String, Long> t1, Tuple2<String, Long> t2, ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>.Context context, Collector<Tuple3<String, Long, Long>> collector) throws Exception {collector.collect(new Tuple3<>(t1.f0, t1.f1, t2.f1));}});resStream.print("res=");resStream.getSideOutput(leftLateTag).print("left-late=");resStream.getSideOutput(rightLateTag).print("right-late=");env.execute();}
}

3、测试用例

		  left-1a,1718089200000b,1718089200000c,1718089200000interval_join_watermark=No Watermarkright-2a,1718089201000b,1718089201000c,1718089201000interval_join_watermark=1718089199999res=:2> (a,1718089200000,1718089201000)res=:1> (b,1718089200000,1718089201000)res=:1> (c,1718089200000,1718089201000)left-3a,1718089203000b,1718089203000c,1718089203000interval_join_watermark=1718089200999right-4a,1718089204000b,1718089204000c,1718089204000interval_join_watermark=1718089202999res=:2> (a,1718089203000,1718089204000)res=:1> (b,1718089203000,1718089204000)res=:1> (c,1718089203000,1718089204000)left-right-5a,1718089202000b,1718089202000c,1718089202000left-late=:1> (b,1718089202000)left-late=:2> (a,1718089202000)left-late=:1> (c,1718089202000)right-late=:1> (b,1718089202000)right-late=:2> (a,1718089202000)right-late=:1> (c,1718089202000)

这篇关于53、Flink Interval Join 代码示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1090005

相关文章

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

poj 1258 Agri-Net(最小生成树模板代码)

感觉用这题来当模板更适合。 题意就是给你邻接矩阵求最小生成树啦。~ prim代码:效率很高。172k...0ms。 #include<stdio.h>#include<algorithm>using namespace std;const int MaxN = 101;const int INF = 0x3f3f3f3f;int g[MaxN][MaxN];int n

计算机毕业设计 大学志愿填报系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点赞 👍 收藏 ⭐评论 📝 🍅 文末获取源码联系 👇🏻 精彩专栏推荐订阅 👇🏻 不然下次找不到哟~Java毕业设计项目~热门选题推荐《1000套》 目录 1.技术选型 2.开发工具 3.功能

代码随想录冲冲冲 Day39 动态规划Part7

198. 打家劫舍 dp数组的意义是在第i位的时候偷的最大钱数是多少 如果nums的size为0 总价值当然就是0 如果nums的size为1 总价值是nums[0] 遍历顺序就是从小到大遍历 之后是递推公式 对于dp[i]的最大价值来说有两种可能 1.偷第i个 那么最大价值就是dp[i-2]+nums[i] 2.不偷第i个 那么价值就是dp[i-1] 之后取这两个的最大值就是d

pip-tools:打造可重复、可控的 Python 开发环境,解决依赖关系,让代码更稳定

在 Python 开发中,管理依赖关系是一项繁琐且容易出错的任务。手动更新依赖版本、处理冲突、确保一致性等等,都可能让开发者感到头疼。而 pip-tools 为开发者提供了一套稳定可靠的解决方案。 什么是 pip-tools? pip-tools 是一组命令行工具,旨在简化 Python 依赖关系的管理,确保项目环境的稳定性和可重复性。它主要包含两个核心工具:pip-compile 和 pip

D4代码AC集

贪心问题解决的步骤: (局部贪心能导致全局贪心)    1.确定贪心策略    2.验证贪心策略是否正确 排队接水 #include<bits/stdc++.h>using namespace std;int main(){int w,n,a[32000];cin>>w>>n;for(int i=1;i<=n;i++){cin>>a[i];}sort(a+1,a+n+1);int i=1

zeroclipboard 粘贴板的应用示例, 兼容 Chrome、IE等多浏览器

zeroclipboard单个复制按钮和多个复制按钮的实现方法 最近网站改版想让复制代码功能在多个浏览器上都可以实现,最近看网上不少说我们的代码复制功能不好用的,我们最近将会增加代码高亮等功能,希望大家多多支持我们 zeroclipboard是一个跨浏览器的库类 它利用 Flash 进行复制,所以只要浏览器装有 Flash 就可以运行,而且比 IE 的

html css jquery选项卡 代码练习小项目

在学习 html 和 css jquery 结合使用的时候 做好是能尝试做一些简单的小功能,来提高自己的 逻辑能力,熟悉代码的编写语法 下面分享一段代码 使用html css jquery选项卡 代码练习 <div class="box"><dl class="tab"><dd class="active">手机</dd><dd>家电</dd><dd>服装</dd><dd>数码</dd><dd

生信代码入门:从零开始掌握生物信息学编程技能

少走弯路,高效分析;了解生信云,访问 【生信圆桌x生信专用云服务器】 : www.tebteb.cc 介绍 生物信息学是一个高度跨学科的领域,结合了生物学、计算机科学和统计学。随着高通量测序技术的发展,海量的生物数据需要通过编程来进行处理和分析。因此,掌握生信编程技能,成为每一个生物信息学研究者的必备能力。 生信代码入门,旨在帮助初学者从零开始学习生物信息学中的编程基础。通过学习常用

husky 工具配置代码检查工作流:提交代码至仓库前做代码检查

提示:这篇博客以我前两篇博客作为先修知识,请大家先去看看我前两篇博客 博客指路:前端 ESlint 代码规范及修复代码规范错误-CSDN博客前端 Vue3 项目开发—— ESLint & prettier 配置代码风格-CSDN博客 husky 工具配置代码检查工作流的作用 在工作中,我们经常需要将写好的代码提交至代码仓库 但是由于程序员疏忽而将不规范的代码提交至仓库,显然是不合理的 所