Flink多流转换(1)—— 分流合流

2024-01-24 06:36

本文主要是介绍Flink多流转换(1)—— 分流合流,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

分流

代码示例

使用侧输出流

合流

联合(Union)

连接(Connect)


简单划分的话,多流转换可以分为“分流”和“合流”两大类

目前分流的操作一般是通过侧输出流(side output)来实现,而合流的算子比较丰富,根据不同的需求可以调用 union、connect、join 以及 coGroup 等接口进行连接合并操作

分流

将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,得到完全平等的多个子DataStream

代码示例

调用.filter()方法进行筛选,将符合条件的数据拣选出来放到对应的流里

public class SplitStreamByFilter {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());// 筛选Mary的浏览行为放入MaryStream流中DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Mary");}});// 筛选Bob的购买行为放入BobStream流中DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return value.user.equals("Bob");}});// 筛选其他人的浏览行为放入elseStream流中DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>() {@Overridepublic boolean filter(Event value) throws Exception {return !value.user.equals("Mary") && !value.user.equals("Bob") ;}});MaryStream.print("Mary pv");BobStream.print("Bob pv");elseStream.print("else pv");env.execute();}
}

缺点:上述操作将原始流复制了三份,对每一份分别进行筛选,因此代码冗余,不够高效

解决:①.split()方法(但限制了数据类型转换,已经废弃)

②测输出流

使用侧输出流

改进后的代码如下:

public class SplitStreamByOutputTag {// 定义输出标签,侧输出流的数据类型为三元组(user, url, timestamp)private static OutputTag<Tuple3<String, String, Long>> MaryTag = new OutputTag<Tuple3<String, String, Long>>("Mary-pv"){};private static OutputTag<Tuple3<String, String, Long>> BobTag = new OutputTag<Tuple3<String, String, Long>>("Bob-pv"){};public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource());SingleOutputStreamOperator<Event> processedStream = stream.process(new ProcessFunction<Event, Event>() {@Overridepublic void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {if (value.user.equals("Mary")){ctx.output(MaryTag, new Tuple3<>(value.user, value.url, value.timestamp));} else if (value.user.equals("Bob")){ctx.output(BobTag, new Tuple3<>(value.user, value.url, value.timestamp));} else {out.collect(value);}}});processedStream.getSideOutput(MaryTag).print("Mary pv");processedStream.getSideOutput(BobTag).print("Bob pv");processedStream.print("else");env.execute();}
}

①定义OutputTag作为标签

②使用ctx.output方法将符合筛选条件的数据写入侧输出流

③使用getSideOutput方法从侧输出流中获得数据

合流

对于来源不同的多条流中的数据进行联合处理(与分流相比,合流操作更为普遍)

联合(Union)

直接将多条流合在一起(要求必须流中的数据类型必须相同),合并之后的新流会包括所有流中的元素,数据类型不变

操作:基于 DataStream 直接调用.union()方法

参数:其他 DataStream

返回值:一个 DataStream

stream1.union(stream2, stream3, ...)

水位线时效性:多流合并时处理的时效性是以最慢的那个流为准的(多条流的合并,某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程)

代码示例:

public class UnionTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop102", 7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream1.print("stream1");SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop103", 7777).map(data -> {String[] field = data.split(",");return new Event(field[0].trim(), field[1].trim(), Long.valueOf(field[2].trim()));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp;}}));stream2.print("stream2");// 合并两条流stream1.union(stream2).process(new ProcessFunction<Event, String>() {@Overridepublic void processElement(Event value, Context ctx, Collector<String> out) throws Exception {out.collect("水位线:" + ctx.timerService().currentWatermark());}}).print();env.execute();}
}

测试:

分别在两台机器上输入以下数据:

hadoop102 :Alice, ./home, 1000
hadoop103 :Alice, ./home, 2000
hadoop102 :Alice, ./home, 3000

水位线的推进如下:

