本文主要是介绍53、Flink Interval Join 代码示例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
1、概述
interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join;
interval Join 算子的水位线会取两条流中水位线的最小值;
interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准;
interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,sideOutputRightLateData];
2、代码示例
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;import java.time.Duration;/*** interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join* interval Join 算子的水位线会取两条流中水位线的最小值;* interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准;* interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,sideOutputRightLateData];*/
public class _04_IntervalInnerJoin {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 测试时限制了分区数,生产中需要设置空闲数据源env.setParallelism(2);env.disableOperatorChaining();DataStreamSource<String> inputLeft = env.socketTextStream("localhost", 8888);// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapLeft = inputLeft.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkLeft = mapLeft.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));DataStreamSource<String> inputRight = env.socketTextStream("localhost", 9999);OutputTag<Tuple2<String, Long>> leftLateTag = new OutputTag<Tuple2<String, Long>>("left-late") {};OutputTag<Tuple2<String, Long>> rightLateTag = new OutputTag<Tuple2<String, Long>>("right-late") {};// 事件时间需要设置水位线策略和时间戳SingleOutputStreamOperator<Tuple2<String, Long>> mapRight = inputRight.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String input) throws Exception {String[] fields = input.split(",");return new Tuple2<>(fields[0], Long.parseLong(fields[1]));}});SingleOutputStreamOperator<Tuple2<String, Long>> watermarkRight = mapRight.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(0)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> input, long l) {return input.f1;}}));/*** left-1** a,1718089200000* b,1718089200000* c,1718089200000** interval_join_watermark=No Watermark** right-2** a,1718089201000* b,1718089201000* c,1718089201000** interval_join_watermark=1718089199999** res=:2> (a,1718089200000,1718089201000)* res=:1> (b,1718089200000,1718089201000)* res=:1> (c,1718089200000,1718089201000)** left-3** a,1718089203000* b,1718089203000* c,1718089203000** interval_join_watermark=1718089200999** right-4** a,1718089204000* b,1718089204000* c,1718089204000** interval_join_watermark=1718089202999** res=:2> (a,1718089203000,1718089204000)* res=:1> (b,1718089203000,1718089204000)* res=:1> (c,1718089203000,1718089204000)** left-right-5** a,1718089202000* b,1718089202000* c,1718089202000** left-late=:1> (b,1718089202000)* left-late=:2> (a,1718089202000)* left-late=:1> (c,1718089202000)* right-late=:1> (b,1718089202000)* right-late=:2> (a,1718089202000)* right-late=:1> (c,1718089202000)*/SingleOutputStreamOperator<Tuple3<String, Long, Long>> resStream = watermarkLeft.keyBy(e -> e.f0).intervalJoin(watermarkRight.keyBy(e -> e.f0)).between(Duration.ofSeconds(-1), Duration.ofSeconds(1)).sideOutputLeftLateData(leftLateTag).sideOutputRightLateData(rightLateTag).process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>() {@Overridepublic void processElement(Tuple2<String, Long> t1, Tuple2<String, Long> t2, ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple3<String, Long, Long>>.Context context, Collector<Tuple3<String, Long, Long>> collector) throws Exception {collector.collect(new Tuple3<>(t1.f0, t1.f1, t2.f1));}});resStream.print("res=");resStream.getSideOutput(leftLateTag).print("left-late=");resStream.getSideOutput(rightLateTag).print("right-late=");env.execute();}
}
3、测试用例
left-1a,1718089200000b,1718089200000c,1718089200000interval_join_watermark=No Watermarkright-2a,1718089201000b,1718089201000c,1718089201000interval_join_watermark=1718089199999res=:2> (a,1718089200000,1718089201000)res=:1> (b,1718089200000,1718089201000)res=:1> (c,1718089200000,1718089201000)left-3a,1718089203000b,1718089203000c,1718089203000interval_join_watermark=1718089200999right-4a,1718089204000b,1718089204000c,1718089204000interval_join_watermark=1718089202999res=:2> (a,1718089203000,1718089204000)res=:1> (b,1718089203000,1718089204000)res=:1> (c,1718089203000,1718089204000)left-right-5a,1718089202000b,1718089202000c,1718089202000left-late=:1> (b,1718089202000)left-late=:2> (a,1718089202000)left-late=:1> (c,1718089202000)right-late=:1> (b,1718089202000)right-late=:2> (a,1718089202000)right-late=:1> (c,1718089202000)
这篇关于53、Flink Interval Join 代码示例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!