Flink旁路输出OutputTag

2024-01-16 15:36
文章标签 输出 flink 旁路 outputtag

本文主要是介绍Flink旁路输出OutputTag,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 前言
  • 代码示例
    • 1.流复制
    • 2.条件分流
    • 3.迟到数据分流


前言

除了由 DataStream 操作产生的主要流之外,还可以产生任意数量的旁路输出结果流。结果流中的数据类型不必与主要流中的数据类型相匹配,并且不同旁路输出的类型也可以不同。当你需要拆分数据流时,通常必须复制该数据流,然后从每个流中过滤掉不需要的数据。

使用旁路输出时,首先需要定义用于标识旁路输出流的 OutputTag:

//需要使用匿名内部类,其中T是泛型
OutputTag<T> outputTag = new OutputTag<T>("side-output") {};

可以通过以下方法将数据发送到旁路输出:

  • ProcessFunction
  • KeyedProcessFunction
  • CoProcessFunction
  • KeyedCoProcessFunction
  • ProcessWindowFunction
  • ProcessAllWindowFunction

代码示例

1.流复制

将流复制两份 发到测输出流stream1 和stream2,代码如下(示例):


import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;public class SideOutputTest {public static final String TYPE = "type";public static void main(String[] args) throws Exception {//获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();final ParameterTool params = ParameterTool.fromArgs(args);String hostName = params.get("hostname", "10.68.8.59");int port = params.getInt("port", 9999);// nc -l 9999DataStream<String> sourceStream = env.socketTextStream(hostName, port, "\n");SingleOutputStreamOperator<JSONObject> jsonObjectStream = sourceStream.map(s -> JSONObject.parseObject(s));//定义OutputTagOutputTag<JSONObject> outputTag1 = new OutputTag<JSONObject>("stream1") {};OutputTag<JSONObject> outputTag2 = new OutputTag<JSONObject>("stream2") {};//将流复制两份 发到测输出流stream1 和stream2SingleOutputStreamOperator<JSONObject> outputStream = jsonObjectStream.process(new ProcessFunction<JSONObject, JSONObject>() {@Overridepublic void processElement(JSONObject jsonObject, Context context, Collector<JSONObject> collector)throws Exception {context.output(outputTag1, jsonObject);context.output(outputTag2, jsonObject);}});DataStream<JSONObject> stream1 = outputStream.getSideOutput(outputTag1);DataStream<JSONObject> stream2 = outputStream.getSideOutput(outputTag2);//数据去向//stream1stream1.map(e -> {e.put("stream", "stream1");return e;}).print();//stream2stream2.map(e -> {e.put("stream", "stream2");return e;}).print();env.execute("SocketStreamTest");}
}

2.条件分流

可以根据自定义条件将数据分流。

public class SplitDemo {public static final OutputTag<Integer> evenTag = new OutputTag<Integer>("even"){};public static final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd"){};public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Integer> source = executionEnvironment.fromElements(1, 2, 3, 4, 5);SingleOutputStreamOperator<Integer> process = source.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value, ProcessFunction<Integer, Integer>.Context ctx, Collector<Integer> out) throws Exception {if (value % 2 == 0) {// 这里不使用out.collect,而是使用ctx.output// 这个方法多了一个参数,可以指定output tag,从而实现数据分流ctx.output(evenTag, value);} else {ctx.output(oddTag, value);}}});// 依赖OutputTag获取对应的旁路输出DataStream<Integer> evenStream = process.getSideOutput(evenTag);DataStream<Integer> oddStream = process.getSideOutput(oddTag);// 分别打印两个旁路输出流中的数据evenStream.process(new ProcessFunction<Integer, String>() {@Overridepublic void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {out.collect("Even: " + value);}}).print();oddStream.process(new ProcessFunction<Integer, String>() {@Overridepublic void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {out.collect("Odd: " + value);}}).print();executionEnvironment.execute();}
}

3.迟到数据分流

public class OutOfOrderDemo {// 创建tagpublic static final OutputTag<Tuple2<String, Integer>> lateTag = new OutputTag<Tuple2<String, Integer>>("late"){};public static void main(String[] args) throws Exception {StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 示例数据,其中D乱序,I来迟(H到来的时候认为15000ms之前的数据已经到齐)SingleOutputStreamOperator<Tuple2<String, Integer>> source = executionEnvironment.fromElements(new Tuple2<>("A", 0),new Tuple2<>("B", 1000),new Tuple2<>("C", 2000),new Tuple2<>("D", 7000),new Tuple2<>("E", 3000),new Tuple2<>("F", 4000),new Tuple2<>("G", 5000),new Tuple2<>("H", 20000),new Tuple2<>("I", 8000)).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forGenerator(new WatermarkGeneratorSupplier<Tuple2<String, Integer>>() {// 这里自定义WatermarkGenerator的原因是Flink按照运行时间周期发送watermark,但我们的例子是单次执行的,可以认为数据是一瞬间到来// 因此我们改写为每到来一条数据发送一次watermark,watermark的时间戳为数据的事件事件减去5000毫秒,意思是最多容忍数据来迟5000毫秒@Overridepublic WatermarkGenerator<Tuple2<String, Integer>> createWatermarkGenerator(Context context) {return new WatermarkGenerator<Tuple2<String, Integer>>() {@Overridepublic void onEvent(Tuple2<String, Integer> event, long eventTimestamp, WatermarkOutput output) {long watermark = eventTimestamp - 5000L < 0 ? 0L : eventTimestamp - 5000L;output.emitWatermark(new Watermark(watermark));}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {}};}// 取第二个字段为watermark}).withTimestampAssigner((element, timestamp) -> element.f1));// 窗口大小5秒,允许延迟5秒// watermark和allowedLateness的区别是,watermark决定了什么时候窗口数据触发计算,allowedLateness决定什么数据被认为是lateElement,从而发送到sideOutput// 设置side output tagsource.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(5)).sideOutputLateData(lateTag).process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>() {@Overridepublic void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {Iterator<Tuple2<String, Integer>> iterator = elements.iterator();System.out.println("--------------------");while(iterator.hasNext()) {System.out.println(iterator.next());}}// 打印sideoutput流内容}).getSideOutput(lateTag).process(new ProcessFunction<Tuple2<String, Integer>, Object>() {@Overridepublic void processElement(Tuple2<String, Integer> value, ProcessFunction<Tuple2<String, Integer>, Object>.Context ctx, Collector<Object> out) throws Exception {System.out.println("Late element: " + value);}});executionEnvironment.execute();}
}

这篇关于Flink旁路输出OutputTag的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/613054

相关文章

顺序表之创建,判满,插入,输出

文章目录 🍊自我介绍🍊创建一个空的顺序表,为结构体在堆区分配空间🍊插入数据🍊输出数据🍊判断顺序表是否满了,满了返回值1,否则返回0🍊main函数 你的点赞评论就是对博主最大的鼓励 当然喜欢的小伙伴可以:点赞+关注+评论+收藏(一键四连)哦~ 🍊自我介绍   Hello,大家好,我是小珑也要变强(也是小珑),我是易编程·终身成长社群的一名“创始团队·嘉宾”

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出 在数字化时代,文本到语音(Text-to-Speech, TTS)技术已成为人机交互的关键桥梁,无论是为视障人士提供辅助阅读,还是为智能助手注入声音的灵魂,TTS 技术都扮演着至关重要的角色。从最初的拼接式方法到参数化技术,再到现今的深度学习解决方案,TTS 技术经历了一段长足的进步。这篇文章将带您穿越时

如何将一个文件里不包含某个字符的行输出到另一个文件?

第一种: grep -v 'string' filename > newfilenamegrep -v 'string' filename >> newfilename 第二种: sed -n '/string/!'p filename > newfilenamesed -n '/string/!'p filename >> newfilename

Detectorn2预训练模型复现:数据准备、训练命令、日志分析与输出目录

Detectorn2预训练模型复现:数据准备、训练命令、日志分析与输出目录 在深度学习项目中,目标检测是一项重要的任务。本文将详细介绍如何使用Detectron2进行目标检测模型的复现训练,涵盖训练数据准备、训练命令、训练日志分析、训练指标以及训练输出目录的各个文件及其作用。特别地,我们将演示在训练过程中出现中断后,如何使用 resume 功能继续训练,并将我们复现的模型与Model Zoo中的

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

第六章习题11.输出以下图形

🌏个人博客:尹蓝锐的博客 希望文章能够给到初学的你一些启发~ 如果觉得文章对你有帮助的话,点赞 + 关注+ 收藏支持一下笔者吧~ 1、题目要求: 输出以下图形

LibSVM学习(五)——分界线的输出

对于学习SVM人来说,要判断SVM效果,以图形的方式输出的分解线是最直观的。LibSVM自带了一个可视化的程序svm-toy,用来输出类之间的分界线。他是先把样本文件载入,然后进行训练,通过对每个像素点的坐标进行判断,看属于哪一类,就附上那类的颜色,从而使类与类之间形成分割线。我们这一节不讨论svm-toy怎么使用,因为这个是“傻瓜”式的,没什么好讨论的。这一节我们主要探讨怎么结合训练结果文件

下载/保存/读取 文件,并转成流输出

最近对文件的操作又熟悉了下;现在记载下来:学习在于 坚持!!!不以细小而不为。 实现的是:文件的下载、文件的保存到SD卡、文件的读取输出String 类型、最后是文件转换成流输出;一整套够用了; 重点: 1:   操作网络要记得开线程; 2:更新网络获取的数据 切记用Handler机制; 3:注意代码的可读性(这里面只是保存到SD卡,在项目中切记要对SD卡的有无做判断,然后再获取路径!)

彻底解决win10系统Tomcat10控制台输出中文乱码

彻底解决Tomcat10控制台输出中文乱码 首先乱码问题的原因通俗的讲就是读的编码格式和写的解码格式不一致,比如最常见的两种中文编码UTF-8和GBK,UTF-8一个汉字占三个字节,GBK一个汉字占两个字节,所以当编码与解码格式不一致时,输出端当然无法识别这是啥,所以只能以乱码代替。 值得一提的是GBK不是国家标准编码,常用的国标有两,一个是GB2312,一个是GB18030 GB1

lesson1 输出出现重复行的文件名称

lesson1 输出出现重复行的文件名称 1. 代码 package mainimport ("bufio""fmt""io""os")/*** @Author: jiaona.chen* @Description:* @File: main* @Version: 1.0.0* @Date: 2024/09/07 15:25*/// 输出出现重复行的文件名称func main() {ty