SparkStreaming_window_sparksql_reids

2023-12-30 11:52

本文主要是介绍SparkStreaming_window_sparksql_reids,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.5 window

滚动窗口+滑动窗口

window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。

  1. 红色的矩形就是一个窗口,窗口hold的是一段时间内的数据流。

  2. 这里面每一个time都是时间单元,在官方的例子中,每隔window size是3 time unit, 而且每隔2个单位时间,窗口会slide一次。

所以基于窗口的操作,需要指定2个参数:

window length - The duration of the window (3 in the figure)

slide interval - The interval at which the window-based operation is performed (2 in the figure).

  1. 窗口大小,个人感觉是一段时间内数据的容器。

  2. 滑动间隔,就是我们可以理解的cron表达式吧。

案例实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
​
/*** 统计,截止到目前为止出现的每一个key的次数* window窗口操作,每个多长M时间,通过过往N长时间内产生的数据* M就是滑动长度sliding interval* N就是窗口长度window length*/
object Demo05_WCWithWindow {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("WordCountUpdateStateByKey").setMaster("local[*]")val batchInterval = 2val duration = Seconds(batchInterval)val ssc = new StreamingContext(conf, duration)val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)val pairs:DStream[(String, Int)] = lines.flatMap(_.split("\\s+")).map((_, 1))
​val ret:DStream[(String, Int)] = pairs.reduceByKeyAndWindow(_+_,windowDuration = Seconds(batchInterval * 3),slideDuration = Seconds(batchInterval * 2))
​ret.print()
​ssc.start()ssc.awaitTermination()}
}

1.6 SparkSQL和SparkStreaming的整合案例

Spark最强大的地方在于,可以与Spark Core、Spark SQL整合使用,之前已经通过transform、foreachRDD等算子看到,如何将DStream中的RDD使用Spark Core执行批处理操作。现在就来看看,如何将DStream中的RDD与Spark SQL结合起来使用。

案例:top3的商品排序: 最新的top3

这里就是基于updatestateByKey,统计截止到目前为止的不同品类下的商品销量top3

代码实现

package com.qianfeng.sparkstreaming
​
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
/*** SparkStreaming整合SparkSQL的案例之,热门品类top3排行* 输入数据格式:* id brand category* 1 huwei watch* 2 huawei phone**/
object Demo06_SQLWithStreaming {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("StreamingIntegerationSQL").setMaster("local[*]")val batchInterval = 2val duration = Seconds(batchInterval)val spark = SparkSession.builder().config(conf).getOrCreate()val ssc = new StreamingContext(spark.sparkContext, duration)ssc.checkpoint("/Users/liyadong/data/sparkdata/streamingdata/chk-1")val lines:DStream[String] = ssc.socketTextStream("qianfeng01", 6666)//001 mi moblieval pairs:DStream[(String, Int)] = lines.map(line => {val fields = line.split("\\s+")if(fields == null || fields.length != 3) {("", -1)} else {val brand = fields(1)val category = fields(2)(s"${category}_${brand}", 1)}}).filter(t => t._2 != -1)
​val usb:DStream[(String, Int)] = pairs.updateStateByKey(updateFunc)
​usb.foreachRDD((rdd, bTime) => {if(!rdd.isEmpty()) {//category_brand countimport spark.implicits._val df = rdd.map{case (cb, count) => {val category = cb.substring(0, cb.indexOf("_"))val brand = cb.substring(cb.indexOf("_") + 1)(category, brand, count)}}.toDF("category", "brand", "sales")
​df.createOrReplaceTempView("tmp_category_brand_sales")val sql ="""|select|  t.category,|  t.brand,|  t.sales,|  t.rank|from (|  select|    category,|    brand,|    sales,|    row_number() over(partition by category order by sales desc) rank|  from tmp_category_brand_sales|) t|where t.rank < 4|;""".stripMarginspark.sql(sql).show()}})
​ssc.start()ssc.awaitTermination()}
​def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] = {Option(seq.sum + option.getOrElse(0))}
}

1.7 SparkStreaming整合Reids

//将实时结果写入Redis中
dStream.foreachRDD((w,c)=>{val jedis = new Jedis("192.168.10.101", 6379)   //抽到公共地方即可jedis.auth("root")jedis.set(w.toString(),c.toString())  //一个key对应多个值,可以考虑hset
})

Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客

