Flink特异的迭代操作-bulkIteration

2023-10-09 03:18

本文主要是介绍Flink特异的迭代操作-bulkIteration,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

迭代算法在很多数据分析领域会用到,比如机器学习或者图计算。为了从大数据中抽取有用信息,这个时候往往会需要在处理的过程中用到迭代计算。大数据处理框架很多,比如sparkmr。实际上这些实现迭代计算都是很困难的。

Flink神奇之处就是它直接支持迭代计算。Flink实现迭代的思路也是很简单,就是实现一个step函数,然后将其嵌入到迭代算子中去。有两种迭代操作算子:IterateDelta Iterate。两个操作算子都是在未收到终止迭代信号之前一直调用step函数。

本小节是主要是讲解理论。

迭代操作算子包括了简单的迭代形式:每次迭代,step函数会消费全量数据(本次输入和上次迭代的结果),然后计算得到下轮迭代的输出(例如,mapreducejoin)

1.迭代输入(Iteration Input)

第一次迭代的初始输入,可能来源于数据源或者先前的操作算子。

2. Step函数

每次迭代都会执行step函数。其是由mapreducejoin等算子组成的数据流,根据业务定制的。

3. 下次迭代的部分结果(Next Partial Solution):

每次迭代,step函数的输出结果会有部分返回参与继续迭代。

4. 最大迭代次数

如果没有其他终止条件,就会在聚合次数达到该值的情况下终止。

5. 自定义聚合器收敛:

迭代允许指定自定义聚合器和收敛标准,如sum会聚合要发出的记录数(聚合器),如果此数字为零则终止(收敛标准)。

案例:累加计数

这个例子主要是给定数据输入,每次增加一,输出结果。

640?wx_fmt=png


  1. 迭代输入:输入是1-5的数字。

  2. step函数:给数字加一操作。

  3. 部分结果:实际上就是一个map函数。

  4. 迭代结果:最大迭代次数是十次,所以最终输出是11-15.

640?wx_fmt=png

代码操作

编程的时候,本文说的这种迭代方式叫做bulk Iteration,需要调用iterate(int),该函数返回的是一个IterativeDataSet,当然我们可以对他进行一些操作,比如map等。Iterate函数唯一的参数是代表最大迭代次数。

迭代是一个环有前面的图可以看到,我们需要进行闭环操作,那么这时候就要用到closeWith(Dataset)操作了,参数就是需要循环迭代的dataset。也可以可选的指定一个终止标准,操作closeWith(DataSet, DataSet),可以通过判断第二个dataset是否为空,来终止迭代。如果不指定终止迭代条件,迭代就会在迭代了最大迭代次数后终止。

下面就是通过迭代计算pi的例子。

 

package Streaming.iteration;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;


public class IteratePi {

  public static voidmain(String[] args) throws Exception{
     final ExecutionEnvironmentenv = ExecutionEnvironment.getExecutionEnvironment();
     // Create initialIterativeDataSet
     IterativeDataSet<Integer> initial= env.fromElements(0).iterate(100);

     DataSet<Integer> iteration= initial.map(new MapFunction<Integer, Integer>(){
        @Override
        public Integermap(Integer i) throws Exception{
           double x = Math.random();
           double y = Math.random();

           return i + ((x * x + y * y < 1) ? 1 : 0);
        }
     });


     // Iterativelytransform the IterativeDataSet
     DataSet<Integer> count = initial.closeWith(iteration);

     count.map(new MapFunction<Integer, Double>(){
        @Override
        public Double map(Integercount) throws Exception {
           return count /(double) 10000 * 4;
        }
     }).print();

     // execute theprogram
     env.execute("IterativePi Example");
  }

}

推荐阅读:

调试flink源码

Flink异步IO第一讲

flink的神奇分流器-sideoutput

Structured Streaming VS Flink

更详细的flink内容分享,欢迎加入浪尖知识星球,与470人一起学习~

640?wx_fmt=jpeg

这篇关于Flink特异的迭代操作-bulkIteration的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/169959

相关文章

C++变换迭代器使用方法小结

《C++变换迭代器使用方法小结》本文主要介绍了C++变换迭代器使用方法小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧... 目录1、源码2、代码解析代码解析:transform_iterator1. transform_iterat

Mysql表的简单操作(基本技能)

《Mysql表的简单操作(基本技能)》在数据库中,表的操作主要包括表的创建、查看、修改、删除等,了解如何操作这些表是数据库管理和开发的基本技能,本文给大家介绍Mysql表的简单操作,感兴趣的朋友一起看... 目录3.1 创建表 3.2 查看表结构3.3 修改表3.4 实践案例:修改表在数据库中,表的操作主要

C# WinForms存储过程操作数据库的实例讲解

《C#WinForms存储过程操作数据库的实例讲解》:本文主要介绍C#WinForms存储过程操作数据库的实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、存储过程基础二、C# 调用流程1. 数据库连接配置2. 执行存储过程(增删改)3. 查询数据三、事务处

Java使用Curator进行ZooKeeper操作的详细教程

《Java使用Curator进行ZooKeeper操作的详细教程》ApacheCurator是一个基于ZooKeeper的Java客户端库,它极大地简化了使用ZooKeeper的开发工作,在分布式系统... 目录1、简述2、核心功能2.1 CuratorFramework2.2 Recipes3、示例实践3

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

Python使用DrissionPage中ChromiumPage进行自动化网页操作

《Python使用DrissionPage中ChromiumPage进行自动化网页操作》DrissionPage作为一款轻量级且功能强大的浏览器自动化库,为开发者提供了丰富的功能支持,本文将使用Dri... 目录前言一、ChromiumPage基础操作1.初始化Drission 和 ChromiumPage

利用Go语言开发文件操作工具轻松处理所有文件

《利用Go语言开发文件操作工具轻松处理所有文件》在后端开发中,文件操作是一个非常常见但又容易出错的场景,本文小编要向大家介绍一个强大的Go语言文件操作工具库,它能帮你轻松处理各种文件操作场景... 目录为什么需要这个工具?核心功能详解1. 文件/目录存javascript在性检查2. 批量创建目录3. 文件

Redis中管道操作pipeline的实现

《Redis中管道操作pipeline的实现》RedisPipeline是一种优化客户端与服务器通信的技术,通过批量发送和接收命令减少网络往返次数,提高命令执行效率,本文就来介绍一下Redis中管道操... 目录什么是pipeline场景一:我要向Redis新增大批量的数据分批处理事务( MULTI/EXE

使用Python高效获取网络数据的操作指南

《使用Python高效获取网络数据的操作指南》网络爬虫是一种自动化程序,用于访问和提取网站上的数据,Python是进行网络爬虫开发的理想语言,拥有丰富的库和工具,使得编写和维护爬虫变得简单高效,本文将... 目录网络爬虫的基本概念常用库介绍安装库Requests和BeautifulSoup爬虫开发发送请求解

Oracle存储过程里操作BLOB的字节数据的办法

《Oracle存储过程里操作BLOB的字节数据的办法》该篇文章介绍了如何在Oracle存储过程中操作BLOB的字节数据,作者研究了如何获取BLOB的字节长度、如何使用DBMS_LOB包进行BLOB操作... 目录一、缘由二、办法2.1 基本操作2.2 DBMS_LOB包2.3 字节级操作与RAW数据类型2.