周期性清除Spark Streaming流状态的方法

2024-09-06 21:58

本文主要是介绍周期性清除Spark Streaming流状态的方法,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在Spark Streaming程序中,我们经常需要使用有状态的流来统计一些累积性的指标,比如各个商品的PV。简单的代码描述如下,使用mapWithState()算子:


现在的问题是,PV并不是一直累加的,而是每天归零,重新统计数据。要达到在凌晨0点清除状态的目的,有以下两种方法。


编写脚本重启Streaming程序

用crontab、Azkaban等在凌晨0点调度执行下面的Shell脚本:

stream_app_name='com.xyz.streaming.MallForwardStreaming'
cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`

if [ ${cnt} -eq 1 ]; then
pid=`ps aux | grep SparkSubmit | grep ${stream_app_name} | awk '{print $2}'`
kill -9 ${pid}
sleep 20
cnt=`ps aux | grep SparkSubmit | grep ${stream_app_name} | wc -l`
if [ ${cnt} -eq 0 ]; then
nohup sh /path/to/streaming/bin/mall_forward.sh > /path/to/streaming/logs/mall_forward.log 2>&1
fi
fi

这种方式最简单,也不需要对程序本身做任何改动。但随着同时运行的Streaming任务越来越多,就会显得越来越累赘了。

给StreamingContext设置超时

在程序启动之前,先计算出当前时间点距离第二天凌晨0点的毫秒数:

def msTillTomorrow = {
val now = new Date()
val tomorrow = new Date(now.getYear, now.getMonth, now.getDate + 1)
tomorrow.getTime - now.getTime
}

然后将Streaming程序的主要逻辑写在while(true)循环中,并且不像平常一样调用StreamingContext.awaitTermination()方法,而改用awaitTerminationOrTimeout()方法,即:

while (true) {
val ssc = new StreamingContext(sc, Seconds(BATCH_INTERVAL))
ssc.checkpoint(CHECKPOINT_DIR)

// ...处理逻辑...

ssc.start()
ssc.awaitTerminationOrTimeout(msTillTomorrow)
ssc.stop(false, true)
Thread.sleep(BATCH_INTERVAL * 1000)
}

在经过msTillTomorrow毫秒之后,StreamingContext就会超时,再调用其stop()方法(注意两个参数,stopSparkContext表示是否停止关联的SparkContext,stopGracefully表示是否优雅停止),就可以停止并重启StreamingContext。


以上两种方法都是仍然采用Spark Streaming的机制进行状态计算的。如果其他条件允许的话,我们还可以抛弃mapWithState(),直接借助外部存储自己维护状态。比如将Redis的Key设计为product_pv:[product_id]:[date],然后在Spark Streaming的每个批次中使用incrby指令,就能方便地统计PV了,不必考虑定时的问题。


640?wx_fmt=gif

640?wx_fmt=jpeg

这篇关于周期性清除Spark Streaming流状态的方法的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143233

相关文章

Python中__init__方法使用的深度解析

《Python中__init__方法使用的深度解析》在Python的面向对象编程(OOP)体系中,__init__方法如同建造房屋时的奠基仪式——它定义了对象诞生时的初始状态,下面我们就来深入了解下_... 目录一、__init__的基因图谱二、初始化过程的魔法时刻继承链中的初始化顺序self参数的奥秘默认

html5的响应式布局的方法示例详解

《html5的响应式布局的方法示例详解》:本文主要介绍了HTML5中使用媒体查询和Flexbox进行响应式布局的方法,简要介绍了CSSGrid布局的基础知识和如何实现自动换行的网格布局,详细内容请阅读本文,希望能对你有所帮助... 一 使用媒体查询响应式布局        使用的参数@media这是常用的

Spring 基于XML配置 bean管理 Bean-IOC的方法

《Spring基于XML配置bean管理Bean-IOC的方法》:本文主要介绍Spring基于XML配置bean管理Bean-IOC的方法,本文给大家介绍的非常详细,对大家的学习或工作具有一... 目录一. spring学习的核心内容二. 基于 XML 配置 bean1. 通过类型来获取 bean2. 通过

基于Python实现读取嵌套压缩包下文件的方法

《基于Python实现读取嵌套压缩包下文件的方法》工作中遇到的问题,需要用Python实现嵌套压缩包下文件读取,本文给大家介绍了详细的解决方法,并有相关的代码示例供大家参考,需要的朋友可以参考下... 目录思路完整代码代码优化思路打开外层zip压缩包并遍历文件:使用with zipfile.ZipFil

Python处理函数调用超时的四种方法

《Python处理函数调用超时的四种方法》在实际开发过程中,我们可能会遇到一些场景,需要对函数的执行时间进行限制,例如,当一个函数执行时间过长时,可能会导致程序卡顿、资源占用过高,因此,在某些情况下,... 目录前言func-timeout1. 安装 func-timeout2. 基本用法自定义进程subp

Python列表去重的4种核心方法与实战指南详解

《Python列表去重的4种核心方法与实战指南详解》在Python开发中,处理列表数据时经常需要去除重复元素,本文将详细介绍4种最实用的列表去重方法,有需要的小伙伴可以根据自己的需要进行选择... 目录方法1:集合(set)去重法(最快速)方法2:顺序遍历法(保持顺序)方法3:副本删除法(原地修改)方法4:

Python中判断对象是否为空的方法

《Python中判断对象是否为空的方法》在Python开发中,判断对象是否为“空”是高频操作,但看似简单的需求却暗藏玄机,从None到空容器,从零值到自定义对象的“假值”状态,不同场景下的“空”需要精... 目录一、python中的“空”值体系二、精准判定方法对比三、常见误区解析四、进阶处理技巧五、性能优化

C++中初始化二维数组的几种常见方法

《C++中初始化二维数组的几种常见方法》本文详细介绍了在C++中初始化二维数组的不同方式,包括静态初始化、循环、全部为零、部分初始化、std::array和std::vector,以及std::vec... 目录1. 静态初始化2. 使用循环初始化3. 全部初始化为零4. 部分初始化5. 使用 std::a

如何将Python彻底卸载的三种方法

《如何将Python彻底卸载的三种方法》通常我们在一些软件的使用上有碰壁,第一反应就是卸载重装,所以有小伙伴就问我Python怎么卸载才能彻底卸载干净,今天这篇文章,小编就来教大家如何彻底卸载Pyth... 目录软件卸载①方法:②方法:③方法:清理相关文件夹软件卸载①方法:首先,在安装python时,下

电脑死机无反应怎么强制重启? 一文读懂方法及注意事项

《电脑死机无反应怎么强制重启?一文读懂方法及注意事项》在日常使用电脑的过程中,我们难免会遇到电脑无法正常启动的情况,本文将详细介绍几种常见的电脑强制开机方法,并探讨在强制开机后应注意的事项,以及如何... 在日常生活和工作中,我们经常会遇到电脑突然无反应的情况,这时候强制重启就成了解决问题的“救命稻草”。那