flink的神奇分流器-sideoutput

2023-10-09 03:18

本文主要是介绍flink的神奇分流器-sideoutput,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

今天浪尖给大家讲讲flink的一个神奇功能,sideouptut侧输出。

为了说明侧输出(sideouptut)的作用,浪尖举个例子,比如现在有一篇文章吧,单词长度不一,但是我们想对单词长度小于5的单词进行wordcount操作,同时又想记录下来哪些单词的长度大于了5,那么我们该如何做呢?

比如,Datastream是单词流,那么一般做法(只写了代码模版)

datastream.filter(word.length>=5); //获取不统计的单词,也即是单词长度大于等于5

datastream.filter(word.length <5);// 获取需要进行wordcount的单词。

这样数据,然后每次筛选都要保留整个流,然后遍历整个流,显然很浪费性能,假如能够在一个流了多次输出就好了,flink的侧输出提供了这个功能,侧输出的输出(sideoutput)类型可以与主流不同,可以有多个侧输出(sideoutput),每个侧输出不同的类型。

下面浪尖就来具体讲一下如何使用侧输出。

1. 定义OutputTag

在使用侧输出的时候需要先定义一个OutputTag。定义方式,如下:

OutputTag<String> outputTag = newOutputTag<String>("side-output") {};

OutputTag有两个构造函数,上面例子构造函数只有一个id参数,还有一个构造函数包括两个参数,idTypeInformation信息。

OutputTag(String id)
OutputTag(String id, TypeInformation<T>typeInfo)

2. 使用特定的函数

要使用侧输出,在处理数据的时候除了要定义相应类型的OutputTag外,还要使用特定的函数,主要是有四个:

ProcessFunction

CoProcessFunction

ProcessWindowFunction

ProcessAllWindowFunction

本文主要是以ProcessFunction为例讲解如何使用flink的侧输出(sideoutput)功能,具体这几个函数的深入含义及应用,后面再出文章分析。

上述函数中暴漏了Context参数给用户,让用户可以将数据通过outputtag发给侧输出流。

3. 案例

准备数据

/**
* Provides the default data sets used for the WordCount example program.
* The default data sets are used, if no parameters are given to the program.
*
*/

public class WordCountData {

  public static final String[] WORDS = new String[] {
     "To be, or not to be,--that is the question:--",
     "Whether 'tis nobler in the mind to suffer",
     "The slings and arrows of outrageous fortune",
     "Or to take arms against a sea of troubles,",
     "And by opposing end them?--To die,--to sleep,--",
     "No more; and by a sleep to say we end",
     "The heartache, and the thousand natural shocks",
     "That flesh is heir to,--'tis a consummation",
     "Devoutly to be wish'd. To die,--to sleep;--",
     "To sleep! perchance to dream:--ay, there's the rub;",
     "For in that sleep of death what dreams may come,",
     "When we have shuffled off this mortal coil,",
     "Must give us pause: there's the respect",
     "That makes calamity of so long life;",
     "For who would bear the whips and scorns of time,",
     "The oppressor's wrong, the proud man's contumely,",
     "The pangs of despis'd love, the law's delay,",
     "The insolence of office, and the spurns",
     "That patient merit of the unworthy takes,",
     "When he himself might his quietus make",
     "With a bare bodkin? who would these fardels bear,",
     "To grunt and sweat under a weary life,",
     "But that the dread of something after death,--",
     "The undiscover'd country, from whose bourn",
     "No traveller returns,--puzzles the will,",
     "And makes us rather bear those ills we have",
     "Than fly to others that we know not of?",
     "Thus conscience does make cowards of us all;",
     "And thus the native hue of resolution",
     "Is sicklied o'er with the pale cast of thought;",
     "And enterprises of great pith and moment,",
     "With this regard, their currents turn awry,",
     "And lose the name of action.--Soft you now!",
     "The fair Ophelia!--Nymph, in thy orisons",
     "Be all my sins remember'd."
  };
}

定义OutputTag对象:

private static final OutputTag<String> rejectedWordsTag = new OutputTag<String>("rejected") {};

定义ProcessFunction函数:

/**
* 以用户自定义FlatMapFunction函数的形式来实现分词器功能,该分词器会将分词封装为(word,1),
* 同时不接受单词长度大于5的,也即是侧输出都是单词长度大于5的单词。
*/

public static final class Tokenizer extends ProcessFunction<String, Tuple2<String, Integer>> {
  private static final long serialVersionUID = 1L;

  @Override
  public void processElement(
        String value,
        Context ctx,
        Collector<Tuple2<String, Integer>> out) throws Exception {
     // normalize and split the line
     String[] tokens = value.toLowerCase().split("\\W+");

     // emit the pairs
     for (String token : tokens) {
        if (token.length() > 5) {
           ctx.output(rejectedWordsTag, token);
        } else if (token.length() > 0) {
           out.collect(new Tuple2<>(token, 1));
        }
     }

  }
}

 

