Flink之DataStream API的转换算子

2023-12-06 23:20

本文主要是介绍Flink之DataStream API的转换算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简单转换算子

函数的实现方式

  1. 自定义类,实现函数接口:编码麻烦,使用灵活
  2. 匿名内部类:编码简单
  3. Lambda:编码简洁
public class Flink02_FunctionImplement {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);env.socketTextStream("hadoop102",8888).flatMap((String line, Collector<Tuple2<String, Integer>> out)->{String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}).returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(0).sum(1).print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String,Integer>> {private String Operator;@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}}
}

Reduce规约聚合

  1. reduce:规约聚合
    • 聚合的原理:两两聚合,上一次的聚合值与本次新到的值进行聚合
    • 泛型 T : 流中的数据类型, 从方法声明中可以看到,输入输出类型一直
    • 方法: T reduce(T value1, T value2) throws Exception
      • value1:上一次的聚合值
      • value2:本次新到的值
public class Flink04_ReduceAggOpterator {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);ds.print("input================");//reduce:每个用户的点击次数ds.map(event->new WordCount(event.getUser(),1)).keyBy(WordCount::getWord).reduce(new ReduceFunction<WordCount>() {/**** @param value1 上次聚合的结果,第一个数据不参与聚合,直接输出* @param value2 新来的值* @return* @throws Exception*/@Overridepublic WordCount reduce(WordCount value1, WordCount value2) throws Exception {System.out.println("测试");return new WordCount(value1.getWord(),value1.getCount()+value2.getCount());}}).print("reduce");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

富函数

调用算子的时候,需要传入对应的用户自定义函数来完成具体的功能

  • 函数:
    • 普通函数
      Map
      filter
      flatMap
      reduce
      富函数:基本上每个普通函数都有对应的富函数
      统一接口interface RichFunction extends Function
      具体使用的富函数类:
      - RichMapFunction
      - RichFilterFunction
      - RichFlatMapFunction
      - RichReduceFunction
      - …
      富函数功能:
      • 生命周期方法:
        • open(): 当前算子的每个并行子任务的实例创建时会调用一次
        • close():当前算子的每个并行子任务的实例销毁时(有界流),调用一次
      • 获取运行时上下文对象 getRuntimeContext
        • 可以获取当前作业,当前task的相关信息
        • 获取不同类型的状态,进行状态编程*
          getState | getListState | getReducingState | getMapState
public class Flink05_RichFunction {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);FileSource<String> fileSource = FileSource.<String>forRecordStreamFormat(new TextLineInputFormat(),new Path("input/enents.txt")).build();DataStreamSource<String> fileDs = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");SingleOutputStreamOperator<Event> ds = fileDs.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {String[] valueArr = value.split("");return new Event(valueArr[0], valueArr[1], Long.valueOf(valueArr[2]));}});ds.print("input================");ds.map(new RichMapFunction<Event, WordCount>() {/*** 生命周期open方法,当前算子实例创建时执行一次,只执行一次* @param parameters The configuration containing the parameters attached to the contract.* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("创建Redis的连接对象");}@Overridepublic WordCount map(Event value) throws Exception {System.out.println("每条数据执行一次");return new WordCount(value.getUser(),1);}/*** 生命周期的close方法* 当前算子实例销毁时执行一次* @throws Exception*/@Overridepublic void close() throws Exception {System.out.println("关闭连接对象");}}).print("map");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

这篇关于Flink之DataStream API的转换算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/463724

相关文章

JAVA中整型数组、字符串数组、整型数和字符串 的创建与转换的方法

《JAVA中整型数组、字符串数组、整型数和字符串的创建与转换的方法》本文介绍了Java中字符串、字符数组和整型数组的创建方法,以及它们之间的转换方法,还详细讲解了字符串中的一些常用方法,如index... 目录一、字符串、字符数组和整型数组的创建1、字符串的创建方法1.1 通过引用字符数组来创建字符串1.2

