9、Flink 用户自定义 Functions 及 累加器详解

2024-05-01 18:36

本文主要是介绍9、Flink 用户自定义 Functions 及 累加器详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1)用户自定义函数
1.实现接口

最基本的方法是实现提供的接口。

# 根据提供的接口创建自定义函数
class MyMapFunction implements MapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}# 调用创建的自定义函数
data.map(new MyMapFunction());
2.匿名类

可以将 function 当做匿名类传递。

data.map(new MapFunction<String, Integer> () {public Integer map(String value) { return Integer.parseInt(value); }
});
3.Java 8 Lambdas

Flink 在 Java API 中还支持 Java 8 Lambdas 表达式。

data.filter(s -> s.startsWith("http://"));data.reduce((i1,i2) -> i1 + i2);
4.Rich functions

所有需要用户自定义 function 的转化操作都可以将 rich function 作为参数。

class MyMapFunction implements MapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}

替换成

class MyMapFunction extends RichMapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}

并将 function 照常传递给 map transformation。

data.map(new MyMapFunction());

Rich functions 也可以定义成匿名类:

data.map (new RichMapFunction<String, Integer>() {public Integer map(String value) { return Integer.parseInt(value); }
});
2)累加器 和 计数器
1.概述

累加器是具有加法运算最终累加结果的一种简单结构,可在作业结束后使用

最简单的累加器是计数器: 可以使用 Accumulator.add(V value) 方法将其递增。

在作业结束时,Flink 会汇总(合并)所有部分的结果并将其发送给客户端,Flink 目前有如下内置累加器,每个都实现了累加器接口。

  • IntCounter , LongCounterDoubleCounter
  • Histogram(直方图): 离散数量的柱状直方图实现;在内部,它只是整形到整形的映射,可以使用它来计算值的分布,例如,单词计数程序的每行单词的分布情况【详见 Metrics】。
2.使用累加器

首先,在需要使用累加器的用户自定义的转换 function 中创建一个累加器对象(此处是计数器)。

private IntCounter numLines = new IntCounter();

其次,必须在 rich function 的 open() 方法中注册累加器对象,也可以在此处定义累加器的名称。

getRuntimeContext().addAccumulator("num-lines", this.numLines);

在操作 function 中的任何位置(包括 open()close() 方法中)使用累加器。

this.numLines.add(1);

最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中(当前只有等待作业完成后执行才起作用)。

myJobExecutionResult.getAccumulatorResult("num-lines");

单个作业的所有累加器共享一个命名空间,因此可以在不同的操作 function 里面使用同一个累加器;Flink 会在内部将所有具有相同名称的累加器合并起来。

关于累加器和迭代的注意事项:当前累加器的结果只有在整个作业结束后才可用;Flink 计划在下一次迭代中提供上一次的迭代结果;可以使用 聚合器 来计算每次迭代的统计信息,并基于此类统计信息来终止迭代。

3.定制累加器

自定义累加器只需要实现累加器接口,可以选择实现 Accumulator 或 SimpleAccumulator。

Accumulator 的实现十分灵活: 它定义了将要添加的值类型 V,并定义了最终的结果类型 R;例如,对于直方图,V 是一个数字且 R 是一个直方图。

SimpleAccumulator 适用于两种类型都相同的情况,例如计数器。

4.总结
1.通过调用 execute() 方法返回的 JobExecutionResult 对象获得累加器结果(只有等待作业完成后执行才起作用)。2.单个作业的所有累加器共享一个命名空间,可以在不同的操作 function 里面使用同一个累加器;Flink 会在内部将所有具有相同名称的累加器合并起来。

这篇关于9、Flink 用户自定义 Functions 及 累加器详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/952383

相关文章

UnityException: Gizmo drawing functions can only be used in OnDrawGizmos and OnDrawGizmosSelected.

You don't have to call OnDrawGizmos() From anywhere, it is a editor function to display gizmos in the scene-view and is called automatically called!! 不能在任何地方调用 OnDrawGizmos(),这个函数是自动调用的。这个函数是编辑器函数,用来

十四、观察者模式与访问者模式详解

