本文主要是介绍Flink DataSet分配唯一标识符,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
专栏原创出处:github-源笔记文件 ,github-源码 ,欢迎 Star,转载请附上原文出处链接和本声明。
DataSet分配唯一标识符
在某些算法中,可能需要为数据集元素分配唯一标识符。[[org.apache.flink.api.scala.utils.DataSetUtils]] scala 包装类
具体实现源码可参考 DataSetUtils
zipWithIndex 方式分配
为元素分配连续的标签,接收数据集作为输入并返回 DataSet[(Long, T)] 2 元组的新数据集。
此过程需要两步操作,首先是计数,然后是标记元素,由于计数同步,因此无法进行流水线处理。
替代方法 zipWithUniqueId 以流水线方式工作,当唯一的标签足够时,它是首选方法。
zipWithUniqueId 方式分配
在许多情况下,可能不需要分配连续的标签。
zipWithUniqueId 以管道方式工作,加快了标签分配过程。
此方法接收一个数据集作为输入,并返回一个新的 DataSet[(Long, T)] 2 元组数据集
代码示例 ZippingElements :
import io.gourd.flink.scala.api.BatchExecutionEnvironmentApp/** 在某些算法中,可能需要为数据集元素分配唯一标识符。* 本文档说明了如何将* [[org.apache.flink.api.scala.utils.DataSetUtils]]* [[org.apache.flink.api.java.utils.DataSetUtils.zipWithIndex()]]* [[org.apache.flink.api.java.utils.DataSetUtils.zipWithUniqueId()]]* 用于此目的。** @author Li.Wei by 2019/11/12*/
object ZippingElements extends BatchExecutionEnvironmentApp {import org.apache.flink.api.scala._val input: DataSet[String] = bEnv.fromElements("A", "B", "C", "D", "E", "F", "G", "H")bEnv.setParallelism(2)/*zipWithIndex 为元素分配连续的标签,接收数据集作为输入并返回 DataSet[(Long, T)] 2 元组的新数据集。此过程需要两步操作,首先是计数,然后是标记元素,由于计数同步,因此无法进行流水线处理。替代方法 zipWithUniqueId 以流水线方式工作,当唯一的标签足够时,它是首选方法。*/import org.apache.flink.api.scala.utils.DataSetUtilsinput.zipWithIndex.print()
/*
(0,A)
(1,B)
(2,C)
(3,D)
(4,E)
(5,F)
(6,G)
(7,H)*/println()/*在许多情况下,可能不需要分配连续的标签。zipWithUniqueId 以管道方式工作,加快了标签分配过程。此方法接收一个数据集作为输入,并返回一个新的 DataSet[(Long, T)] 2 元组数据集本机执行,未发生并行,实际情况参考分布式测试结果*/input.zipWithUniqueId.print()/*
(0,A)
(1,B)
(2,C)
(3,D)
(4,E)
(5,F)
(6,G)
(7,H)*/
}
这篇关于Flink DataSet分配唯一标识符的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!