airflow调度时间详解

2024-08-24 03:04
文章标签 详解 时间 调度 airflow

本文主要是介绍airflow调度时间详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

⭐️ airflow调度概述

Apache Airflow 是一个开源的工作流调度和监控平台,广泛用于数据工程、ETL(提取、转换、加载)管道以及各种自动化任务。下面我将详细说明 Airflow 的调度算法。

1. DAG(有向无环图)

Airflow 的核心是 DAG(Directed Acyclic Graph),它定义了一组有序的任务,其中每个任务称为一个 “task”。DAG 是调度的基本单位,Airflow 通过 DAG 定义任务的依赖关系和执行顺序。

在这里插入图片描述

2. DAG调度器

Airflow 中的调度器(Scheduler)负责监控 DAG,决定何时触发 DAG 中的任务。

3. 调度策略(Scheduling Policy)

在 Airflow 中,调度策略定义了 DAG 应该何时被触发。调度策略通常基于以下两个因素:

  • 时间表(Schedule Interval): 这是一个时间间隔,定义了 DAG 被调度的频率。例如,每小时调度一次、每天调度一次等。时间表可以通过 Cron 表达式、timedelta 对象等来定义。

  • 开始时间(Start Date): DAG 从这个时间开始调度。start_dateschedule_interval 一起决定了 DAG 的首次执行时间和后续的执行时间点。

4. Catchup

默认情况下,Airflow 会自动执行所有未执行的 DAG 实例。如果你设置了一个 DAG 的 start_date 为过去的某个时间,且 schedule_interval 为每天执行一次,Airflow 会在下次调度器运行时尝试执行所有中间日期的任务,直到赶上当前日期。

如果 DAG 不是为了处理其追赶而编写的,那么可以关闭catchup。这可以通过在DAG中设置catchup=False或在配置文件中设置catchup_by_default=False来实现。关闭catchup时,调度程序仅为最新间隔创建DAG运行。

5. External Triggers

Airflow 还支持外部事件触发,例如通过 REST API 或 Sensor 任务来触发 DAG 或特定任务的执行。这个机制使得 Airflow 能够适应动态和复杂的调度需求,而不仅仅依赖于固定的时间表。

在这里插入图片描述

⭐️ airflow内部是怎样计算调度时间的

Airflow 的开始时间(start_date)和调度时间(schedule_interval)是密切相关的,理解它们之间的关系对正确配置 DAG 至关重要。

1. 开始时间(start_date)

start_date 是指 DAG 或任务首次被调度的参考时间点。它表示任务计划从何时开始执行。重要的是,Airflow 将在 start_date 后的第一个调度间隔完成时实际开始执行任务。换句话说,start_date 指的是任务应该开始考虑执行的时间,而不是实际执行的时间。

2. 调度时间(schedule_interval)

schedule_interval 是指 DAG 定期运行的时间间隔。例如,如果你设置了 schedule_interval=‘@daily’,Airflow 会每天触发一次 DAG。Airflow 调度任务的实际时间通常是 start_date 加上 schedule_interval 的长度。

3. 两者的关系

Airflow 的调度器总是会在 start_date 之后的调度间隔结束时调度任务,这意味着任务实际运行的时间通常是 start_date 加上一个 schedule_interval。

举例说明,假设你有一个 DAG,其 start_date 设置为 2024-08-14 00:00:00,schedule_interval 设置为 @daily(每天运行一次):start_date 是 2024-08-14 00:00:00。
第一次调度的实际执行时间(即第一个 DAG run)会是 2024-08-15 00:00:00。

为什么会这样? 因为 Airflow 认为每个 DAG run 是在上一个时间区间的末尾处理数据。例如,在这个例子中,2024-08-15 00:00:00 运行的是 2024-08-14 的数据。

4. 调度时间的具体计算

Airflow 采用了如下逻辑计算调度时间:

初始调度时间计算:
初始调度时间是基于 start_date 和 schedule_interval 计算得出的第一个调度时间。例如,如果 start_date 是 2024-08-14 00:00:00,schedule_interval 是 1 小时,那么第一个调度时间是 2024-08-14 01:00:00。

