本文主要是介绍大数据(8q)流计算updateStateByKey,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 前言
- updateStateByKey示例
- updateStateByKey源码
- Option知识补充
- getOrElse
- isEmpty
前言
- 本文属于Spark Streaming分支章节
- 流式处理中,分为有状态和冇状态
- 有状态:记录之前数据流处理的信息
- updateStateByKey是有状态的Transformation
updateStateByKey示例
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable// 创建SparkContext和SparkStreamingContext
val c0: SparkConf = new SparkConf().setAppName("a0").setMaster("local[2]")
val sc: SparkContext = new SparkContext(c0)
val ssc: StreamingContext = new StreamingContext(sc, Seconds(9))
// 创建RDD队列,并放入QueueInputDStream
val rddQueue: mutable.Queue[RDD[String]] = new mutable.Queue[RDD[String]]()
val iDS: InputDStream[String] = ssc.queueStream(rddQueue, oneAtATime = false)
//===========================================================================
// 数据预处理
val dS: DStream[(String, Int)] = iDS.map((_, 1))
// 无状态
dS.reduceByKey(_ + _).print()
//设置检查点路径,用于保存状态
ssc.checkpoint("checkpoint")
// 根据 Key 来更新状态
dS.updateStateByKey(// seq是一个DStream内所有RDD相同Key连成的Value队列(seq: Seq[Int], state: Option[Int]) => {Option(seq.sum + state.getOrElse(0))}
).print()
//===========================================================================
// 启动任务:循环输入文本,按空格切分
ssc.start()
while (true) {rddQueue += sc.makeRDD(scala.io.StdIn.readLine.split(" "))
}
ssc.awaitTermination()
结果打印
updateStateByKey源码
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S],partitioner: Partitioner): DStream[(K, S)] = ssc.withScope {val cleanedUpdateF = sparkContext.clean(updateFunc)val newUpdateFunc = (iterator: Iterator[(K, Seq[V], Option[S])]) => {iterator.flatMap(t => cleanedUpdateF(t._2, t._3).map(s => (t._1, s)))}updateStateByKey(newUpdateFunc, partitioner, true)
}
def updateStateByKey[S: ClassTag](updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],partitioner: Partitioner,rememberPartitioner: Boolean): DStream[(K, S)] = ssc.withScope {val cleanedFunc = ssc.sc.clean(updateFunc)val newUpdateFunc = (_: Time, it: Iterator[(K, Seq[V], Option[S])]) => {cleanedFunc(it)}new StateDStream(self, newUpdateFunc, partitioner, rememberPartitioner, None)
}
Option知识补充
- Option译作选项,用来表示一个值是可选的(有值或无值)
Option[T]
是一个类型为T
的可选值的容器:
若值存在,Option[T]
就是一个Some[T]
若值不存在,Option[T]
就是对象None
val myMap: Map[String, String] = Map("key1" -> "value1")
val value1: Option[String] = myMap.get("key1")
val value2: Option[String] = myMap.get("key2")
println(value1) // Some("value1")
println(value2) // None
getOrElse
val a: Option[Int] = Some(5)
val b: Option[Int] = None
println(a.getOrElse(0)) // 5
println(b.getOrElse(0)) // 0
isEmpty
val a: Option[Int] = Some(5)
val b: Option[Int] = None
println(a.isEmpty) // false
println(b.isEmpty) // true
这篇关于大数据(8q)流计算updateStateByKey的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!