连接(Connect)

连接操作允许流的数据类型不同

连接流(ConnectedStreams)

连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的

要想得到新的 DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型

代码实现

①基于一条 DataStream 调用.connect()方法,传入另外一条 DataStream 作为参数,将两条流连接起来,得到一个 ConnectedStreams

②调用同处理方法得到 DataStream(可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法)

public class ConnectTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStream<Integer> stream1 = env.fromElements(1,2,3);DataStream<Long> stream2 = env.fromElements(1L,2L,3L);ConnectedStreams<Integer, Long> connectedStreams = stream1.connect(stream2);SingleOutputStreamOperator<String> result = connectedStreams.map(new CoMapFunction<Integer, Long, String>() {@Overridepublic String map1(Integer value) {return "Integer: " + value;}@Overridepublic String map2(Long value) {return "Long: " + value;}});result.print();env.execute();}
}

①ConnectedStreams有两个类型参数,分别是stream1和stream2的类型;

②map方法中实现了一个CoMapFunction,表示分别对两条流中的数据执行 map 操作

类型参数<IN1, IN2, OUT>,分别表示第一条流、第二条流,以及合并后的流中的数据类型

这里我们将一条 Integer 流和一条 Long 流合并,转换成 String 输出。所以当遇到第一条流输入的整型值时,调用.map1();而遇到第二条流输入的长整型数据时,调用.map2():最终都转换为字符串输出,合并成了一条字符串流

③补充:ConnectedStreams 也可以直接调用.keyBy()进行按键分区的操作,得到的还是一个 ConnectedStreams

connectedStreams.keyBy(keySelector1, keySelector2);

传入的参数是两条流中各自的键选择器

这样的操作就是把两条流中key相同的数据放到了一起,然后针对来源的流各自进行处理;

同样也可以在合并之前先使用KeyBy进行分区,然后基于两条KeyedStream进行连接操作;

要注意两条流定义的键的类型必须相同,否则会抛出异常


CoProcessFunction

对于连接流 ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调用。我们把这种接口叫作“协同处理函数”(co-process function)

例如:CoMapFunction、CoFlatMapFunction、CoProcessFunction

CoProcessFunction源码如下:

public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
...
public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}public abstract class Context {...}
...
}

简单示例:实现一个实时对账的需求,也就是app 的支付操作和第三方的支付操作的一个双流 Join。App 的支付事件和第三方的支付事件将会互相等待 5 秒钟,如果等不来对应的支付事件,那么就输出报警信息

