Flink之DataStream API的转换算子

2023-12-06 23:20

本文主要是介绍Flink之DataStream API的转换算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简单转换算子

函数的实现方式

  1. 自定义类,实现函数接口:编码麻烦,使用灵活
  2. 匿名内部类:编码简单
  3. Lambda:编码简洁
public class Flink02_FunctionImplement {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);env.socketTextStream("hadoop102",8888).flatMap((String line, Collector<Tuple2<String, Integer>> out)->{String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}).returns(Types.TUPLE(Types.STRING,Types.INT)).keyBy(0).sum(1).print();try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}public static class MyFlatMapFunction implements FlatMapFunction<String, Tuple2<String,Integer>> {private String Operator;@Overridepublic void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {String[] words = line.split(" ");for (String word : words) {out.collect(Tuple2.of(word,1));}}}
}

Reduce规约聚合

  1. reduce:规约聚合
    • 聚合的原理:两两聚合,上一次的聚合值与本次新到的值进行聚合
    • 泛型 T : 流中的数据类型, 从方法声明中可以看到,输入输出类型一直
    • 方法: T reduce(T value1, T value2) throws Exception
      • value1:上一次的聚合值
      • value2:本次新到的值
public class Flink04_ReduceAggOpterator {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);ds.print("input================");//reduce:每个用户的点击次数ds.map(event->new WordCount(event.getUser(),1)).keyBy(WordCount::getWord).reduce(new ReduceFunction<WordCount>() {/**** @param value1 上次聚合的结果,第一个数据不参与聚合,直接输出* @param value2 新来的值* @return* @throws Exception*/@Overridepublic WordCount reduce(WordCount value1, WordCount value2) throws Exception {System.out.println("测试");return new WordCount(value1.getWord(),value1.getCount()+value2.getCount());}}).print("reduce");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

富函数

调用算子的时候,需要传入对应的用户自定义函数来完成具体的功能

  • 函数:
    • 普通函数
      Map
      filter
      flatMap
      reduce
      富函数:基本上每个普通函数都有对应的富函数
      统一接口interface RichFunction extends Function
      具体使用的富函数类:
      - RichMapFunction
      - RichFilterFunction
      - RichFlatMapFunction
      - RichReduceFunction
      - …
      富函数功能:
      • 生命周期方法:
        • open(): 当前算子的每个并行子任务的实例创建时会调用一次
        • close():当前算子的每个并行子任务的实例销毁时(有界流),调用一次
      • 获取运行时上下文对象 getRuntimeContext
        • 可以获取当前作业,当前task的相关信息
        • 获取不同类型的状态,进行状态编程*
          getState | getListState | getReducingState | getMapState
