本文主要是介绍SparkStreaming的窗口,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
目录
- 1.window(windowLength, slideInterval)
- 2.countByWindow(windowLength,slideInterval)
- 3.countByValueAndWindow
- 4.reduceByWindow(func, windowLength,slideInterval)
窗口函数,就是在DStream流上,以一个可配置的长度为窗口,以一个可配置的速率向前移动窗口,根据窗口函数的具体内容,分别对当前窗口中的这一波数据采取某个对应的操作算子。
需要注意的是窗口长度,和窗口移动速率需要是batch time的整数倍。
1.window(windowLength, slideInterval)
该操作由一个DStream对象调用,传入一个窗口长度参数,一个窗口移动速率参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStream。
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}object sparkWindowDemo {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("demo")//采集周期batch time,指定的2秒为每次采集的时间间隔val streamingContext = new StreamingContext(sparkConf,Seconds(2))streamingContext.checkpoint("/in/checkPoint/")val kafkaParams = Map((ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.184.40:9092"),(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroup"))val kafkaStream:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream(streamingContext,//本地策略,可用的执行器上均匀分布LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams))//window窗口,可加第二个参数,参数是batch time的整数倍,滑动窗口//一个参数 x秒内出现几次//两个参数 x秒加前一窗口滑动y秒出现次数 有重复数据计算val numStream = kafkaStream.flatMap(_.value().toString.split("\\s+")).map((_, 1)).window(Seconds(x),Seconds(y))numStream.print()streamingContext.start()streamingContext.awaitTermination()}
}
2.countByWindow(windowLength,slideInterval)
返回窗口内出现元素个数,注意:需要设置checkpoint
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrat
这篇关于SparkStreaming的窗口的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!