// 实时对账
public class BillCheckExample {public static void main(String[] args) throws Exception{StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 来自app的支付日志SingleOutputStreamOperator<Tuple3<String, String, Long>> appStream = env.fromElements(Tuple3.of("order-1", "app", 1000L),Tuple3.of("order-2", "app", 2000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {return element.f2;}}));// 来自第三方支付平台的支付日志SingleOutputStreamOperator<Tuple4<String, String, String, Long>> thirdpartStream = env.fromElements(Tuple4.of("order-1", "third-party", "success", 3000L),Tuple4.of("order-3", "third-party", "success", 4000L)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple4<String, String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String, String, String, Long>>() {@Overridepublic long extractTimestamp(Tuple4<String, String, String, Long> element, long recordTimestamp) {return element.f3;}}));// 检测同一支付单在两条流中是否匹配,不匹配就报警appStream.connect(thirdpartStream).keyBy(data -> data.f0, data -> data.f0).process(new OrderMatchResult()).print();env.execute();}// 自定义实现CoProcessFunctionpublic static class OrderMatchResult extends CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>{// 定义状态变量,用来保存已经到达的事件private ValueState<Tuple3<String, String, Long>> appEventState;private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;@Overridepublic void open(Configuration parameters) throws Exception {appEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING, Types.STRING, Types.LONG)));thirdPartyEventState = getRuntimeContext().getState(new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG)));}@Overridepublic void processElement1(Tuple3<String, String, Long> value, Context ctx, Collector<String> out) throws Exception {// 看另一条流中事件是否来过if (thirdPartyEventState.value() != null){out.collect("对账成功:" + value + "  " + thirdPartyEventState.value());// 清空状态thirdPartyEventState.clear();} else {// 更新状态appEventState.update(value);// 注册一个5秒后的定时器,开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f2 + 5000L);}}@Overridepublic void processElement2(Tuple4<String, String, String, Long> value, Context ctx, Collector<String> out) throws Exception {if (appEventState.value() != null){out.collect("对账成功:" + appEventState.value() + "  " + value);// 清空状态appEventState.clear();} else {// 更新状态thirdPartyEventState.update(value);// 注册一个5秒后的定时器,开始等待另一条流的事件ctx.timerService().registerEventTimeTimer(value.f3 + 5000L);}}@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {// 定时器触发,判断状态,如果某个状态不为空,说明另一条流中事件没来if (appEventState.value() != null) {out.collect("对账失败:" + appEventState.value() + "  " + "第三方支付平台信息未到");}if (thirdPartyEventState.value() != null) {out.collect("对账失败:" + thirdPartyEventState.value() + "  " + "app信息未到");}appEventState.clear();thirdPartyEventState.clear();}}}

运行结果如下:

运行结果解析:
①在CoProcessFunction的实现中,声明了两个状态变量用来保存App的支付信息和第三方的支付信息

②App支付信息到达之后,触发processElement1中的操作,检查第三方的支付信息是否已经到达(如果先到达会保存在相应的状态变量中);如果已经到达,则对账成功;如果没有到达,则等待5s,仍未到达则对账失败;

③第三方支付信息到达后,流程同②

④对于order-1,时间戳为1000的数据(App)到达后,第三方支付信息未到达,等待5s,接着时间戳未3000的数据(第三方)到达后,发现App支付信息已经到达,因此对账成功

⑤对于order-2和order-3,均是等待5s后没有检测到App(第三方)数据到达而发出报警信息

广播连接流(BroadcastConnectedStream)

DataStream 调用.connect()方法时可以传入一个广播流(BroadcastConnectedStream)

这种连接方式往往用在需要动态定义某些规则或配置的场景。因为规则是实时变动的,所以我们可以用一个单独的流来获取规则数据;而这些规则或配置是对整个应用全局有效的,所以不能只把这数据传递给一个下游并行子任务处理,而是要“广播”(broadcast)给所有的并行子任务。而下游子任务收到广播出来的规则,会把它保存成一个状态,这就是所谓的“广播状态”(broadcast state)

如何创建广播流


基于DataStream调用.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor),说明状态的名称和类型;

因为广播状态底层是用一个“映射”(map)结构来保存的

MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(...);
BroadcastStream<Rule> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

数据流和广播流的连接

得到“广播连接流”(BroadcastConnectedStream),然后基于广播连接流调用.process()方法,就可以同时获取规则和数据,进行动态处理

DataStream<String> output = stream.connect(ruleBroadcastStream).process( new BroadcastProcessFunction<>() {...} );
BroadcastProcessFunction

BroadcastProcessFunction 与 CoProcessFunction 类似,同样是一个抽象类,需要实现两个方法,针对合并的两条流中元素分别定义处理操作。区别在于这里一条流是正常处理数据,而另一条流则是要用新规则来更新广播状态,所以对应的两个方法叫作.processElement().processBroadcastElement()

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili 

这篇关于Flink多流转换(1)—— 分流合流的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/638799

相关文章

LangChain转换链:让数据处理更精准

1. 转换链的概念 在开发AI Agent(智能体)时,我们经常需要对输入数据进行预处理,这样可以更好地利用LLM。LangChain提供了一个强大的工具——转换链(TransformChain),它可以帮我们轻松实现这一任务。 转换链(TransformChain)主要是将 给定的数据 按照某个函数进行转换,再将 转换后的结果 输出给LLM。 所以转换链的核心是:根据业务逻辑编写合适的转换函

