本文主要是介绍快速入门Flink (6) —— Flink的广播变量、累加器与分布式缓存,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
写在前面: 博主是一名大数据的初学者,昵称来源于《爱丽丝梦游仙境》中的Alice和自己的昵称。作为一名互联网小白,
写博客一方面是为了记录自己的学习历程,一方面是希望能够帮助到很多和自己一样处于起步阶段的萌新
。由于水平有限,博客中难免会有一些错误,有纰漏之处恳请各位大佬不吝赐教!个人小站:http://alices.ibilibili.xyz/ , 博客主页:https://alice.blog.csdn.net/
尽管当前水平可能不及各位大佬,但我还是希望自己能够做得更好,因为一天的生活就是一生的缩影
。我希望在最美的年华,做最好的自己
!
通过快速入门Flink系列的(1-5)篇博客,博主已经为大家介绍了一些Flink中常见的概念与一些基础的操作,感兴趣的朋友们可以收藏一下菌哥的Flink专栏哟(👉快速入门Flink)。本篇博客,博主为大家介绍的是Flink的广播变量、累加器与分布式缓存。
码字不易,先赞后看
文章目录
- 1.5 Flink的广播变量
- 1.6 Flink的分布式缓存
- 1.7 Flink Accumulators & Counters
- 小结
1.5 Flink的广播变量
Flink支持广播变量,就是将数据广播到具体的 taskmanager 上,数据存储在内存中, 这样可以减缓大量的 shuffle 操作; 比如在数据 join 阶段,不可避免的就是大量的 shuffle 操作,我们可以把其中一个 dataSet 广播出去,一直加载到 taskManager 的内存 中,可以直接在内存中拿数据,避免了大量的 shuffle, 导致集群性能下降; 广播变量创建后,它可以运行在集群中的任何 function 上,而不需要多次传递给集群节点。另外需要 记住,不应该修改广播变量,这样才能确保每个节点获取到的值都是一致的。
一句话解释,可以理解为是一个公共的共享变量,我们可以把一个 dataset 数据集广播出去, 然后不同的 task 在节点上都能够获取到,这个数据在每个节点上只会存在一份。 如果不使用 broadcast,则在每个节点中的每个 task 中都需要拷贝一份 dataset 数据集, 比较浪费内存(也就是一个节点中可能会存在多份 dataset 数据)。
注意:因为广播变量是要把 dataset 广播到内存中,所以广播的数据量不能太大,否则会出现OOM这样的问题。
- Broadcast:Broadcast 是通过 withBroadcastSet(dataset,string)来注册的
- Access:通过 getRuntimeContext().getBroadcastVariable(String)访问广播变量
让我们来通过一张图来感受下,使用广播变量和不使用广播变量,到底差在哪里。
小结一下:
■ 可以理解广播就是一个公共的共享变量
■ 将一个数据集广播后,不同的Task 都可以在节点上获取到
■ 每个节点只存一份
■ 如果不使用广播,每一个 Task 都会拷贝一份数据集,造成内存资源浪费
用法:
在需要使用广播的操作后,使用withBroadcastSet 创建广播
在操作中,使用 getRuntimeContext.getBroadcastVariable [广播数据类型] ( 广播名 ) 获取广播变量
示例:
创建一个学生数据集,包含以下数据:
|学生 ID | 姓名 |
|------|------|
List((1, “张三”), (2, “李四”), (3, “王五”))
将该数据,发布到广播。再创建一个 成绩 数据集。
|学生 ID | 学科 | 成绩 |
|------|------|-----|
List( (1, “语文”, 50),(2, “数学”, 70), (3, “英文”, 86))
请通过广播获取到学生姓名,将数据转换为
List( (“张三”, “语文”, 50),(“李四”, “数学”, 70), (“王五”, “英文”, 86))
步骤
1) 获取批处理运行环境
2) 分别创建两个数据集
3) 使用 RichMapFunction 对 成绩 数据集进行 map 转换
4) 在数据集调用 map 方法后,调用 withBroadcastSet 将 学生 数据集创建广播
5) 实现 RichMapFunction
a. 将成绩数据(学生 ID,学科,成绩) -> (学生姓名,学科,成绩)
b. 重写 open 方法中,获取广播数据
c. 导入 scala.collection.JavaConverters._ 隐式转换
d. 将广播数据使用 asScala 转换为 Scala 集合,再使用 toList 转换为 scala List 集合
e. 在 map 方法中使用广播进行转换
6) 打印测试
参考代码
import java.utilimport org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala._
/** @Author: Alice菌* @Date: 2020/8/1 20:30* @Description: */
object BatchBroadcastDemo {def main(args: Array[String]): Unit = {// 1、创建批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment// 2、分别创建两个数据集// 创建学生数据集val stuDataSet: DataSet[(Int, String)] = env.fromCollection(List((1, "张三"), (2, "李四"), (3, "王五")))// 创建成绩数据集val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))// 3、使用RichMapFunction 对成绩数据集进行map转换// 返回值类型(学生名字,学科称名,成绩)val result: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {// 定义获取学生数据集的集合var studentMap: Map[Int, String] = _// 初始化的时候被执行一次,在对象的生命周期中只被执行一次override def open(parameters: Configuration): Unit = {// 因为获取到的广播变量中的数据类型是java的集合类型,但是我们的代码是scala,因此需要将java的集合转换成scala的集合// 我们这里将list转换成了map对象,之所以能够转换是因为list中的元素是对偶元组,因此可以转换成 kv 键值对类型// 之所以要转换,是因为后面好用,传递一个学生id,可以直接获取到学生的名字import scala.collection.JavaConversions._// 获取到广播变量的内容val studentList: util.List[(Int, String)] = getRuntimeContext.getBroadcastVariable[(Int, String)]("student")studentMap = studentList.toMap}// 要对集合中的每个元素执行map操作,也就是说集合中有多少元素,就被执行多少次override def map(value: (Int, String, Int)): (String, String, Int) = {//(Int, String, Int)=》(学生id,学科名字,学生成绩)//返回值类型(学生名字,学科名,成绩)val stuId: Int = value._1val stuName: String = studentMap.getOrElse(stuId, "")//(学生名字,学科名,成绩)(stuName, value._2, value._3)}}).withBroadcastSet(stuDataSet,"student")result.print()//(张三,语文,50)//(李四,数学,70)//(王五,英文,86)}
}
1.6 Flink的分布式缓存
Flink 提供了一个类似于 Hadoop 的分布式缓存,让并行运行实例的函数可以在本地访 问。这 个功能可以被使用来分享外部静态的数据,例如:机器学习的逻辑回归模型等! 缓存的使用流程:
使用 ExecutionEnvironment实例对本地的或者远程的文件(例如:HDFS 上的文件),为缓存文件指定一个名字注册该缓存文件!当程序执行时候,Flink 会自动将复制文件或者目录到所有 worker 节点的本地文件系统中,函数可以根据名字去该节点的本地文件系统中检索该文件!
【注意】广播是将变量分发到各个 worker 节点的内存上,分布式缓存是将文件缓存到各个 worker 节点上;
用法
- 使用 Flink 运行时环境的 registerCachedFile
- 在操作中,使用 getRuntimeContext.getDistributedCache.getFile ( 文件名 )获取分布式缓存
示例
创建一个成绩数据集
List( (1, “语文”, 50),(2, “数学”, 70), (3, “英文”, 86))
请通过分布式缓存获取到学生姓名,将数据转换为
List( (“张三”, “语文”, 50),(“李四”, “数学”, 70), (“王五”, “英文”, 86))
注: distribute_cache_student 测试文件保存了学生 ID 以及学生姓名
1,张三
2,李四
3,王五
操作步骤
-
将 distribute_cache_student 文件上传到 HDFS / 目录下
-
获取批处理运行环境
-
创建成绩数据集
-
对成绩数据集进行 map 转换,将(学生 ID, 学科, 分数)转换为(学生姓名,学科, 分数)
a. RichMapFunction 的 open 方法中,获取分布式缓存数据
b. 在 map 方法中进行转换
- 实现 open 方法
a. 使用 getRuntimeContext.getDistributedCache.getFile 获取分布式缓存文件
b. 使用 Scala.fromFile 读取文件,并获取行
c. 将文本转换为元组(学生 ID,学生姓名),再转换为 List
- 实现 map 方法
a. 从分布式缓存中根据学生 ID 过滤出来学生
b. 获取学生姓名
c. 构建最终结果元组
参考代码
import java.io.Fileimport org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala._import scala.io.Source
/** @Author: Alice菌* @Date: 2020/8/1 22:40* @Description: */
object BatchDisCachedFile {def main(args: Array[String]): Unit = {// 获取批处理运行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment// 注册一个分布式缓存env.registerCachedFile("hdfs://node01:8020/test/input/distribute_cache_student","student")/*1,张三2,李四3,王五*/// 创建成绩数据集val scoreDataSet: DataSet[(Int, String, Int)] = env.fromCollection(List((1, "语文", 50), (2, "数学", 70), (3, "英文", 86)))val resultDataSet: DataSet[(String, String, Int)] = scoreDataSet.map(new RichMapFunction[(Int, String, Int), (String, String, Int)] {var studentMap: Map[Int, String] = _// 初始化的时候被调用一次override def open(parameters: Configuration): Unit = {// 获取分布式缓存的文件val studentFile: File = getRuntimeContext.getDistributedCache.getFile("student")val linesIter: Iterator[String] = Source.fromFile(studentFile).getLines()studentMap = linesIter.map(lines => {val words: Array[String] = lines.split(",")(words(0).toInt, words(1))}).toMap}override def map(value: (Int, String, Int)): (String, String, Int) = {val stuName: String = studentMap.getOrElse(value._1, "")(stuName, value._2, value._3)}})// 输出结果resultDataSet.print()//(张三,语文,50)//(李四,数学,70)//(王五,英文,86)}
}
1.7 Flink Accumulators & Counters
Accumulator 即累加器,与 Mapreduce counter 的应用场景差不多,都能很好地观察 task 在运行期间的数据变化 可以在 Flink job 任务中的算子函数中操作累加器,但是只能在任务执行结束之后才能获得累加器的最终结果。
Counter 是一个具体的累加器 (Accumulator) ,我们可以实 现 IntCounter, LongCounter 和 DoubleCounter。
步骤
1) 创建累加器
private IntCounter numLines = new IntCounter();
2) 注册累加器
getRuntimeContext().addAccumulator("num-lines", this.numLines);
3) 使用累加器
this.numLines.add(1);
4) 获取累加器的结果
myJobExecutionResult.getAccumulatorResult("num-lines")
示例:
需求: 给定一个数据源 “a”,“b”,“c”,“d” 通过累加器打印出多少个元素
参考代码
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.common.accumulators.IntCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem.WriteMode/** @Author: Alice菌* @Date: 2020/8/1 23:26* @Description: */
object BatchCounterDemo {def main(args: Array[String]): Unit = {//1、创建执行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//2、创建执行环境val sourceDataSet: DataSet[String] = env.fromElements("a","b","c","d")//3、对sourceDataSet 进行map操作val resultDataSet: DataSet[String] = sourceDataSet.map(new RichMapFunction[String, String] {// 创建累加器val counter: IntCounter = new IntCounter// 初始化的时候执行一次override def open(parameters: Configuration): Unit = {// 注册累加器getRuntimeContext.addAccumulator("MyAccumulator", this.counter)}// 初始化的时候被执行一次override def map(value: String): String = {counter.add(1)value}})resultDataSet.writeAsText("data/output/Accumulators",WriteMode.OVERWRITE)val result: JobExecutionResult = env.execute("BatchCounterDemo")val MyAccumlatorValue: Int = result.getAccumulatorResult[Int]("MyAccumulator")println("累加器的值:"+MyAccumlatorValue)//累加器的值:4}
}
Flink Broadcast 和 Accumulators 的区别:
Broadcast(广播变量)允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可以进行共享,但是不可以进行修改
Accumulators(累加器)是可以在不同任务中对同一个变量进行累加操作。
小结
本篇博客所讲述的内容,与前几篇博客相比,就有点类似于拓展的感觉。大家对于新的知识点一定要在理解的程度上再去进行复习回顾,而不是单纯地靠硬记。
如果以上过程中出现了任何的纰漏错误,烦请大佬们指正😅
受益的朋友或对大数据技术感兴趣的伙伴记得点赞关注支持一波🙏
希望我们都能在学习的道路上越走越远😉
这篇关于快速入门Flink (6) —— Flink的广播变量、累加器与分布式缓存的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!