后续调度时间计算:
Airflow 会从 start_date 开始逐个计算调度时间,直到当前时间。例如,若 schedule_interval 是 1 小时,Airflow 会逐小时计算每个调度时间点。

Missed Schedules(错过的调度):
如果一个 DAG 处于 paused 状态或因为某些原因未被调度,Airflow 会在 DAG 恢复为 active 状态时补齐所有错过的调度时间点。

5. 实际运行中的情况

在实际应用中,start_date 和 schedule_interval 的组合可能会导致以下情况:
迟到执行:由于 schedule_interval 的存在,第一次 DAG 任务通常在 start_date 后一个完整的调度间隔结束时执行。
调度延迟:如果 DAG 配置或系统资源不够,可能会出现调度延迟。

6. 示例说明

假设你有一个 DAG:

start_date: 2024-08-14 00:00:00
schedule_interval: 每 1 小时一次 (timedelta(hours=1))

那么 Airflow 会按如下时间点调度任务:

2024-08-14 01:00:00 调度 2024-08-14 00:00:00 到 01:00:00 这一小时的数据
2024-08-14 02:00:00 调度 2024-08-14 01:00:00 到 02:00:00 这一小时的数据
以此类推

7. 具体代码实现

在 Airflow 的源码中,调度时间的计算主要依赖于以下几个组件:
DagRun: 每次 DAG 运行都会创建一个 DagRun 对象,其中记录了调度时间、开始时间、结束时间等信息。
next_dagrun_info: 这个函数会基于当前的 schedule_interval 计算下一个调度时间。
catchup: 如果 catchup=True,Airflow 会尝试补齐所有错过的调度时间。

在这里插入图片描述

⭐️ 一个场景

假如开始时间为当天0点,每隔1小时执行一次,而且开始时DAG是被paused的,当在凌晨3点手动从paused状态变成acitve状态,那么后面的运行是怎么调度的,分析如下:

在 Airflow 中,DAG 被 paused 状态解除并变为 active 状态时,调度的逻辑将根据 start_dateschedule_interval 来决定接下来的任务调度时间。让我们详细讨论一下这种情况下的调度行为。

1. 情景描述

  • start_date: 当天的凌晨 00:00(例如 2024-08-14 00:00)。
  • schedule_interval: 每 1 小时运行一次('0 * * * *'timedelta(hours=1))。
  • DAG 状态: 初始时 DAG 处于 paused 状态,凌晨 3 点手动将其设置为 active 状态。

2. 关键点

  1. DAG 的调度逻辑

    • Airflow 的调度是基于 start_dateschedule_interval 的,并且会计算所有可能的调度时间点。
    • 当 DAG 处于 paused 状态时,Airflow 不会调度新的任务实例,但会“记住”错过的调度窗口。
    • 当 DAG 被从 paused 变为 active 时,Airflow 会立即尝试补齐所有错过的调度实例,除非你手动跳过这些实例。
  2. 调度行为

    • 你在凌晨 3 点将 DAG 从 paused 变为 active 状态后,Airflow 将立即调度凌晨 1 点、2 点和 3 点的实例,因为它们都是基于 start_dateschedule_interval 计算出来的调度点。
    • 这意味着,当你在凌晨 3 点将 DAG 设为 active,Airflow 会依次调度并执行 00:00-01:00、01:00-02:00、02:00-03:00 这几个时间段的 DAG 任务。

3. 示例说明

假设 start_date 是 2024-08-14 00:00,DAG 设定为每 1 小时执行一次,调度时间点依次是:

  • 2024-08-14 01:00:00
  • 2024-08-14 02:00:00
  • 2024-08-14 03:00:00
  • 2024-08-14 04:00:00

如果 DAG 在 2024-08-14 03:00:00 从 paused 状态变为 active,Airflow 会立即调度前面错过的任务:

  • 2024-08-14 01:00:00(任务处理 00:00 到 01:00 的数据)
  • 2024-08-14 02:00:00(任务处理 01:00 到 02:00 的数据)
  • 2024-08-14 03:00:00(任务处理 02:00 到 03:00 的数据)

