51. Flink Window Join: Code Example of a Sliding Window Event-Time Join
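1. Overview
The watermark seen by the window operator is the minimum of the watermarks of the two input streams.
An element of one stream is emitted only if it is paired with an element of the other stream that has the same key and falls into the same window; unmatched elements produce no output.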
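The two rules above can be sketched in plain Java without any Flink dependency. This is only an illustration: the class `WatermarkSketch` and its method names are hypothetical, and the formula `maxTimestamp - outOfOrderness - 1` is an assumption about what `forBoundedOutOfOrderness` emits, chosen because it agrees with the watermark values listed in the test case of section 3.

```java
import java.util.ArrayList;
import java.util.List;

class WatermarkSketch {
    // Watermark of a single stream under bounded out-of-orderness (assumed formula).
    static long streamWatermark(long maxTimestampSeen, long outOfOrdernessMillis) {
        return maxTimestampSeen - outOfOrdernessMillis - 1;
    }

    // A two-input operator can only advance to the smaller of its input watermarks.
    static long operatorWatermark(long leftWatermark, long rightWatermark) {
        return Math.min(leftWatermark, rightWatermark);
    }

    // Inner-join semantics for one key in one window: every left/right pair,
    // and nothing at all if either side is empty.
    static List<long[]> joinPairs(long[] left, long[] right) {
        List<long[]> out = new ArrayList<>();
        for (long l : left) {
            for (long r : right) {
                out.add(new long[]{l, r});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long left = streamWatermark(1718089204000L, 0L);
        long right = streamWatermark(1718089201000L, 0L);
        System.out.println("operator watermark = " + operatorWatermark(left, right));
        // A left element with no right partner yields no output.
        System.out.println("pairs = " + joinPairs(new long[]{1718089200000L}, new long[0]).size());
    }
}
```

With the inputs used in `main`, the operator watermark is held back by the slower right stream, which is why a window cannot fire until both sockets have received newer data.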
2. Code example
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;

import java.time.Duration;

/**
 * Note:
 * <p>
 * The watermark of the window is the minimum of the watermarks of the two streams;
 * an element of one stream that is not paired with an element of the other stream is not emitted.
 */
public class _02_WindowSlidingEventJoin {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism is limited for testing; in production, idle sources must be handled
        env.setParallelism(2);
        env.disableOperatorChaining();

        // Left stream: lines of the form "key,timestampMillis"
        DataStreamSource<String> inputLeft = env.socketTextStream("localhost", 8888);

        // Event time requires a watermark strategy and a timestamp assigner
        SingleOutputStreamOperator<Tuple2<String, Long>> mapLeft = inputLeft
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String input) throws Exception {
                        String[] fields = input.split(",");
                        return new Tuple2<>(fields[0], Long.parseLong(fields[1]));
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> watermarkLeft = mapLeft
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> input, long l) {
                                return input.f1;
                            }
                        }));

        // Right stream: same record format, read from a second socket
        DataStreamSource<String> inputRight = env.socketTextStream("localhost", 9999);

        // Event time requires a watermark strategy and a timestamp assigner
        SingleOutputStreamOperator<Tuple2<String, Long>> mapRight = inputRight
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String input) throws Exception {
                        String[] fields = input.split(",");
                        return new Tuple2<>(fields[0], Long.parseLong(fields[1]));
                    }
                });

        SingleOutputStreamOperator<Tuple2<String, Long>> watermarkRight = mapRight
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> input, long l) {
                                return input.f1;
                            }
                        }));

        // Join on the key (f0) in 6-second windows that slide every 3 seconds
        watermarkLeft.join(watermarkRight)
                .where(e -> e.f0)
                .equalTo(e -> e.f0)
                .window(SlidingEventTimeWindows.of(Duration.ofSeconds(6), Duration.ofSeconds(3)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {
                    @Override
                    public Tuple3<String, Long, Long> join(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
                        // Emit (key, left timestamp, right timestamp)
                        return new Tuple3<>(t1.f0, t1.f1, t2.f1);
                    }
                })
                .print();

        env.execute();
    }
}
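To see why a single element can appear in more than one result, it helps to compute which sliding windows a timestamp belongs to. The following is a minimal standalone sketch, not Flink's own code: `SlidingWindowSketch` and `windowStarts` are hypothetical names, a window offset of 0 and non-negative timestamps are assumed, and windows are treated as half-open intervals [start, start + size).

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowSketch {
    // Start timestamps of all sliding windows [start, start + size) that contain ts.
    // Assumes offset 0 and ts >= 0.
    static List<Long> windowStarts(long ts, long sizeMillis, long slideMillis) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after ts, aligned to the slide.
        long lastStart = ts - (ts % slideMillis);
        for (long start = lastStart; start > ts - sizeMillis; start -= slideMillis) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 6000L;  // 6-second window, as in the job above
        long slide = 3000L; // sliding every 3 seconds
        for (long ts : new long[]{1718089200000L, 1718089204000L}) {
            System.out.println(ts + " -> window starts " + windowStarts(ts, size, slide));
        }
    }
}
```

With a 6-second size and a 3-second slide, every element lands in two overlapping windows, so a matching pair is emitted once for each window that contains both elements.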
3. Test case
left-1
a,1718089200000
b,1718089200000
c,1718089200000
left-watermark=1718089199999
window-watermark=no_watermark

right-2
a,1718089201000
b,1718089201000
c,1718089201000
right-watermark=1718089200999
window-watermark=1718089199999 [the minimum of the two streams]

left-3
a,1718089204000
b,1718089204000
c,1718089204000
left-watermark=1718089203999
window-watermark=1718089200999

right-4
a,1718089205000
b,1718089205000
c,1718089205000
right-watermark=1718089204999
window-watermark=1718089203999

res: [1718089197000~1718089203000]
2> (a,1718089200000,1718089201000)
1> (b,1718089200000,1718089201000)
1> (c,1718089200000,1718089201000)

left-5
a,1718089209000
b,1718089209000
c,1718089209000
left-watermark=1718089208999
window-watermark=1718089204999

right-6
a,1718089209000
b,1718089209000
c,1718089209000
right-watermark=1718089208999
window-watermark=1718089208999

res: [1718089200000~1718089206000]
2> (a,1718089200000,1718089201000)
2> (a,1718089200000,1718089205000)
2> (a,1718089204000,1718089201000)
2> (a,1718089204000,1718089205000)
1> (b,1718089200000,1718089201000)
1> (b,1718089200000,1718089205000)
1> (b,1718089204000,1718089201000)
1> (b,1718089204000,1718089205000)
1> (c,1718089200000,1718089201000)
1> (c,1718089200000,1718089205000)
1> (c,1718089204000,1718089201000)
1> (c,1718089204000,1718089205000)

res: [1718089203000~1718089209000]
2> (a,1718089204000,1718089205000)
1> (b,1718089204000,1718089205000)
1> (c,1718089204000,1718089205000)
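The results above can be reproduced by hand for a single key. The sketch below replays the timestamps of key `a` in plain Java; it is a simplified model rather than the Flink runtime. `JoinReplaySketch`, `hasFired`, and `joinWindow` are hypothetical names, and the model assumes that a window [start, start + 6000) fires once the operator watermark reaches `start + 6000 - 1`.

```java
import java.util.ArrayList;
import java.util.List;

class JoinReplaySketch {
    static final long SIZE = 6000L;  // window size in milliseconds
    static final long SLIDE = 3000L; // window slide in milliseconds

    // Assumed firing rule: the watermark has reached the last timestamp of the window.
    static boolean hasFired(long start, long watermark) {
        return watermark >= start + SIZE - 1;
    }

    // Cross product of the left and right timestamps that fall into [start, start + SIZE).
    static List<String> joinWindow(String key, long[] left, long[] right, long start) {
        List<String> out = new ArrayList<>();
        for (long l : left) {
            if (l < start || l >= start + SIZE) continue;
            for (long r : right) {
                if (r < start || r >= start + SIZE) continue;
                out.add("(" + key + "," + l + "," + r + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long base = 1718089200000L;
        long[] left = {base, base + 4000, base + 9000};         // batches left-1, left-3, left-5
        long[] right = {base + 1000, base + 5000, base + 9000}; // batches right-2, right-4, right-6
        long watermark = base + 8999;                           // operator watermark after right-6
        for (long start = base - SLIDE; start <= base + 9000; start += SLIDE) {
            if (hasFired(start, watermark)) {
                System.out.println("[" + start + "~" + (start + SIZE) + ") "
                        + joinWindow("a", left, right, start));
            }
        }
    }
}
```

Under these assumptions, three windows have fired by the time the watermark reaches 1718089208999, with one, four, and one joined pairs for key `a`, which matches the three `res` groups listed above. The elements with timestamp 1718089209000 stay buffered because their windows have not yet closed.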