【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter

2023-12-05 05:20

本文主要是介绍【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink 系列文章

1、Flink 专栏等系列综合文章链接


文章目录

  • Flink 系列文章
  • 一、Flink的23种算子说明及示例
    • 1、maven依赖
    • 2、java bean
    • 3、map
    • 4、flatmap
    • 5、Filter


本文主要介绍Flink 的3种常用的operator(map、flatmap和filter)及以具体可运行示例进行说明.
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本文除了maven依赖外,没有其他依赖。

本专题分为五篇,即:
【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter
【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations
【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等
【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project
【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)

一、Flink的23种算子说明及示例

1、maven依赖

下文中所有示例都是用该maven依赖,除非有特殊说明的情况。

<properties><encoding>UTF-8</encoding><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><java.version>1.8</java.version><scala.version>2.12</scala.version><flink.version>1.17.0</flink.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version></dependency><!-- 日志 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version><scope>runtime</scope></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version><scope>runtime</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.2</version><scope>provided</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.1.4</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.4</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.4</version></dependency></dependencies>

2、java bean

下文所依赖的User如下

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** @author alanchan**/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {private int id;private String name;private String pwd;private String email;private int age;private double balance;
}

3、map

[DataStream->DataStream]
这是最简单的转换之一,其中输入是一个数据流,输出的也是一个数据流。
在这里插入图片描述

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestMapDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// envStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// source// transformationmapFunction5(env);// sink// executeenv.execute();}// 构造一个list,然后将list中数字乘以2输出,内部匿名类实现public static void mapFunction1(StreamExecutionEnvironment env) throws Exception {List<Integer> data = new ArrayList<Integer>();for (int i = 1; i <= 10; i++) {data.add(i);}DataStreamSource<Integer> source = env.fromCollection(data);SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {@Overridepublic Integer map(Integer inValue) throws Exception {return inValue * 2;}});sink.print();
//		9> 12
//		4> 2
//		10> 14
//		8> 10
//		13> 20
//		7> 8
//		12> 18
//		11> 16
//		5> 4
//		6> 6}// 构造一个list,然后将list中数字乘以2输出,lambda实现public static void mapFunction2(StreamExecutionEnvironment env) throws Exception {List<Integer> data = new ArrayList<Integer>();for (int i = 1; i <= 10; i++) {data.add(i);}DataStreamSource<Integer> source = env.fromCollection(data);SingleOutputStreamOperator<Integer> sink = source.map(i -> 2 * i);sink.print();
//		3> 4
//		4> 6
//		9> 16
//		7> 12
//		10> 18
//		2> 2
//		6> 10
//		5> 8
//		8> 14
//		11> 20}// 构造User数据源public static DataStreamSource<User> source(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}// lambda实现用户对象的balance×2和age+5功能public static SingleOutputStreamOperator<User> mapFunction3(StreamExecutionEnvironment env) throws Exception {DataStreamSource<User> source = source(env);SingleOutputStreamOperator<User> sink = source.map((MapFunction<User, User>) user -> {User user2 = user;user2.setAge(user.getAge() + 5);user2.setBalance(user.getBalance() * 2);return user2;});sink.print();
//		10> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
//		14> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
//		11> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
//		12> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
//		13> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)return sink;}// lambda实现balance*2和age+5后,balance》=2000和age》=20的数据过滤出来public static SingleOutputStreamOperator<User> mapFunction4(StreamExecutionEnvironment env) throws Exception {SingleOutputStreamOperator<User> sink = mapFunction3(env).filter(user -> user.getBalance() >= 2000 && user.getAge() >= 20);sink.print();
//		15> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
//		1> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
//		16> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
//		3> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
//		2> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)
//		1> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)return sink;}// lambda实现balance*2和age+5后,balance》=2000和age》=20的数据过滤出来并通过flatmap收集public static SingleOutputStreamOperator<User> mapFunction5(StreamExecutionEnvironment env) throws Exception {SingleOutputStreamOperator<User> sink = mapFunction4(env).flatMap((FlatMapFunction<User, User>) (user, out) -> {if (user.getBalance() >= 3000) {out.collect(user);}}).returns(User.class);sink.print();
//		8> User(id=5, name=alan1, pwd=5, email=5@5.com, age=20, balance=1000.0)
//		7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
//		6> User(id=2, name=alan2, pwd=2, email=2@2.com, age=24, balance=400.0)
//		9> User(id=4, name=alan2, pwd=4, email=4@4.com, age=35, balance=800.0)
//		5> User(id=1, name=alan1, pwd=1, email=1@1.com, age=17, balance=2000.0)
//		7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)
//		7> User(id=3, name=alan1, pwd=3, email=3@3.com, age=33, balance=3000.0)return sink;}}

4、flatmap

