####好#####DStream中的转换(transformation)

2024-05-07 15:08

本文主要是介绍####好#####DStream中的转换(transformation),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

DStream中的转换(transformation)

和RDD类似,transformation允许从输入DStream来的数据被修改。DStreams支持很多在RDD中可用的transformation算子。一些常用的算子如下所示:

TransformationMeaning
map(func)利用函数func处理原DStream的每个元素,返回一个新的DStream
flatMap(func)与map相似,但是每个输入项可用被映射为0个或者多个输出项
filter(func)返回一个新的DStream,它仅仅包含源DStream中满足函数func的项
repartition(numPartitions)通过创建更多或者更少的partition改变这个DStream的并行级别(level of parallelism)
union(otherStream)返回一个新的DStream,它包含源DStream和otherStream的联合元素
count()通过计算源DStream中每个RDD的元素数量,返回一个包含单元素(single-element)RDDs的新DStream
reduce(func)利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素(single-element)RDDs的新DStream。函数应该是相关联的,以使计算可以并行化
countByValue()这个算子应用于元素类型为K的DStream上,返回一个(K,long)对的新DStream,每个键的值是在原DStream的每个RDD中的频率。
reduceByKey(func, [numTasks])当在一个由(K,V)对组成的DStream上调用这个算子,返回一个新的由(K,V)对组成的DStream,每一个key的值均由给定的reduce函数聚集起来。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。你可以用numTasks参数设置不同的任务数
join(otherStream, [numTasks])当应用于两个DStream(一个包含(K,V)对,一个包含(K,W)对),返回一个包含(K, (V, W))对的新DStream
cogroup(otherStream, [numTasks])当应用于两个DStream(一个包含(K,V)对,一个包含(K,W)对),返回一个包含(K, Seq[V], Seq[W])的元组
transform(func)通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。这个可以在DStream中的任何RDD操作中使用
updateStateByKey(func)利用给定的函数更新DStream的状态,返回一个新"state"的DStream。

最后两个transformation算子需要重点介绍一下:

UpdateStateByKey操作

updateStateByKey操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它

  • 定义状态-状态可以是任何的数据类型
  • 定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态

让我们举个例子说明。在例子中,你想保持一个文本数据流中每个单词的运行次数,运行次数用一个state表示,它的类型是整数

def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount = ...  // add the new values with the previous running count to get the new countSome(newCount)
}

这个函数被用到了DStream包含的单词上

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)

更新函数将会被每个单词调用,newValues拥有一系列的1(从 (词, 1)对而来),runningCount拥有之前的次数。要看完整的代码,见例子

Transform操作

transform操作(以及它的变化形式如transformWith)允许在DStream运行任何RDD-to-RDD函数。它能够被用来应用任何没在DStream API中提供的RDD操作(It can be used to apply any RDD operation that is not exposed in the DStream API)。例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在DStream API中提供,然而你可以简单的利用transform方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流来清理实时数据,然后过了它们,你可以按如下方法来做:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam informationval cleanedDStream = wordCounts.transform(rdd => {rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning...
})

事实上,你也可以在transform方法中用机器学习和图计算算法

窗口(window)操作

Spark Streaming也支持窗口计算,它允许你在一个滑动窗口数据上应用transformation算子。下图阐明了这个滑动窗口。

滑动窗口

如上图显示,窗口在源DStream上滑动,合并和操作落入窗内的源RDDs,产生窗口化的DStream的RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。这说明,任何一个窗口操作都需要指定两个参数:

  • 窗口长度:窗口的持续时间
  • 滑动的时间间隔:窗口操作执行的时间间隔

这两个参数必须是源DStream的批时间间隔的倍数。

下面举例说明窗口操作。例如,你想扩展前面的例子用来计算过去30秒的词频,间隔时间是10秒。为了达到这个目的,我们必须在过去30秒的pairs DStream上应用reduceByKey操作。用方法reduceByKeyAndWindow实现。

// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数:窗口长度和滑动的时间间隔

这篇关于####好#####DStream中的转换(transformation)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/967695

相关文章

PDF 软件如何帮助您编辑、转换和保护文件。