Java将时间戳转换为Date对象的方法小结

《Java将时间戳转换为Date对象的方法小结》在Java编程中,处理日期和时间是一个常见需求,特别是在处理网络通信或者数据库操作时,本文主要为大家整理了Java中将时间戳转换为Date对象的方法... 目录1. 理解时间戳2. Date 类的构造函数3. 转换示例4. 处理可能的异常5. 考虑时区问题6.

基于C#实现将图片转换为PDF文档

《基于C#实现将图片转换为PDF文档》将图片(JPG、PNG)转换为PDF文件可以帮助我们更好地保存和分享图片,所以本文将介绍如何使用C#将JPG/PNG图片转换为PDF文档,需要的可以参考下... 目录介绍C# 将单张图片转换为PDF文档C# 将多张图片转换到一个PDF文档介绍将图片(JPG、PNG)转

【LabVIEW学习篇 - 21】:DLL与API的调用

文章目录 DLL与API调用DLLAPIDLL的调用 DLL与API调用 LabVIEW虽然已经足够强大,但不同的语言在不同领域都有着自己的优势,为了强强联合,LabVIEW提供了强大的外部程序接口能力,包括DLL、CIN(C语言接口)、ActiveX、.NET、MATLAB等等。通过DLL可以使用户很方便地调用C、C++、C#、VB等编程语言写的程序以及windows自带的大

如何更优雅地对接第三方API

如何更优雅地对接第三方API 本文所有示例完整代码地址:https://github.com/yu-linfeng/BlogRepositories/tree/master/repositories/third 我们在日常开发过程中,有不少场景会对接第三方的API,例如第三方账号登录,第三方服务等等。第三方服务会提供API或者SDK,我依稀记得早些年Maven还没那么广泛使用,通常要对接第三方

PDF 软件如何帮助您编辑、转换和保护文件。

如何找到最好的 PDF 编辑器。 无论您是在为您的企业寻找更高效的 PDF 解决方案,还是尝试组织和编辑主文档,PDF 编辑器都可以在一个地方提供您需要的所有工具。市面上有很多 PDF 编辑器 — 在决定哪个最适合您时,请考虑这些因素。 1. 确定您的 PDF 文档软件需求。 不同的 PDF 文档软件程序可以具有不同的功能,因此在决定哪个是最适合您的 PDF 软件之前,请花点时间评估您的

C# double[] 和Matlab数组MWArray[]转换

C# double[] 转换成MWArray[], 直接赋值就行             MWNumericArray[] ma = new MWNumericArray[4];             double[] dT = new double[] { 0 };             double[] dT1 = new double[] { 0,2 };

Java基础回顾系列-第五天-高级编程之API类库

Java基础回顾系列-第五天-高级编程之API类库 Java基础类库StringBufferStringBuilderStringCharSequence接口AutoCloseable接口RuntimeSystemCleaner对象克隆 数字操作类Math数学计算类Random随机数生成类BigInteger/BigDecimal大数字操作类 日期操作类DateSimpleDateForma

Restful API 原理以及实现

先说说API 再说啥是RESRFUL API之前,咱先说说啥是API吧。API大家应该都知道吧,简称接口嘛。随着现在移动互联网的火爆,手机软件,也就是APP几乎快爆棚了。几乎任何一个网站或者应用都会出一款iOS或者Android APP,相比网页版的体验,APP确实各方面性能要好很多。 那么现在问题来了。比如QQ空间网站,如果我想获取一个用户发的说说列表。 QQ空间网站里面需要这个功能。

京东物流查询|开发者调用API接口实现

快递聚合查询的优势 1、高效整合多种快递信息。2、实时动态更新。3、自动化管理流程。 聚合国内外1500家快递公司的物流信息查询服务,使用API接口查询京东物流的便捷步骤,首先选择专业的数据平台的快递API接口:物流快递查询API接口-单号查询API - 探数数据 以下示例是参考的示例代码: import requestsurl = "http://api.tanshuapi.com/a