[DataStream->DataStream]
FlatMap 采用一条记录并输出零个,一个或多个记录。将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果。
在这里插入图片描述

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;/*** @author alanchan**/
public class TestFlatMapDemo {/*** @param args* @throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();flatMapFunction3(env);env.execute();}// 构造User数据源public static DataStreamSource<String> source(StreamExecutionEnvironment env) {List<String> info = new ArrayList<>();info.add("i am alanchan");info.add("i like hadoop");info.add("i like flink");info.add("and you ?");DataStreamSource<String> dataSource = env.fromCollection(info);return dataSource;}// 将句子以空格进行分割-内部匿名类实现public static void flatMapFunction1(StreamExecutionEnvironment env) throws Exception {DataStreamSource<String> source = source(env);SingleOutputStreamOperator<String> sink = source.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] splits = value.split(" ");for (String split : splits) {out.collect(split);}}});sink.print();
//		11> and
//		10> i
//		8> i
//		9> i
//		8> am
//		10> like
//		11> you
//		10> flink
//		8> alanchan
//		9> like
//		11> ?
//		9> hadoop}// lambda实现public static void flatMapFunction2(StreamExecutionEnvironment env) throws Exception {DataStreamSource<String> source = source(env);SingleOutputStreamOperator<String> sink = source.flatMap((FlatMapFunction<String, String>) (input, out) -> {String[] splits = input.split(" ");for (String split : splits) {out.collect(split);}}).returns(String.class);sink.print();
//		6> i
//		8> and
//		8> you
//		8> ?
//		5> i
//		7> i
//		5> am
//		5> alanchan
//		6> like
//		7> like
//		6> hadoop
//		7> flink}// lambda实现public static void flatMapFunction3(StreamExecutionEnvironment env) throws Exception {DataStreamSource<String> source = source(env);SingleOutputStreamOperator<String> sink = source.flatMap((String input, Collector<String> out) -> Arrays.stream(input.split(" ")).forEach(out::collect)).returns(String.class);sink.print();
//		8> i
//		11> and
//		10> i
//		9> i
//		10> like
//		11> you
//		8> am
//		11> ?
//		10> flink
//		9> like
//		9> hadoop
//		8> alanchan}}

5、Filter

DataStream → DataStream
Filter 函数根据条件判断出结果。按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素。
在这里插入图片描述

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.datastreamapi.User;/*** @author alanchan**/
public class TestFilterDemo {// 构造User数据源public static DataStreamSource<User> sourceUser(StreamExecutionEnvironment env) {DataStreamSource<User> source = env.fromCollection(Arrays.asList(new User(1, "alan1", "1", "1@1.com", 12, 1000), new User(2, "alan2", "2", "2@2.com", 19, 200),new User(3, "alan1", "3", "3@3.com", 28, 1500), new User(5, "alan1", "5", "5@5.com", 15, 500), new User(4, "alan2", "4", "4@4.com", 30, 400)));return source;}// 构造User数据源public static DataStreamSource<Integer> sourceList(StreamExecutionEnvironment env) {List<Integer> data = new ArrayList<Integer>();for (int i = 1; i <= 10; i++) {data.add(i);}DataStreamSource<Integer> source = env.fromCollection(data);return source;}// 过滤出大于5的数字,内部匿名类public static void filterFunction1(StreamExecutionEnvironment env) throws Exception {DataStream<Integer> source = sourceList(env);SingleOutputStreamOperator<Integer> sink = source.map(new MapFunction<Integer, Integer>() {public Integer map(Integer value) throws Exception {return value + 1;}}).filter(new FilterFunction<Integer>() {@Overridepublic boolean filter(Integer value) throws Exception {return value > 5;}});sink.print();
//		1> 10
//		14> 7
//		16> 9
//		13> 6
//		2> 11
//		15> 8}// lambda实现public static void filterFunction2(StreamExecutionEnvironment env) throws Exception {DataStream<Integer> source = sourceList(env);SingleOutputStreamOperator<Integer> sink = source.map(i -> i + 1).filter(value -> value > 5);sink.print();
//		12> 7
//		15> 10
//		11> 6
//		13> 8
//		14> 9
//		16> 11}// 查询user id大于3的记录public static void filterFunction3(StreamExecutionEnvironment env) throws Exception {DataStream<User> source = sourceUser(env);SingleOutputStreamOperator<User> sink = source.filter(user -> user.getId() > 3);sink.print();
//		14> User(id=5, name=alan1, pwd=5, email=5@5.com, age=15, balance=500.0)
//		15> User(id=4, name=alan2, pwd=4, email=4@4.com, age=30, balance=400.0)}/*** @param args*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();filterFunction3(env);env.execute();}}

本文主要介绍Flink 的3种常用的operator及以具体可运行示例进行说明。
如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为五篇,即:
【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter
【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations
【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等
【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project
【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)

这篇关于【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/456306

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

JS常用组件收集

收集了一些平时遇到的前端比较优秀的组件,方便以后开发的时候查找!!! 函数工具: Lodash 页面固定: stickUp、jQuery.Pin 轮播: unslider、swiper 开关: switch 复选框: icheck 气泡: grumble 隐藏元素: Headroom

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

常用的jdk下载地址

jdk下载地址 安装方式可以看之前的博客: mac安装jdk oracle 版本:https://www.oracle.com/java/technologies/downloads/ Eclipse Temurin版本:https://adoptium.net/zh-CN/temurin/releases/ 阿里版本: github:https://github.com/

安卓链接正常显示,ios#符被转义%23导致链接访问404

原因分析: url中含有特殊字符 中文未编码 都有可能导致URL转换失败,所以需要对url编码处理  如下: guard let allowUrl = webUrl.addingPercentEncoding(withAllowedCharacters: .urlQueryAllowed) else {return} 后面发现当url中有#号时,会被误伤转义为%23,导致链接无法访问

30常用 Maven 命令

Maven 是一个强大的项目管理和构建工具,它广泛用于 Java 项目的依赖管理、构建流程和插件集成。Maven 的命令行工具提供了大量的命令来帮助开发人员管理项目的生命周期、依赖和插件。以下是 常用 Maven 命令的使用场景及其详细解释。 1. mvn clean 使用场景:清理项目的生成目录,通常用于删除项目中自动生成的文件(如 target/ 目录)。共性规律:清理操作

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}