9、Flink 用户自定义 Functions 及 累加器详解

2024-05-01 18:36

本文主要是介绍9、Flink 用户自定义 Functions 及 累加器详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1)用户自定义函数
1.实现接口

最基本的方法是实现提供的接口。

# 根据提供的接口创建自定义函数
class MyMapFunction implements MapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}# 调用创建的自定义函数
data.map(new MyMapFunction());
2.匿名类

可以将 function 当做匿名类传递。

data.map(new MapFunction<String, Integer> () {public Integer map(String value) { return Integer.parseInt(value); }
});
3.Java 8 Lambdas

Flink 在 Java API 中还支持 Java 8 Lambdas 表达式。

data.filter(s -> s.startsWith("http://"));data.reduce((i1,i2) -> i1 + i2);
4.Rich functions

所有需要用户自定义 function 的转化操作都可以将 rich function 作为参数。

class MyMapFunction implements MapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}

替换成

class MyMapFunction extends RichMapFunction<String, Integer> {public Integer map(String value) { return Integer.parseInt(value); }
}

并将 function 照常传递给 map transformation。

data.map(new MyMapFunction());

Rich functions 也可以定义成匿名类:

data.map (new RichMapFunction<String, Integer>() {public Integer map(String value) { return Integer.parseInt(value); }
});
2)累加器 和 计数器
1.概述

累加器是具有加法运算最终累加结果的一种简单结构,可在作业结束后使用

最简单的累加器是计数器: 可以使用 Accumulator.add(V value) 方法将其递增。

在作业结束时,Flink 会汇总(合并)所有部分的结果并将其发送给客户端,Flink 目前有如下内置累加器,每个都实现了累加器接口。

  • IntCounter , LongCounterDoubleCounter
  • Histogram(直方图): 离散数量的柱状直方图实现;在内部,它只是整形到整形的映射,可以使用它来计算值的分布,例如,单词计数程序的每行单词的分布情况【详见 Metrics】。
2.使用累加器

首先,在需要使用累加器的用户自定义的转换 function 中创建一个累加器对象(此处是计数器)。

private IntCounter numLines = new IntCounter();

其次,必须在 rich function 的 open() 方法中注册累加器对象,也可以在此处定义累加器的名称。

getRuntimeContext().addAccumulator("num-lines", this.numLines);

在操作 function 中的任何位置(包括 open()close() 方法中)使用累加器。

this.numLines.add(1);

最终整体结果会存储在由执行环境的 execute() 方法返回的 JobExecutionResult 对象中(当前只有等待作业完成后执行才起作用)。

myJobExecutionResult.getAccumulatorResult("num-lines");

单个作业的所有累加器共享一个命名空间,因此可以在不同的操作 function 里面使用同一个累加器;Flink 会在内部将所有具有相同名称的累加器合并起来。

关于累加器和迭代的注意事项:当前累加器的结果只有在整个作业结束后才可用;Flink 计划在下一次迭代中提供上一次的迭代结果;可以使用 聚合器 来计算每次迭代的统计信息,并基于此类统计信息来终止迭代。

3.定制累加器

自定义累加器只需要实现累加器接口,可以选择实现 Accumulator 或 SimpleAccumulator。

Accumulator 的实现十分灵活: 它定义了将要添加的值类型 V,并定义了最终的结果类型 R;例如,对于直方图,V 是一个数字且 R 是一个直方图。

SimpleAccumulator 适用于两种类型都相同的情况,例如计数器。

4.总结
1.通过调用 execute() 方法返回的 JobExecutionResult 对象获得累加器结果(只有等待作业完成后执行才起作用)。2.单个作业的所有累加器共享一个命名空间,可以在不同的操作 function 里面使用同一个累加器;Flink 会在内部将所有具有相同名称的累加器合并起来。

这篇关于9、Flink 用户自定义 Functions 及 累加器详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/952383

相关文章

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

6.1.数据结构-c/c++堆详解下篇(堆排序,TopK问题)

上篇:6.1.数据结构-c/c++模拟实现堆上篇(向下,上调整算法,建堆,增删数据)-CSDN博客 本章重点 1.使用堆来完成堆排序 2.使用堆解决TopK问题 目录 一.堆排序 1.1 思路 1.2 代码 1.3 简单测试 二.TopK问题 2.1 思路(求最小): 2.2 C语言代码(手写堆) 2.3 C++代码(使用优先级队列 priority_queue)

K8S(Kubernetes)开源的容器编排平台安装步骤详解

K8S(Kubernetes)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。以下是K8S容器编排平台的安装步骤、使用方式及特点的概述: 安装步骤: 安装Docker:K8S需要基于Docker来运行容器化应用程序。首先要在所有节点上安装Docker引擎。 安装Kubernetes Master:在集群中选择一台主机作为Master节点,安装K8S的控制平面组件,如AP

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐? 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识,并举出了两个例子,我们再举出两个例子继续说明: struct S3{double a;int b;char c;};int mian(){printf("%zd\n",s

嵌入式Openharmony系统构建与启动详解

大家好,今天主要给大家分享一下,如何构建Openharmony子系统以及系统的启动过程分解。 第一:OpenHarmony系统构建      首先熟悉一下,构建系统是一种自动化处理工具的集合,通过将源代码文件进行一系列处理,最终生成和用户可以使用的目标文件。这里的目标文件包括静态链接库文件、动态链接库文件、可执行文件、脚本文件、配置文件等。      我们在编写hellowor

LabVIEW FIFO详解

在LabVIEW的FPGA开发中,FIFO(先入先出队列)是常用的数据传输机制。通过配置FIFO的属性,工程师可以在FPGA和主机之间,或不同FPGA VIs之间进行高效的数据传输。根据具体需求,FIFO有多种类型与实现方式,包括目标范围内FIFO(Target-Scoped)、DMA FIFO以及点对点流(Peer-to-Peer)。 FIFO类型 **目标范围FIFO(Target-Sc

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

019、JOptionPane类的常用静态方法详解

目录 JOptionPane类的常用静态方法详解 1. showInputDialog()方法 1.1基本用法 1.2带有默认值的输入框 1.3带有选项的输入对话框 1.4自定义图标的输入对话框 2. showConfirmDialog()方法 2.1基本用法 2.2自定义按钮和图标 2.3带有自定义组件的确认对话框 3. showMessageDialog()方法 3.1