本文主要是介绍花10分钟让你彻底学会Python定时任务框架apscheduler,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 安装
- 基本概念介绍
- 调度器的工作流程
- 实例1 -间隔性任务
- 实例2 - cron 任务
- 配置调度器
- 方法一
- 方法二
- 方法三:
- 启动调度器
- 方法一:使用默认的作业存储器:
- 方法二:使用数据库作为存储器:
- 调度器事件监听
说到定时任务,你会想起 linux 自带的 crontab ,windows 自带的任务计划,都可以实现守时任务。没错,操作系统基本都会提供定时任务的实现,但是如果你想要更加精细化的控制,或者说任务程序需要跨平台运行,最好还是自己实现定时任务框架,Python 的 apscheduler 提供了非常丰富而且方便易用的定时任务接口。本文介绍如何使用 apscheduler 实现你的定时任务。
apscheduler 使用起来十分方便。提供了基于日期、固定时间间隔以及crontab 类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业,如果把作业存储在数据库中,那么作业的状态会被保存,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。apscheduler 可以当作一个跨平台的调度工具来使用,可以做为 linux 系统crontab 工具或 windows 计划任务程序的替换。注意,apscheduler 不是一个守护进程或服务,它自身不带有任何命令行工具。它主要是要在现有的应用程序中运行,也就是说,apscheduler 为我们提供了构建专用调度器或调度服务的基础模块。
安装
安装非常简单,会用 pip 的人都知道
pin install apscheduler
基本概念介绍
触发器(triggers):触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
作业存储器(job stores):作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
执行器(executors):执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
调度器(schedulers):任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。
调度器的工作流程
实例1 -间隔性任务
# -*- coding: utf-8 -*-
# Time: 2018/10/13 19:01:30
# File Name: ex_interval.pyfrom datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerdef tick():print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__':scheduler = BlockingScheduler()scheduler.add_job(tick, 'interval', seconds=3)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass
说明:
第 1 行代码声明文件内容以 utf-8 编码,告诉Python 解释器以 utf-8 编码解析源代码文件。
导入 datetime 模块,用于打印当前时间。导入 os 模块,用于判断操作系统类型。
导入调度器模块 BlockingScheduler,这是最简单的调度器,调用 start 方阻塞当前进程,如果你的程序只用于调度,除了调度进程外没有其他后台进程,那么请用 BlockingScheduler 非常有用,此时调度进程相当于守护进程。
定义一个函数 tick 代表我们要调度的作业程序。
实例化一个 BlockingScheduler 类,不带参数表明使用默认的作业存储器-内存,默认的执行器是线程池执行器,最大并发线程数默认为 10 个(另一个是进程池执行器)。
第 11 行添加一个作业 tick,触发器为 interval,每隔 3 秒执行一次,另外的触发器为 date,cron。date 按特定时间点触发,cron 则按固定的时间间隔触发。
加入捕捉用户中断执行和解释器退出异常,pass 关键字,表示什么也不做。
执行结果如下所示:
可以看出,每 3 秒打印出了当前时间。
实例2 - cron 任务
# -*- coding: utf-8 -*-
# Time: 2018/10/13 19:21:09
# File Name: ex_cron.pyfrom datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerdef tick():print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__':scheduler = BlockingScheduler()scheduler.add_job(tick, 'cron', hour=19,minute=23)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C '))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass
定时 cron 任务也非常简单,直接给触发器 trigger 传入 ‘cron’ 即可。hour =19 ,minute =23 这里表示每天的19:23 分执行任务。这里可以填写数字,也可以填写字符串
hour =19 , minute =23
hour ='19', minute ='23'
minute = '*/3' 表示每 5 分钟执行一次
hour ='19-21', minute= '23' 表示 19:23、 20:23、 21:23 各执行一次任务
python 就是这么灵活、易用、可读。例 2 执行结果如下:
配置调度器
调度器的主循环其实就是反复检查是不是有到时需要执行的任务,分以下几步进行:
- 询问自己的每一个作业存储器,有没有到期需要执行的任务,如果有,计算这些作业中每个作业需要运行的时间点,如果时间点有多个,做 coalesce 检查。
- 提交给执行器按时间点运行。
在配置调度器前,我们首先要选取适合我们应用环境场景的调度器,存储器和执行器。下面是各调度器的适用场景:
- BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
- BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。
- AsyncIOScheduler:适用于使用了asyncio模块的应用程序。
- GeventScheduler:适用于使用gevent模块的应用程序。
- TwistedScheduler:适用于构建Twisted的应用程序。
- QtScheduler:适用于构建Qt的应用程序。
上述调度器可以满足我们绝大多数的应用环境,本文以两种调度器为例说明如何进行调度器配置。
作业存储器的选择有两种:一是内存,也是默认的配置;二是数据库。具体选哪一种看我们的应用程序在崩溃时是否重启整个应用程序,如果重启整个应用程序,那么作业会被重新添加到调度器中,此时简单的选取内存作为作业存储器即简单又高效。但是,当调度器重启或应用程序崩溃时您需要您的作业从中断时恢复正常运行,那么通常我们选择将作业存储在数据库中,使用哪种数据库通常取决于为在您的编程环境中使用了什么数据库。我们可以自由选择,PostgreSQL 是推荐的选择,因为它具有强大的数据完整性保护。
同样的,执行器的选择也取决于应用场景。通常默认的 ThreadPoolExecutor 已经足够好。如果作业负载涉及CPU 密集型操作,那么应该考虑使用 ProcessPoolExecutor,甚至可以同时使用这两种执行器,将ProcessPoolExecutor 行器添加为二级执行器。
apscheduler 提供了许多不同的方法来配置调度器。可以使用字典,也可以使用关键字参数传递。首先实例化调度程序,添加作业,然后配置调度器,获得最大的灵活性。
如果调度程序在应用程序的后台运行,选择 BackgroundScheduler,并使用默认的 jobstore 和默认的executor,则以下配置即可:
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
假如我们想配置更多信息:设置两个执行器、两个作业存储器、调整新作业的默认值,并设置不同的时区。下述三个方法是完全等同的。
配置需求
- 配置名为“mongo”的MongoDBJobStore作业存储器
- 配置名为“default”的SQLAlchemyJobStore(使用SQLite)
- 配置名为“default”的ThreadPoolExecutor,最大线程数为20
- 配置名为“processpool”的ProcessPoolExecutor,最大进程数为5
- UTC作为调度器的时区
- coalesce默认情况下关闭
- 作业的默认最大运行实例限制为3
方法一
1 from pytz import utc23 from apscheduler.schedulers.background import BackgroundScheduler4 from apscheduler.jobstores.mongodb import MongoDBJobStore5 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore6 from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExec utor789 jobstores = {10 'mongo': MongoDBJobStore(),11 'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')12 }13 executors = {14 'default': ThreadPoolExecutor(20),15 'processpool': ProcessPoolExecutor(5)16 }17 job_defaults = {18 'coalesce': False,19 'max_instances': 320 }21
这篇关于花10分钟让你彻底学会Python定时任务框架apscheduler的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!