如何找到最好的 PDF 编辑器。 无论您是在为您的企业寻找更高效的 PDF 解决方案,还是尝试组织和编辑主文档,PDF 编辑器都可以在一个地方提供您需要的所有工具。市面上有很多 PDF 编辑器 — 在决定哪个最适合您时,请考虑这些因素。 1. 确定您的 PDF 文档软件需求。 不同的 PDF 文档软件程序可以具有不同的功能,因此在决定哪个是最适合您的 PDF 软件之前,请花点时间评估您的

C# double[] 和Matlab数组MWArray[]转换

C# double[] 转换成MWArray[], 直接赋值就行             MWNumericArray[] ma = new MWNumericArray[4];             double[] dT = new double[] { 0 };             double[] dT1 = new double[] { 0,2 };

数据流与Bitmap之间相互转换

把获得的数据流转换成一副图片(Bitmap) 其原理就是把获得倒的数据流序列化到内存中,然后经过加工,在把数据从内存中反序列化出来就行了。 难点就是在如何实现加工。因为Bitmap有一个专有的格式,我们常称这个格式为数据头。加工的过程就是要把这个数据头与我们之前获得的数据流合并起来。(也就是要把这个头加入到我们之前获得的数据流的前面)      那么这个头是

高斯平面直角坐标讲解,以及地理坐标转换高斯平面直角坐标

高斯平面直角坐标系(Gauss-Krüger 坐标系)是基于 高斯-克吕格投影 的一种常见的平面坐标系统,主要用于地理信息系统 (GIS)、测绘和工程等领域。该坐标系将地球表面的经纬度(地理坐标)通过一种投影方式转换为平面直角坐标,以便在二维平面中进行距离、面积和角度的计算。 一 投影原理 高斯平面直角坐标系使用的是 高斯-克吕格投影(Gauss-Krüger Projection),这是 横

VC环境下整型转换为字符串型(2)

在串口下位机的发送中,可能会用到需要发送数字,显示为字符串型的 和上一篇文字《串口中字符串转换为整型》一正一反,知识点学习会了: #include<iostream.h> #include <stdio.h> #include <string.h>   void inttostr(int m,unsigned char * str) { int length=0;   int tmp,te

时间日期与时间戳转换(Linux C)

本文主要学习三个知识点,第一是UTC时间、GMT时间的概念;第二是在Unix环境下UTC时间与时间戳的转换;第三是在C语言中如何修改时区。 本文参考了《UNP》以及 http://blog.csdn.net/foxir/article/details/43916601 http://blog.csdn.net/ljafl9988/article/details/16847935 一、

点云数据常见的坐标系有哪些,如何进行转换?

文章目录 一、点云坐标系分类1. 世界坐标系2. 相机坐标系3. 极坐标系4. 笛卡尔坐标系(直角坐标系):5. 传感器坐标系6. 地理坐标系 二、坐标系转换方法1. 地理坐标系与投影坐标系之间的转换2. 投影坐标系与局部坐标系之间的转换3. 局部坐标系与3D模型坐标系之间的转换4. 相机坐标系与其他坐标系之间的转换5. 传感器坐标系与其他坐标系之间的转换 三、坐标系转换工具 一

思科网络地址转换5

#网络安全技术实现# #任务五利用动态NAPT实现局域网访问Internet5# #1配置计算机的IP 地址、子网掩码和网关 #2配置路由器A的主机名称及其接口IP地址 Router>enable Router#conf t Router(config)#hostname Router-A Router-A(config)#int f0/0 Router-A(con

itoa()函数,10进制转换到(2~36)进制

先看下itoa()的函数说明吧: 功 能:把一整数转换为字符串   用 法:char *itoa(int value, char *string, int radix);    详细解释:itoa是英文integer to array(将int整型数转化为一个字符串,并将值保存在数组string中)的缩写.    参数:  value: 待转化的整数。            radix:

Untiy TTF转换为SDF

Untiy TTF转换为SDF 原因 下载的字体是TTF格式,但是TMP使用的是SDF格式,不支持TTF,需要转换网络没有检索到TTF转SDF的教程,可能是太简单了,自己记录一下吧 Unity内转换即可 在Asset中找到自己的TTF右键点击TTF,找到TMP,选择Font Asset 即可将自己下载的TTF字体转换为SDF格式 补充-修改默认的SDF 就是每次都自己拖动SDF效率低