初始化flink,并使用侧输出:

public static void main(String[] args) throws Exception {

  // set up the execution environment
  final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

  // 获取输入数据
  DataStream<String> text = env.fromElements(WordCountData.WORDS);

  SingleOutputStreamOperator<Tuple2<String, Integer>> tokenized = text
        .keyBy(new KeySelector<String, Integer>() {
           private static final long serialVersionUID = 1L;

           @Override
           public Integer getKey(String value) throws Exception {
              return 0;
           }
        })
        .process(new Tokenizer());

  // 获取侧输出
  DataStream<String> rejectedWords = tokenized
        .getSideOutput(rejectedWordsTag)
        .map(new MapFunction<String, String>() {
           private static final long serialVersionUID = 1L;

           @Override
           public String map(String value) throws Exception {
              return "rejected: " + value;
           }
        });
 
  DataStream<Tuple2<String, Integer>> counts = tokenized
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .sum(1);

  // wordcount结果输出
  counts.print();
  // 侧输出结果输出
  rejectedWords.print();
 
  // execute program
  env.execute("Streaming WordCount SideOutput");
}

直接本地运行,查看结果:

640?wx_fmt=png

推荐阅读:

调试flink源码

Flink异步IO第一讲

结合Spark讲一下Flink的runtime

干货|kafka流量监控的原理及实现

640?wx_fmt=png

这篇关于flink的神奇分流器-sideoutput的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/169956

相关文章

Kafka拦截器的神奇操作方法

《Kafka拦截器的神奇操作方法》Kafka拦截器是一种强大的机制,用于在消息发送和接收过程中插入自定义逻辑,它们可以用于消息定制、日志记录、监控、业务逻辑集成、性能统计和异常处理等,本文介绍Kafk... 目录前言拦截器的基本概念Kafka 拦截器的定义和基本原理:拦截器是 Kafka 消息传递的不可或缺

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

Redis 管道的神奇力量

今天我们要来探索一个 Redis 中非常强大且实用的特性——管道(Pipeline)。如果你想让你的 Redis 操作更加高效,那么这篇文章绝对值得一读。 一、Redis 管道是什么 Redis 管道是一种在客户端和服务器之间批量执行命令的技术。它允许客户端将多个命令一次性发送到服务器,而不是逐个发送并等待每个命令的响应。服务器会按照顺序执行这些命令,并将所有命令的响应一次性返回给客户端。

入门篇:神奇的Annotation

涅槃1992 关注 2016.12.25 23:41* 字数 4964 阅读 1059评论 3喜欢 29 前面写了Android 开发:由模块化到组件化(一),很多小伙伴来问怎么没有Demo啊?之所以没有立刻放demo的原因在还有许多技术点没说完. 今天我们就来细细评味Java当中Annotation,也就是我们常说的注解. 本文按照以下顺序进行:元数据->元注解->运行时注解->编译时

大数据之Flink(二)

4、部署模式 flink部署模式: 会话模式(Session Mode)单作业模式(Per-Job Mode)应用模式(Application Mode) 区别在于集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行。 4.1、会话模式 先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时资源已经确定,所有提交的作业会晶振集群中的资源。适合规模小、执

是谁还不会flink的checkpoint呀~

1、State Vs Checkpoint State:状态,是Flink中某一个Operator在某一个时刻的状态,如maxBy/sum,注意State存的是历史数据/状态,存在内存中。 Checkpoint:快照点, 是Flink中所有有状态的Operator在某一个时刻的State快照信息/存档信息 一句话概括: Checkpoint就是State的快照 目的:假设作业停止了,下次启动的

一个人就能干一个团队剪辑工作?云微客就是这么神奇

你知道拍摄、剪辑一条视频需要花费多长时间吗?半个小时?还是一个小时呢?如果我想一天发布上百条视频,你觉得可能吗?很显然,仅凭个人是很难办到的,那么就需要借助工具,而云微客AI批量剪辑系统正好可以解决这个难题。 在当下这个短视频风靡的时代,不管是企业还是个人创作者们都需要借助各种工具和系统来提升创作内容的生产效率和传播效果。而云微客AI批量剪辑系统凭借着批量剪辑的功能,为创作者带来了很大的

Flink读取kafka数据并以parquet格式写入HDFS

《2021年最新版大数据面试题全面开启更新》 《2021年最新版大数据面试题全面开启更新》 大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中; 目前基于spark进行计算比较主流,需要读取hdfs上的数据,可以通过读取parquet:spark.read

Apache-Flink深度解析-State

来源:https://dwz.cn/xrMCqbk5 Flink系列精华文章合集入门篇: Flink入门Flink DataSet&DataSteam APIFlink集群部署Flink重启策略Flink分布式缓存Flink重启策略Flink中的TimeFlink中的窗口Flink的时间戳和水印Flink广播变量Flink-Kafka-connetorFlink-Table&SQLFlink

Apache-Flink深度解析-Temporal-Table-JOIN

在《JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作流程如下: