本文主要是介绍spark中mapPartitions双重循环或两次遍历(duplicate),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在spark当中通常需要对mapPartitions内部进行计算,这样可以在不进行网络传输的情况下,对数据进行局部计算
而mapPartitions中的迭代器为Iterator
scala中的Iterator只能进行一次迭代,使用过后就消失了,所以在mapPartitions中既不能两次遍历
如:一次mapPartitions求最大最小值
val it = Iterator(20, 40, 2, 50, 69, 90)
println(“Maximum valued element ” + it.max) // 90
println(“Minimum valued element ” + it.min) // 出错
同理,如果进行双重循环等操作,在内部循环第一次循环完毕时,外部循环也会直接跳出
(而这对于计算而言很重要 )
所以在使用两次遍历或双重循环时需要对Iterator进行拷贝
需要用到关键字 duplicate 和 iter.toList
示例如下:(计算KNN高斯核密度)
def gaussianKernel(iterator: Iterator[DenseVector[Double]]): Iterator[Tuple2[DenseVector[Double], Double]] = {var res = List[(DenseVector[Double], Double)]()val (bakiter, curiter) = iterator.duplicateval (sizeiter, tmpiter) = bakiter.duplicateval tmplist = tmpiter.toListval curlist = curiter.toListval size = sizeiter.sizeval k = sqrt(size).toIntcurlist.foreach { cur =>var sumtmp = 0.0val abfDist = ArrayBuffer[Double]()tmplist.foreach { tmp =>abfDist += exp(-sum(pow(cur - tmp, 2)) / (2.0 * C))}val abfDistSorted = abfDist.sortedfor (i <- 0 until k) {sumtmp += abfDistSorted(size - 1 - i)}res.::=(cur, sumtmp)}res.iterator}
这篇关于spark中mapPartitions双重循环或两次遍历(duplicate)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!