本文主要是介绍如何使用 Side Output 来分流?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
通常,在 Kafka 的 topic 中会有很多数据,这些数据虽然结构是一致的,但是类型可能不一致,举个例子:Kafka 中的监控数据有很多种:机器、容器、应用、中间件等,如果要对这些数据分别处理,就需要对这些数据流进行一个拆分,那么在 Flink 中该怎么完成这需求呢,有如下这些方法。
使用 Filter 分流
使用 filter 算子根据数据的字段进行过滤分成机器、容器、应用、中间件等。伪代码如下:
DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env); //从 Kafka 获取到所有的数据流
SingleOutputStreamOperator<MetricEvent> machineData = data.filter(m -> "machine".equals(m.getTags().get("type"))); //过滤出机器的数据
SingleOutputStreamOperator<MetricEvent> dockerData = data.filter(m -> "docker".equals(m.getTags().get("type"))); //过滤出容器的数据
SingleOutputStreamOperator<MetricEvent> applicationData = data.filter(m -> "application".equals(m.getTags().get("type"))); //过滤出应用的数据
SingleOutputStreamOperator<MetricEvent> middlewareData = data.filter(m -> "middleware&#
这篇关于如何使用 Side Output 来分流?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!