SparkStreaming的窗口

2024-04-09 22:58
文章标签 窗口 sparkstreaming

本文主要是介绍SparkStreaming的窗口,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

    • 1.window(windowLength, slideInterval)
    • 2.countByWindow(windowLength,slideInterval)
    • 3.countByValueAndWindow
    • 4.reduceByWindow(func, windowLength,slideInterval)

窗口函数,就是在DStream流上,以一个可配置的长度为窗口,以一个可配置的速率向前移动窗口,根据窗口函数的具体内容,分别对当前窗口中的这一波数据采取某个对应的操作算子。
在这里插入图片描述

需要注意的是窗口长度,和窗口移动速率需要是batch time的整数倍。

1.window(windowLength, slideInterval)

该操作由一个DStream对象调用,传入一个窗口长度参数,一个窗口移动速率参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStream。


import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}object sparkWindowDemo {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("demo")//采集周期batch time,指定的2秒为每次采集的时间间隔val streamingContext = new StreamingContext(sparkConf,Seconds(2))streamingContext.checkpoint("/in/checkPoint/")val kafkaParams = Map((ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.184.40:9092"),(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),(ConsumerConfig.GROUP_ID_CONFIG, "kafkaGroup"))val kafkaStream:InputDStream[ConsumerRecord[String,String]] = KafkaUtils.createDirectStream(streamingContext,//本地策略,可用的执行器上均匀分布LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe(Set("sparkKafkaDemo"), kafkaParams))//window窗口,可加第二个参数,参数是batch time的整数倍,滑动窗口//一个参数  x秒内出现几次//两个参数  x秒加前一窗口滑动y秒出现次数  有重复数据计算val numStream = kafkaStream.flatMap(_.value().toString.split("\\s+")).map((_, 1)).window(Seconds(x),Seconds(y))numStream.print()streamingContext.start()streamingContext.awaitTermination()}
}

2.countByWindow(windowLength,slideInterval)

返回窗口内出现元素个数,注意:需要设置checkpoint

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrat

这篇关于SparkStreaming的窗口的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/889439

相关文章

bat脚本启动git bash窗口,并执行命令方式

《bat脚本启动gitbash窗口,并执行命令方式》本文介绍了如何在Windows服务器上使用cmd启动jar包时出现乱码的问题,并提供了解决方法——使用GitBash窗口启动并设置编码,通过编写s... 目录一、简介二、使用说明2.1 start.BAT脚本2.2 参数说明2.3 效果总结一、简介某些情

基于Redis有序集合实现滑动窗口限流的步骤

《基于Redis有序集合实现滑动窗口限流的步骤》滑动窗口算法是一种基于时间窗口的限流算法,通过动态地滑动窗口,可以动态调整限流的速率,Redis有序集合可以用来实现滑动窗口限流,本文介绍基于Redis... 滑动窗口算法是一种基于时间窗口的限流算法,它将时间划分为若干个固定大小的窗口,每个窗口内记录了该时间

使用JS/Jquery获得父窗口的几个方法(笔记)

<pre name="code" class="javascript">取父窗口的元素方法:$(selector, window.parent.document);那么你取父窗口的父窗口的元素就可以用:$(selector, window.parent.parent.document);如题: $(selector, window.top.document);//获得顶级窗口里面的元素 $(

专题二_滑动窗口_算法专题详细总结

目录 滑动窗口,引入: 滑动窗口,本质:就是同向双指针; 1.⻓度最⼩的⼦数组(medium) 1.解析:给我们一个数组nums,要我们找出最小子数组的和==target,首先想到的就是暴力解法 1)暴力: 2)优化,滑动窗口: 1.进窗口 2.出窗口 3.更新值 2.⽆重复字符的最⻓⼦串(medium) 1)仍然是暴力解法: 2)优化: 进窗口:hash[s[rig

hot100刷题第1-9题,三个专题哈希,双指针,滑动窗口

求满足条件的子数组,一般是前缀和、滑动窗口,经常结合哈希表; 区间操作元素,一般是前缀和、差分数组 数组有序,更大概率会用到二分搜索 目前已经掌握一些基本套路,重零刷起leetcode hot 100, 套路题按套路来,非套路题适当参考gpt解法。 一、梦开始的地方, 两数之和 class Solution:#注意要返回的是数组下标def twoSum(self, nums: Lis

主窗口的设计与开发(二)

主窗口的设计与开发(二) 前言         在上一集当中,我们完成了主窗口的初始化,主窗口包括了左中右三个区域。我们还完成了对左窗口的初始化,左窗口包括了用户头像、会话标签页按钮、好友标签页按钮以及好友申请标签页按钮。对于切换每个标签页,我们还做了初始化信号槽的内容。最后我们将整个MainWidget类设置为单例模式。         那么这一集我们将继续完成主窗口的设计与开发,这一集我

QtC++截图支持窗口获取

介绍 在截图工具中你会发现,接触到窗口后会自动圈出目标窗口,个别强大一点的还能进行元素识别可以自动圈出元素,那么今天简单分析一下QTc++如何获取窗口并圈出当前鼠标下的窗口。 介绍1.如何获取所有窗口2.比较函数3.实现窗口判断 结尾 1.如何获取所有窗口 1.我们需要调用windows接口EnumWindowsProc回调函数来获取所有顶级窗口,需要包含windows.

运行.bat文件,如何在Dos窗口里面得到该文件的路径

把java代码打包成.jar文件,编写一个.bat文件,执行该文件,编译.jar包;(.bat,.jar放在同一个文件夹下) 运行.bat文件,如何在Dos窗口里面得到该文件的路径,并运行.jar文件: echo 当前盘符:%~d0 echo 当前路径:%cd% echo 当前执行命令行:%0 echo 当前bat文件路径:%~dp0 echo 当前bat文件短路径:%~sdp0 nc

类codepen的实现可拖拽窗口demo

首先说下思想 flex或者其他布局方式,实现左右分割布局,主盒子宽度100%,左右布局中包含一个分割条(可在布局容器中,也可以单独定义)为分隔条绑定鼠标点击事件,为document绑定鼠标移动事件和鼠标放开事件,通过监听鼠标移动事件和上一个状态保存下来的鼠标位置作对比,判断当前鼠标移动方向(往左还是往右)然后计算当前鼠标位置和鼠标点击位置的距离,来计算左右容器的变化,然后通过dom的方式设置宽度

【leetcode详解】考试的最大困扰度(滑动窗口典例)

实战总结: sum += answerKey[right] == c; 经典操作,将判断语句转化为0, 1接收来计数//大问题分解: 对'T'还是'F'做修改, 传参为c//滑动窗口: 遍历, 维护left& right指向 及 c的个数, 更新不知从何下手写代码时:考虑先写好第一次的,然后以此为基础补充代码以适后续情况 题面: 解题感受: 思路总体好想, 实现略有挑战。 思路分析: