本文主要是介绍Flink DataSet分布式缓冲,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
专栏原创出处:github-源笔记文件 ,github-源码 ,欢迎 Star,转载请附上原文出处链接和本声明。
本节内容对应官方文档 ,本节内容对应示例源码
[[toc]]
DataSet 分布式缓冲
Flink 提供了一个分布式缓存,类似于 hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在 taskManager 节点中,防止 task 重复拉取。
执行机制如下:
程序注册一个文件或者目录 (本地或者远程文件系统,例如 hdfs 或者 s3),通过 ExecutionEnvironment
注册缓存文件并为它起一个名称
当程序执行,Flink 自动将文件或者目录复制到所有 taskManager 节点的本地文件系统,仅会执行一次。
用户可以通过这个指定的名称查找文件或者目录,然后从 taskManager 节点的本地文件系统访问它。
示例代码:
/** 分布式缓存* =示例说明=* 分布式载入用户 ID 黑名单文件,针对用户登录数据匹配在黑名单 ID 及对应登录状态** @author Li.Wei by 2019/11/4*/
object DistributedCache extends BatchExecutionEnvironmentApp {private val path = getClass.getClassLoader.getResource("data/game/blacklist-uid.txt").getPathbEnv.registerCachedFile(path, "blacklist-uid", executable = false)// 用户登录数据 DataSetval userLoginDataSet = DataSet.userLogin(this)import org.apache.flink.api.scala.extensions._ // use filterWithuserLoginDataSet.map(new BlacklistMap()).filterWith(_._1 != "none").distinct().print()}class BlacklistMap extends RichMapFunction[UserLogin, (String, String)] {var source: BufferedSource = _ // 读取文件流,函数结束后执行关闭操作var blackUid: Seq[String] = _ // 黑名单数据,从分布式缓冲文件中载入override def open(config: Configuration): Unit = {val file: File = getRuntimeContext.getDistributedCache.getFile("blacklist-uid")import scala.io.Sourcesource = Source.fromFile(file, "UTF-8")blackUid = source.getLines().toSeq}// 判断当前用户对应的 ID 在该用户对应角色中是否登录过override def map(value: UserLogin): (String, String) =if (blackUid.contains(value.uid)) (value.uid, value.status) else ("none", value.status)override def close(): Unit = source.close()
}
这篇关于Flink DataSet分布式缓冲的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!