Airflow 会按照这些顺序来执行这些任务,确保所有错过的调度时间点都得到处理。

下图是一个在实际生产中的案例,开始时间是凌晨0点5分,每5个小时调度一次,但开始时DAG是paused的,上午9点多手动将DAG恢复,此图是从airflow ui 中截取的,

在这里插入图片描述

笔者水平有限,若有不对的地方欢迎评论指正!

这篇关于airflow调度时间详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1101263

相关文章

Linux换行符的使用方法详解

《Linux换行符的使用方法详解》本文介绍了Linux中常用的换行符LF及其在文件中的表示,展示了如何使用sed命令替换换行符,并列举了与换行符处理相关的Linux命令,通过代码讲解的非常详细,需要的... 目录简介检测文件中的换行符使用 cat -A 查看换行符使用 od -c 检查字符换行符格式转换将

详解C#如何提取PDF文档中的图片

《详解C#如何提取PDF文档中的图片》提取图片可以将这些图像资源进行单独保存,方便后续在不同的项目中使用,下面我们就来看看如何使用C#通过代码从PDF文档中提取图片吧... 当 PDF 文件中包含有价值的图片,如艺术画作、设计素材、报告图表等,提取图片可以将这些图像资源进行单独保存,方便后续在不同的项目中使

Android中Dialog的使用详解

《Android中Dialog的使用详解》Dialog(对话框)是Android中常用的UI组件,用于临时显示重要信息或获取用户输入,本文给大家介绍Android中Dialog的使用,感兴趣的朋友一起... 目录android中Dialog的使用详解1. 基本Dialog类型1.1 AlertDialog(

C#数据结构之字符串(string)详解

《C#数据结构之字符串(string)详解》:本文主要介绍C#数据结构之字符串(string),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录转义字符序列字符串的创建字符串的声明null字符串与空字符串重复单字符字符串的构造字符串的属性和常用方法属性常用方法总结摘

Java中StopWatch的使用示例详解

《Java中StopWatch的使用示例详解》stopWatch是org.springframework.util包下的一个工具类,使用它可直观的输出代码执行耗时,以及执行时间百分比,这篇文章主要介绍... 目录stopWatch 是org.springframework.util 包下的一个工具类,使用它

Java进行文件格式校验的方案详解

《Java进行文件格式校验的方案详解》这篇文章主要为大家详细介绍了Java中进行文件格式校验的相关方案,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、背景异常现象原因排查用户的无心之过二、解决方案Magandroidic Number判断主流检测库对比Tika的使用区分zip

Java实现时间与字符串互相转换详解

《Java实现时间与字符串互相转换详解》这篇文章主要为大家详细介绍了Java中实现时间与字符串互相转换的相关方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、日期格式化为字符串(一)使用预定义格式(二)自定义格式二、字符串解析为日期(一)解析ISO格式字符串(二)解析自定义

springboot security快速使用示例详解

《springbootsecurity快速使用示例详解》:本文主要介绍springbootsecurity快速使用示例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝... 目录创www.chinasem.cn建spring boot项目生成脚手架配置依赖接口示例代码项目结构启用s

Python中随机休眠技术原理与应用详解

《Python中随机休眠技术原理与应用详解》在编程中,让程序暂停执行特定时间是常见需求,当需要引入不确定性时,随机休眠就成为关键技巧,下面我们就来看看Python中随机休眠技术的具体实现与应用吧... 目录引言一、实现原理与基础方法1.1 核心函数解析1.2 基础实现模板1.3 整数版实现二、典型应用场景2

一文详解SpringBoot响应压缩功能的配置与优化

《一文详解SpringBoot响应压缩功能的配置与优化》SpringBoot的响应压缩功能基于智能协商机制,需同时满足很多条件,本文主要为大家详细介绍了SpringBoot响应压缩功能的配置与优化,需... 目录一、核心工作机制1.1 自动协商触发条件1.2 压缩处理流程二、配置方案详解2.1 基础YAML