工程文档CAD转换必备!在 Java 中将 DWG 转换为 JPG

Aspose.CAD 是一个独立的类库,以加强Java应用程序处理和渲染CAD图纸,而不需要AutoCAD或任何其他渲染工作流程。该CAD类库允许将DWG, DWT, DWF, DWFX, IFC, PLT, DGN, OBJ, STL, IGES, CFF2文件、布局和图层高质量地转换为PDF和光栅图像格式。 Aspose API支持流行文件格式处理,并允许将各类文档导出或转换为固定布局文件格

53、Flink Interval Join 代码示例

1、概述 interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join; interval Join 算子的水位线会取两条流中水位线的最小值; interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准; interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,

直接得到Json串,转换为字典

0.新创建一个json文件,把json串拷贝到里面 1.先通过MainBundle找到资源对应的路径 2.将文件转换为NSData 3.通过NSJSonSerization得到字典 NSString*fileName=[[NSBundle mainBundle] pathForResource:@"myJson" ofType:@"json"];           NS

如何通过示例将旧版 C# 转换为 C# 12

随着 C# 的不断发展,每个新版本都会引入强大的新功能,从而提高语言的功能和可读性。通过从旧版本的 C# 迁移到 C# 12,您可以获得更高效、更易于维护和更具表现力的代码。 由于代码库遗留、公司限制以及对旧语言功能的熟悉,许多开发人员仍在使用旧版本的 C#。升级似乎很困难,但现代版本的 C# 具有显著的优势,例如更好的性能、增强的功能和更高的安全性。 通过增量重构、试点项目和团队培训逐步

将知乎专栏文章转换为 Markdown 文件保存到本地

一、参考内容 参考知乎文章`代码 | 将知乎专栏文章转换为 Markdown 文件保存到本地,利用代码为GitHub:https://github.com/chenluda/zhihu-download。 二、步骤 1.首先安装包flask、flask-cors、markdownify 2. 运行app.py 3.在浏览器中打开链接,并填写URL和Cookies 获取Cookies的步

Linux float int和16进制互相转换

Linux 上float int和16进制互换操作。之前把float转16进制,也就是转成4个字节,方便使用串口传输嘛。使用的方法是: //float 转 16进制float x_pid_p = 15.0;unsigned char * bValue = (unsigned char *)& x_pid_p;printf("%x\t%x\t%x\t%x\n", bValue[0], bVa

Flink SQL因类型错误导致MAX和MIN计算错误

背景 最近在做数据分析,用Flink SQL来做分析工具,因数据源的数据存在不太规范的数据格式,因此我需要通过SQL函数把我需要的数据值从VARCHAR类型的字段中把数据提取出来,然后再做MAX、MIN、SUM这些统计。怎料SUM算出来的结果准确无误,而MAX和MIN算出来的结果却始终不正确,最后发现原来是我用SQL函数提取VARCHAR类型的字段的数据,也是VARCHAR类型,所以导致MAX、

Day59 代码随想录打卡|二叉树篇---把二叉搜索树转换为累加树

题目(leecode T538): 给出二叉 搜索 树的根节点,该树的节点值各不相同,请你将其转换为累加树(Greater Sum Tree),使每个节点 node 的新值等于原树中大于或等于 node.val 的值之和。 提醒一下,二叉搜索树满足下列约束条件: 节点的左子树仅包含键 小于 节点键的节点。节点的右子树仅包含键 大于 节点键的节点。左右子树也必须是二叉搜索树。 方法:本题

金蝶KIS新建账套时 从字符串向DateTime转换时失败 从字符串转换为Datetime类型时发生语法错误

需要修改以下几点 控制面板---区域和语言选项---区域选项---自定义,修改为如下格式