21.观察者模式 21.1.课程目标 1、 掌握观察者模式和访问者模式的应用场景。 2、 掌握观察者模式在具体业务场景中的应用。 3、 了解访问者模式的双分派。 4、 观察者模式和访问者模式的优、缺点。 21.2.内容定位 1、 有 Swing开发经验的人群更容易理解观察者模式。 2、 访问者模式被称为最复杂的设计模式。 21.3.观察者模式 观 察 者 模 式 ( Obser

【操作系统】信号Signal超详解|捕捉函数

🔥博客主页: 我要成为C++领域大神🎥系列专栏:【C++核心编程】 【计算机网络】 【Linux编程】 【操作系统】 ❤️感谢大家点赞👍收藏⭐评论✍️ 本博客致力于知识分享,与更多的人进行学习交流 ​ 如何触发信号 信号是Linux下的经典技术,一般操作系统利用信号杀死违规进程,典型进程干预手段,信号除了杀死进程外也可以挂起进程 kill -l 查看系统支持的信号

ROS话题通信流程自定义数据格式

ROS话题通信流程自定义数据格式 需求流程实现步骤定义msg文件编辑配置文件编译 在 ROS 通信协议中,数据载体是一个较为重要组成部分,ROS 中通过 std_msgs 封装了一些原生的数据类型,比如:String、Int32、Int64、Char、Bool、Empty… 但是,这些数据一般只包含一个 data 字段,结构的单一意味着功能上的局限性,当传输一些复杂的数据,比如:

Jitter Injection详解

一、定义与作用 Jitter Injection,即抖动注入,是一种在通信系统中人为地添加抖动的技术。该技术通过在发送端对数据包进行延迟和抖动调整,以实现对整个通信系统的时延和抖动的控制。其主要作用包括: 改善传输质量:通过调整数据包的时延和抖动,可以有效地降低误码率,提高数据传输的可靠性。均衡网络负载:通过对不同的数据流进行不同程度的抖动注入,可以实现网络资源的合理分配,提高整体传输效率。增

Steam邮件推送内容有哪些?配置教程详解!

Steam邮件推送功能是否安全?如何个性化邮件推送内容? Steam作为全球最大的数字游戏分发平台之一,不仅提供了海量的游戏资源,还通过邮件推送为用户提供最新的游戏信息、促销活动和个性化推荐。AokSend将详细介绍Steam邮件推送的主要内容。 Steam邮件推送:促销优惠 每当平台举办大型促销活动,如夏季促销、冬季促销、黑色星期五等,用户都会收到邮件通知。这些邮件详细列出了打折游戏、

探索Elastic Search:强大的开源搜索引擎,详解及使用

🎬 鸽芷咕:个人主页  🔥 个人专栏: 《C++干货基地》《粉丝福利》 ⛺️生活的理想,就是为了理想的生活! 引入 全文搜索属于最常见的需求,开源的 Elasticsearch (以下简称 Elastic)是目前全文搜索引擎的首选,相信大家多多少少的都听说过它。它可以快速地储存、搜索和分析海量数据。就连维基百科、Stack Overflow、

常用MQ消息中间件Kafka、ZeroMQ和RabbitMQ对比及RabbitMQ详解

1、概述   在现代的分布式系统和实时数据处理领域,消息中间件扮演着关键的角色,用于解决应用程序之间的通信和数据传递的挑战。在众多的消息中间件解决方案中,Kafka、ZeroMQ和RabbitMQ 是备受关注和广泛应用的代表性系统。它们各自具有独特的特点和优势,适用于不同的应用场景和需求。   Kafka 是一个高性能、可扩展的分布式消息队列系统,被设计用于处理大规模的数据流和实时数据传输。它

Linux中拷贝 cp命令中拷贝所有的写法详解

This text from: http://www.jb51.net/article/101641.htm 一、预备  cp就是拷贝,最简单的使用方式就是: cp oldfile newfile 但这样只能拷贝文件,不能拷贝目录,所以通常用: cp -r old/ new/ 那就会把old目录整个拷贝到new目录下。注意,不是把old目录里面的文件拷贝到new目录,

53、Flink Interval Join 代码示例

1、概述 interval Join 默认会根据 keyBy 的条件进行 Join 此时为 Inner Join; interval Join 算子的水位线会取两条流中水位线的最小值; interval Join 迟到数据的判定是以 interval Join 算子的水位线为基准; interval Join 可以分别输出两条流中迟到的数据-[sideOutputLeftLateData,