Flink DataSet广播变量

2024-06-03 13:48
文章标签 广播 变量 flink dataset

本文主要是介绍Flink DataSet广播变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

专栏原创出处:github-源笔记文件 ,github-源码 ,欢迎 Star,转载请附上原文出处链接和本声明。
本节内容对应官方文档 ,本节内容对应示例源码

DataSet广播变量

重要信息:一台计算机上的并行任务之间共享广播变量数据结构。修改其内部状态的任何访问都需要由调用者手动同步

示例代码:

/** 广播变量** @author Li.Wei by 2019/11/4*/
object Broadcast extends BatchExecutionEnvironmentApp {// 用户登录数据 DataSetval userLoginDs = DataSet.userLogin(this)// 角色登录数据 DataSet 对应用户 ID,去重val roleLoginDs = DataSet.roleLogin(this).map(_.uid).distinct()userLoginDs.map(new MyBroadcastMap()).withBroadcastSet(roleLoginDs, "roleLoginDataSet") // 将数据集作为广播集.first(10).withForwardedFields().print()/* print(none,LOGOUT)(2|2946,LOGIN)(0|1082,LOGOUT)(2|2892,LOGOUT)(none,LOGIN)(2|1835,LOGIN)(none,LOGOUT)(none,LOGOUT)(0|489,LOGOUT)(none,LOGOUT)*/
}/*** 自定义 map 实现函数,[[RichMapFunction]] 中可获取 flink 上下文及执行前后的打开关闭操作*/
class MyBroadcastMap extends RichMapFunction[UserLogin, (String, String)] {var broadcastSet: Traversable[String] = _ // 声明广播变量override def open(config: Configuration): Unit = {// 赋值广播变量import scala.collection.JavaConverters._broadcastSet = getRuntimeContext.getBroadcastVariable[String]("roleLoginDataSet").asScala}// 判断当前用户对应的 ID 在该用户对应角色中是否登录过override def map(value: UserLogin): (String, String) =if (broadcastSet.exists(_ == value.uid)) (value.uid, value.status) else ("none", value.status)}

这篇关于Flink DataSet广播变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1027161

相关文章

浅析Rust多线程中如何安全的使用变量

《浅析Rust多线程中如何安全的使用变量》这篇文章主要为大家详细介绍了Rust如何在线程的闭包中安全的使用变量,包括共享变量和修改变量,文中的示例代码讲解详细,有需要的小伙伴可以参考下... 目录1. 向线程传递变量2. 多线程共享变量引用3. 多线程中修改变量4. 总结在Rust语言中,一个既引人入胜又可

java如何调用kettle设置变量和参数

《java如何调用kettle设置变量和参数》文章简要介绍了如何在Java中调用Kettle,并重点讨论了变量和参数的区别,以及在Java代码中如何正确设置和使用这些变量,避免覆盖Kettle中已设置... 目录Java调用kettle设置变量和参数java代码中变量会覆盖kettle里面设置的变量总结ja

Perl 特殊变量详解

《Perl特殊变量详解》Perl语言中包含了许多特殊变量,这些变量在Perl程序的执行过程中扮演着重要的角色,:本文主要介绍Perl特殊变量,需要的朋友可以参考下... perl 特殊变量Perl 语言中包含了许多特殊变量,这些变量在 Perl 程序的执行过程中扮演着重要的角色。特殊变量通常用于存储程序的

变量与命名

引言         在前两个课时中,我们已经了解了 Python 程序的基本结构,学习了如何正确地使用缩进来组织代码,并且知道了注释的重要性。现在我们将进一步深入到 Python 编程的核心——变量与命名。变量是我们存储数据的主要方式,而合理的命名则有助于提高代码的可读性和可维护性。 变量的概念与使用         在 Python 中,变量是一种用来存储数据值的标识符。创建变量很简单,

HTML5自定义属性对象Dataset

原文转自HTML5自定义属性对象Dataset简介 一、html5 自定义属性介绍 之前翻译的“你必须知道的28个HTML5特征、窍门和技术”一文中对于HTML5中自定义合法属性data-已经做过些介绍,就是在HTML5中我们可以使用data-前缀设置我们需要的自定义属性,来进行一些数据的存放,例如我们要在一个文字按钮上存放相对应的id: <a href="javascript:" d

Flink任务重启策略

概述 Flink支持不同的重启策略,以在故障发生时控制作业如何重启集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。常用的重启策略: 固定间隔 (Fixe

JS_变量

二、JS的变量 JS中的变量具有如下特征 1 弱类型变量,可以统一声明成var 2 var声明的变量可以再次声明 3 变量可以使用不同的数据类型多次赋值 4 JS的语句可以以; 结尾,也可以不用;结尾 5 变量标识符严格区分大小写 6 标识符的命名规则参照JAVA 7 如果使用了 一个没有声明的变量,那么运行时会报uncaught ReferenceError: *** is not de

使用条件变量实现线程同步:C++实战指南

使用条件变量实现线程同步:C++实战指南 在多线程编程中,线程同步是确保程序正确性和稳定性的关键。条件变量(condition variable)是一种强大的同步原语,用于在线程之间进行协调,避免数据竞争和死锁。本文将详细介绍如何在C++中使用条件变量实现线程同步,并提供完整的代码示例和详细的解释。 什么是条件变量? 条件变量是一种同步机制,允许线程在某个条件满足之前进入等待状态,并在条件满

axure之变量

一、设置我们的第一个变量 1、点击axure上方设置一个全局变量a = 3 2、加入按钮、文本框元件点击按钮文档框展示变量值。 交互选择【单击时】【设置文本】再点击函数。 点击插入变量和函数直接选择刚刚定义的全局变量,也可以直接手动写入函数(注意写入格式。) 这样点击按钮时就直接展示刚刚设置的全局变量3了。 2、更改变量值 在新建交互里点击设置变量值。 将a变量设置成等于10. 将新

论文精读-Supervised Raw Video Denoising with a Benchmark Dataset on Dynamic Scenes

论文精读-Supervised Raw Video Denoising with a Benchmark Dataset on Dynamic Scenes 优势 1、构建了一个用于监督原始视频去噪的基准数据集。为了多次捕捉瞬间,我们手动为对象s创建运动。在高ISO模式下捕获每一时刻的噪声帧,并通过对多个噪声帧进行平均得到相应的干净帧。 2、有效的原始视频去噪网络(RViDeNet),通过探