public class Flink05_RichFunction {public static void main(String[] args) {//1.创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();//默认是最大并行度env.setParallelism(1);//        DataStreamSource<Event> ds = Flink06_EventSource.getEventSource(env);FileSource<String> fileSource = FileSource.<String>forRecordStreamFormat(new TextLineInputFormat(),new Path("input/enents.txt")).build();DataStreamSource<String> fileDs = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");SingleOutputStreamOperator<Event> ds = fileDs.map(new MapFunction<String, Event>() {@Overridepublic Event map(String value) throws Exception {String[] valueArr = value.split("");return new Event(valueArr[0], valueArr[1], Long.valueOf(valueArr[2]));}});ds.print("input================");ds.map(new RichMapFunction<Event, WordCount>() {/*** 生命周期open方法,当前算子实例创建时执行一次,只执行一次* @param parameters The configuration containing the parameters attached to the contract.* @throws Exception*/@Overridepublic void open(Configuration parameters) throws Exception {System.out.println("创建Redis的连接对象");}@Overridepublic WordCount map(Event value) throws Exception {System.out.println("每条数据执行一次");return new WordCount(value.getUser(),1);}/*** 生命周期的close方法* 当前算子实例销毁时执行一次* @throws Exception*/@Overridepublic void close() throws Exception {System.out.println("关闭连接对象");}}).print("map");try {env.execute();} catch (Exception e) {throw new RuntimeException(e);}}
}

这篇关于Flink之DataStream API的转换算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/463724

相关文章

Java对象转换的实现方式汇总

《Java对象转换的实现方式汇总》:本文主要介绍Java对象转换的多种实现方式,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录Java对象转换的多种实现方式1. 手动映射(Manual Mapping)2. Builder模式3. 工具类辅助映

python实现svg图片转换为png和gif

《python实现svg图片转换为png和gif》这篇文章主要为大家详细介绍了python如何实现将svg图片格式转换为png和gif,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录python实现svg图片转换为png和gifpython实现图片格式之间的相互转换延展:基于Py

C#实现将Excel表格转换为图片(JPG/ PNG)

《C#实现将Excel表格转换为图片(JPG/PNG)》Excel表格可能会因为不同设备或字体缺失等问题,导致格式错乱或数据显示异常,转换为图片后,能确保数据的排版等保持一致,下面我们看看如何使用C... 目录通过C# 转换Excel工作表到图片通过C# 转换指定单元格区域到图片知识扩展C# 将 Excel

C++使用printf语句实现进制转换的示例代码

《C++使用printf语句实现进制转换的示例代码》在C语言中,printf函数可以直接实现部分进制转换功能,通过格式说明符(formatspecifier)快速输出不同进制的数值,下面给大家分享C+... 目录一、printf 原生支持的进制转换1. 十进制、八进制、十六进制转换2. 显示进制前缀3. 指

使用Python开发一个带EPUB转换功能的Markdown编辑器

《使用Python开发一个带EPUB转换功能的Markdown编辑器》Markdown因其简单易用和强大的格式支持,成为了写作者、开发者及内容创作者的首选格式,本文将通过Python开发一个Markd... 目录应用概览代码结构与核心组件1. 初始化与布局 (__init__)2. 工具栏 (setup_t

springboot项目中常用的工具类和api详解

《springboot项目中常用的工具类和api详解》在SpringBoot项目中,开发者通常会依赖一些工具类和API来简化开发、提高效率,以下是一些常用的工具类及其典型应用场景,涵盖Spring原生... 目录1. Spring Framework 自带工具类(1) StringUtils(2) Coll

Java中Date、LocalDate、LocalDateTime、LocalTime、时间戳之间的相互转换代码

《Java中Date、LocalDate、LocalDateTime、LocalTime、时间戳之间的相互转换代码》:本文主要介绍Java中日期时间转换的多种方法,包括将Date转换为LocalD... 目录一、Date转LocalDateTime二、Date转LocalDate三、LocalDateTim

Python实现AVIF图片与其他图片格式间的批量转换

《Python实现AVIF图片与其他图片格式间的批量转换》这篇文章主要为大家详细介绍了如何使用Pillow库实现AVIF与其他格式的相互转换,即将AVIF转换为常见的格式,比如JPG或PNG,需要的小... 目录环境配置1.将单个 AVIF 图片转换为 JPG 和 PNG2.批量转换目录下所有 AVIF 图

详解如何通过Python批量转换图片为PDF

《详解如何通过Python批量转换图片为PDF》:本文主要介绍如何基于Python+Tkinter开发的图片批量转PDF工具,可以支持批量添加图片,拖拽等操作,感兴趣的小伙伴可以参考一下... 目录1. 概述2. 功能亮点2.1 主要功能2.2 界面设计3. 使用指南3.1 运行环境3.2 使用步骤4. 核

Java实现时间与字符串互相转换详解

《Java实现时间与字符串互相转换详解》这篇文章主要为大家详细介绍了Java中实现时间与字符串互相转换的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、日期格式化为字符串(一)使用预定义格式(二)自定义格式二、字符串解析为日期(一)解析ISO格式字符串(二)解析自定义