本文主要是介绍Flink DataSet广播变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
专栏原创出处:github-源笔记文件 ,github-源码 ,欢迎 Star,转载请附上原文出处链接和本声明。
本节内容对应官方文档 ,本节内容对应示例源码
DataSet广播变量
重要信息:一台计算机上的并行任务之间共享广播变量数据结构。修改其内部状态的任何访问都需要由调用者手动同步
示例代码:
/** 广播变量** @author Li.Wei by 2019/11/4*/
object Broadcast extends BatchExecutionEnvironmentApp {// 用户登录数据 DataSetval userLoginDs = DataSet.userLogin(this)// 角色登录数据 DataSet 对应用户 ID,去重val roleLoginDs = DataSet.roleLogin(this).map(_.uid).distinct()userLoginDs.map(new MyBroadcastMap()).withBroadcastSet(roleLoginDs, "roleLoginDataSet") // 将数据集作为广播集.first(10).withForwardedFields().print()/* print(none,LOGOUT)(2|2946,LOGIN)(0|1082,LOGOUT)(2|2892,LOGOUT)(none,LOGIN)(2|1835,LOGIN)(none,LOGOUT)(none,LOGOUT)(0|489,LOGOUT)(none,LOGOUT)*/
}/*** 自定义 map 实现函数,[[RichMapFunction]] 中可获取 flink 上下文及执行前后的打开关闭操作*/
class MyBroadcastMap extends RichMapFunction[UserLogin, (String, String)] {var broadcastSet: Traversable[String] = _ // 声明广播变量override def open(config: Configuration): Unit = {// 赋值广播变量import scala.collection.JavaConverters._broadcastSet = getRuntimeContext.getBroadcastVariable[String]("roleLoginDataSet").asScala}// 判断当前用户对应的 ID 在该用户对应角色中是否登录过override def map(value: UserLogin): (String, String) =if (broadcastSet.exists(_ == value.uid)) (value.uid, value.status) else ("none", value.status)}
这篇关于Flink DataSet广播变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!