51、Flink 窗口 Join 之滑动窗口事件时间 Join 代码示例

2024-06-20 11:20

本文主要是介绍51、Flink 窗口 Join 之滑动窗口事件时间 Join 代码示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、概述

窗口中的水位线取的是两条流中的最小值;

一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出;

2、代码示例

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;import java.time.Duration;/*** 注意:* <p>* 窗口中的水位线取的是两条流中的最小值;* 一个流中的元素如果没有与另一个流中的元素组合起来,它就不会被输出;*/
public class _02_WindowSlidingEventJoin {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);env.disableOperatorChaining();DataStreamSource<String> inputLeft = env.socketTextStream("localhost", 8888);// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapLeft = inputLeft.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkLeft = mapLeft.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));DataStreamSource<String> inputRight = env.socketTextStream("localhost", 9999);// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapRight = inputRight.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkRight = mapRight.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));watermarkLeft.join(watermarkRight).where(e -> e.f0).equalTo(e -> e.f0).window(SlidingEventTimeWindows.of(Duration.ofSeconds(6), Duration.ofSeconds(3))).apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {@Overridepublic Tuple3<String, Long, Long> join(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {return new Tuple3<>(t1.f0, t1.f1, t2.f1);}}).print();env.execute();}
}

3、测试用例

		  left-1a,1718089200000b,1718089200000c,1718089200000left-watermark=1718089199999window-watermark=no_watermarkright-2a,1718089201000b,1718089201000c,1718089201000right-watermark=1718089200999window-watermark=1718089199999[两条流中最小的]left-3a,1718089204000b,1718089204000c,1718089204000left-watermark=1718089203999window-watermark=1718089200999right-4a,1718089205000b,1718089205000c,1718089205000right-watermark=1718089204999window-watermark=1718089203999res:[1718089197000~1718089203000]2> (a,1718089200000,1718089201000)1> (b,1718089200000,1718089201000)1> (c,1718089200000,1718089201000)left-5a,1718089209000b,1718089209000c,1718089209000left-watermark=1718089208999window-watermark=1718089204999right-6a,1718089209000b,1718089209000c,1718089209000right-watermark=1718089208999window-watermark=1718089208999res[1718089200000~1718089206000]2> (a,1718089200000,1718089201000)2> (a,1718089200000,1718089205000)2> (a,1718089204000,1718089201000)2> (a,1718089204000,1718089205000)1> (b,1718089200000,1718089201000)1> (b,1718089200000,1718089205000)1> (b,1718089204000,1718089201000)1> (b,1718089204000,1718089205000)1> (c,1718089200000,1718089201000)1> (c,1718089200000,1718089205000)1> (c,1718089204000,1718089201000)1> (c,1718089204000,1718089205000)res[1718089203000~1718089209000]2> (a,1718089204000,1718089205000)1> (b,1718089204000,1718089205000)1> (c,1718089204000,1718089205000)

这篇关于51、Flink 窗口 Join 之滑动窗口事件时间 Join 代码示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1077986

相关文章

MySQL 8 中的一个强大功能 JSON_TABLE示例详解

《MySQL8中的一个强大功能JSON_TABLE示例详解》JSON_TABLE是MySQL8中引入的一个强大功能,它允许用户将JSON数据转换为关系表格式,从而可以更方便地在SQL查询中处理J... 目录基本语法示例示例查询解释应用场景不适用场景1. ‌jsON 数据结构过于复杂或动态变化‌2. ‌性能要

Python实现MQTT通信的示例代码

《Python实现MQTT通信的示例代码》本文主要介绍了Python实现MQTT通信的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1. 安装paho-mqtt库‌2. 搭建MQTT代理服务器(Broker)‌‌3. pytho

Java中Arrays类和Collections类常用方法示例详解

《Java中Arrays类和Collections类常用方法示例详解》本文总结了Java中Arrays和Collections类的常用方法,涵盖数组填充、排序、搜索、复制、列表转换等操作,帮助开发者高... 目录Arrays.fill()相关用法Arrays.toString()Arrays.sort()A

MySQL进行数据库审计的详细步骤和示例代码

《MySQL进行数据库审计的详细步骤和示例代码》数据库审计通过触发器、内置功能及第三方工具记录和监控数据库活动,确保安全、完整与合规,Java代码实现自动化日志记录,整合分析系统提升监控效率,本文给大... 目录一、数据库审计的基本概念二、使用触发器进行数据库审计1. 创建审计表2. 创建触发器三、Java

MySQL 主从复制部署及验证(示例详解)

《MySQL主从复制部署及验证(示例详解)》本文介绍MySQL主从复制部署步骤及学校管理数据库创建脚本,包含表结构设计、示例数据插入和查询语句,用于验证主从同步功能,感兴趣的朋友一起看看吧... 目录mysql 主从复制部署指南部署步骤1.环境准备2. 主服务器配置3. 创建复制用户4. 获取主服务器状态5

Spring Boot中的路径变量示例详解

《SpringBoot中的路径变量示例详解》SpringBoot中PathVariable通过@PathVariable注解实现URL参数与方法参数绑定,支持多参数接收、类型转换、可选参数、默认值及... 目录一. 基本用法与参数映射1.路径定义2.参数绑定&nhttp://www.chinasem.cnbs

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

PostgreSQL中rank()窗口函数实用指南与示例

《PostgreSQL中rank()窗口函数实用指南与示例》在数据分析和数据库管理中,经常需要对数据进行排名操作,PostgreSQL提供了强大的窗口函数rank(),可以方便地对结果集中的行进行排名... 目录一、rank()函数简介二、基础示例:部门内员工薪资排名示例数据排名查询三、高级应用示例1. 每

使用Python删除Excel中的行列和单元格示例详解

《使用Python删除Excel中的行列和单元格示例详解》在处理Excel数据时,删除不需要的行、列或单元格是一项常见且必要的操作,本文将使用Python脚本实现对Excel表格的高效自动化处理,感兴... 目录开发环境准备使用 python 删除 Excphpel 表格中的行删除特定行删除空白行删除含指定

SpringBoot线程池配置使用示例详解

《SpringBoot线程池配置使用示例详解》SpringBoot集成@Async注解,支持线程池参数配置(核心数、队列容量、拒绝策略等)及生命周期管理,结合监控与任务装饰器,提升异步处理效率与系统... 目录一、核心特性二、添加依赖三、参数详解四、配置线程池五、应用实践代码说明拒绝策略(Rejected