Flink任务重启策略

2024-09-08 13:38
文章标签 策略 重启 flink 任务

本文主要是介绍Flink任务重启策略,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

概述

  • Flink支持不同的重启策略,以在故障发生时控制作业如何重启
  • 集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。
  • 如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。
  • 常用的重启策略:
    1. 固定间隔 (Fixed delay)
    2. 失败率 (Failure rate)
    3. 无重启 (No restart)
  • 如果没有启用 checkpointing,则使用无重启 (no restart) 策略。如果启用了 checkpointing,但没有配置重启策略,则使用固定间隔 (fixed-delay) 策略
  • 重启策略可以在flink-conf.yaml中配置,表示全局的配置。也可以在应用代码中动态指定,会覆盖全局配置

重启策略

固定间隔:

第一种:全局配置 flink-conf.yaml

restart-strategy: fixed-delay 
restart-strategy.fixed-delay.attempts: 3 
restart-strategy.fixed-delay.delay: 10 s

第二种:应用代码设置:

env.setRestartStrategy(RestartStrategies.fixedDelayRestart( 3, // 尝试重启的次数 Time.of(10, TimeUnit.SECONDS)) // 间隔 
);

失败率:

  • 失败率重启策略在Job失败后会重启,但是超过失败率后,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间

下面配置是5分钟内若失败了3次则认为该job失败,重试间隔为10s
第一种:全局配置 flink-conf.yaml

restart-strategy: failure-rate  
restart-strategy.failure-rate.max-failures-per-interval: 3  
restart-strategy.failure-rate.failure-rate-interval: 5 min  
restart-strategy.failure-rate.delay: 10 s

第二种:应用代码设置

env.setRestartStrategy(RestartStrategies.failureRateRestart(  3, //一个时间段内的最大失败次数  Time.of(5, TimeUnit.MINUTES), // 衡量失败次数的是时间段  Time.of(10, TimeUnit.SECONDS)) // 间隔  
);

无重启策略

第一种:全局配置 flink-conf.yaml

restart-strategy: none

第二种:应用代码设置

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
env.setRestartStrategy(RestartStrategies.noRestart());

实际代码演示

public class RestartTest {public static void main(String[] args) {//获取flink的运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 每隔1000 ms进行启动一个检查点【设置checkpoint的周期】env.enableCheckpointing(1000);// 间隔10秒 重启3次env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,Time.seconds(10)));//5分钟内若失败了3次则认为该job失败,重试间隔为10senv.setRestartStrategy(RestartStrategies.failureRateRestart(3,Time.of(5,TimeUnit.MINUTES),Time.of(10,TimeUnit.SECONDS)));//不重试env.setRestartStrategy(RestartStrategies.noRestart());}
}

这篇关于Flink任务重启策略的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1148265

相关文章

解决systemctl reload nginx重启Nginx服务报错:Job for nginx.service invalid问题

《解决systemctlreloadnginx重启Nginx服务报错:Jobfornginx.serviceinvalid问题》文章描述了通过`systemctlstatusnginx.se... 目录systemctl reload nginx重启Nginx服务报错:Job for nginx.javas

Python Invoke自动化任务库的使用

《PythonInvoke自动化任务库的使用》Invoke是一个强大的Python库,用于编写自动化脚本,本文就来介绍一下PythonInvoke自动化任务库的使用,具有一定的参考价值,感兴趣的可以... 目录什么是 Invoke?如何安装 Invoke?Invoke 基础1. 运行测试2. 构建文档3.

windows系统下shutdown重启关机命令超详细教程

《windows系统下shutdown重启关机命令超详细教程》shutdown命令是一个强大的工具,允许你通过命令行快速完成关机、重启或注销操作,本文将为你详细解析shutdown命令的使用方法,并提... 目录一、shutdown 命令简介二、shutdown 命令的基本用法三、远程关机与重启四、实际应用

Python 中 requests 与 aiohttp 在实际项目中的选择策略详解

《Python中requests与aiohttp在实际项目中的选择策略详解》本文主要介绍了Python爬虫开发中常用的两个库requests和aiohttp的使用方法及其区别,通过实际项目案... 目录一、requests 库二、aiohttp 库三、requests 和 aiohttp 的比较四、requ

解决Cron定时任务中Pytest脚本无法发送邮件的问题

《解决Cron定时任务中Pytest脚本无法发送邮件的问题》文章探讨解决在Cron定时任务中运行Pytest脚本时邮件发送失败的问题,先优化环境变量,再检查Pytest邮件配置,接着配置文件确保SMT... 目录引言1. 环境变量优化:确保Cron任务可以正确执行解决方案:1.1. 创建一个脚本1.2. 修

Redis过期键删除策略解读

《Redis过期键删除策略解读》Redis通过惰性删除策略和定期删除策略来管理过期键,惰性删除策略在键被访问时检查是否过期并删除,节省CPU开销但可能导致过期键滞留,定期删除策略定期扫描并删除过期键,... 目录1.Redis使用两种不同的策略来删除过期键,分别是惰性删除策略和定期删除策略1.1惰性删除策略

Java实现任务管理器性能网络监控数据的方法详解

《Java实现任务管理器性能网络监控数据的方法详解》在现代操作系统中,任务管理器是一个非常重要的工具,用于监控和管理计算机的运行状态,包括CPU使用率、内存占用等,对于开发者和系统管理员来说,了解这些... 目录引言一、背景知识二、准备工作1. Maven依赖2. Gradle依赖三、代码实现四、代码详解五

如何使用celery进行异步处理和定时任务(django)

《如何使用celery进行异步处理和定时任务(django)》文章介绍了Celery的基本概念、安装方法、如何使用Celery进行异步任务处理以及如何设置定时任务,通过Celery,可以在Web应用中... 目录一、celery的作用二、安装celery三、使用celery 异步执行任务四、使用celery

什么是cron? Linux系统下Cron定时任务使用指南

《什么是cron?Linux系统下Cron定时任务使用指南》在日常的Linux系统管理和维护中,定时执行任务是非常常见的需求,你可能需要每天执行备份任务、清理系统日志或运行特定的脚本,而不想每天... 在管理 linux 服务器的过程中,总有一些任务需要我们定期或重复执行。就比如备份任务,通常会选在服务器资

在JS中的设计模式的单例模式、策略模式、代理模式、原型模式浅讲

1. 单例模式(Singleton Pattern) 确保一个类只有一个实例,并提供一个全局访问点。 示例代码: class Singleton {constructor() {if (Singleton.instance) {return Singleton.instance;}Singleton.instance = this;this.data = [];}addData(value)