这篇关于SparkStreaming_window_sparksql_reids的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/552763

相关文章

js window.addEventListener 是什么?

window.addEventListener 是 JavaScript 中的一个方法,用于向指定对象(在这个情况下是 window 对象,代表浏览器窗口)添加事件监听器,以便在该对象上发生特定事件时执行相应的函数(称为事件处理函数或事件监听器)。 这个方法接受三个参数: 事件类型(type):一个字符串,表示要监听的事件类型。例如,"click" 表示鼠标点击事件,"load" 表示页面加

Qt中window frame的影响

window frame 在创建图形化界面的时候,会创建窗口主体,上面会多出一条,周围多次一圈细边,这就叫window frame窗口框架,这是操作系统自带的。 这个对geometry的一些属性有一定影响,主要体现在Qt坐标系体系: 窗口当中包含一个按钮,这个按钮的坐标系是以父元素为参考,那么这个参考是widget本体作为参考,还是window frame作为参考,这两种参考体系都存在

Caused by: android.view.WindowManager$BadTokenException: Unable to add window -- token android.os.B

一个bug日志 FATAL EXCEPTION: main03-25 14:24:07.724: E/AndroidRuntime(4135): java.lang.RuntimeException: Unable to start activity ComponentInfo{com.syyx.jingubang.ky/com.anguotech.android.activity.Init

最初的window

不知你是否也是一个常年在MFC下编程的程序员,有的时候是否忘记了在MFC之前是如何写画窗口的了呢,或者你从来都只是机械的在MFC下面写代码,已经麻木了。其实有一个很简单的方法,或许能够帮你更清楚的了解WINDOW是怎么产生的。 随便用什么版本的VS,在创建win32工程的时候,直接创建WINDOW类型的就OK了。然后,来研究下产生的源代码吧。 // Global Variables:H

VC环境下window网络程序:UDP Socket程序

最近在学Windows网络编程,正好在做UDPsocket的程序,贴上来: 服务器框架函数:              socket();    bind();    recfrom();  sendto();  closesocket(); 客户机框架函数:            socket();      recfrom();  sendto();  closesocket();

Window下编译OpenJDK17

本文详细介绍Window下如何编译OpenJDK17,包含源码路径,各工具下载地址,严格按照文章中的步骤来操作,你将获得一个由自己亲手编译出的jdk。  一、下载OpenJDK17源码 下载地址:GitHub - openjdk/jdk at jdk-17+35 说明: 1、kkgithub为github的国内镜像,能够提高下载速度  2、下载下来的源码存放路径:无中文、无空格

POJ 2823 Sliding Window(线段树入门)

题意: 8 31 3 -1 -3 5 3 6 7 一串数列,有一个窗口大小为3,从数列开始往后移动,输出最大和最小值。 -1 -3 -3 -3 3 33 3 5 5 6 7 窗口大小为3 思路: 维护一个线段树,代码很详细 解题心得: 因为关键值的输入量有1000000,也就是叶节点有1000000个,总节点按理说是2000000-1,但这题得开3000000才能过

Flink原理与实现:Window的实现原理

硬刚大数据系列文章链接: 2021年从零到大数据专家的学习指南(全面升级版) 2021年从零到大数据专家面试篇之Hadoop/HDFS/Yarn篇 2021年从零到大数据专家面试篇之SparkSQL篇 2021年从零到大数据专家面试篇之消息队列篇 2021年从零到大数据专家面试篇之Spark篇 2021年从零到大数据专家面试篇之Hbase篇

Apache Flink:Keyed Window与Non-Keyed Window

Apache Flink中,Window操作在流式数据处理中是非常核心的一种抽象,它把一个无限流数据集分割成一个个有界的Window(或称为Bucket),然后就可以非常方便地定义作用于Window之上的各种计算操作。本文我们主要基于Apache Flink 1.4.0版本,说明Keyed Window与Non-Keyed Window的基本概念,然后分别对与其相关的WindowFunction

SparkSQL在字节跳动的应用实践和优化实战

来源:字节跳动白泉的分享 作者:大数据技术与架构整理 点击右侧关注,大数据开发领域最强公众号! 点击右侧关注,暴走大数据! By  大数据技术与架构 场景描述: 面对大量复杂的数据分析需求,提供一套稳定、高效、便捷的企业级查询分析服务具有重大意义。本次演讲介绍了字节跳动