APScheduler定时器使用:django中使用apscheduler,使用mysql做存储后端

2024-05-02 07:36

本文主要是介绍APScheduler定时器使用:django中使用apscheduler,使用mysql做存储后端,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、基本环境

python版本:3.8.5

APScheduler==3.10.4
Django==3.2.7
djangorestframework==3.15.1
SQLAlchemy==2.0.29
PyMySQL==1.1.0

二、django基本设置

2.1、新增一个app

该app用来写apscheduler相关的代码

python manage.py startapp gs_scheduler

2.2、修改配置文件settings.py

#使用pymysql做客户端
import pymysql
pymysql.install_as_MySQLdb()INSTALLED_APPS = ['rest_framework', #注册restful 应用'gs_scheduler', #注册新增的app
]#配置mysql
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'admin-root'
MYSQL_NAME = 'study_websocket'DATABASES = {'default': {'ENGINE': 'django.db.backends.mysql','HOST': MYSQL_HOST,'PORT': MYSQL_PORT,'USER': MYSQL_USER,'PASSWORD': MYSQL_PASSWORD,'NAME': MYSQL_NAME,}
}

2.3、gs_scheduler创建urls.py

1、gs_scheduler/urls.py

from django.urls import path
from . import views
urlpatterns = []

2、根路由urls.py

from django.contrib import admin
from django.urls import path,includeurlpatterns = [path('admin/', admin.site.urls),path('api/scheduler/',include('gs_scheduler.urls')),
]

三、配置gs_scheduler应用

3.1、配置api接口

这些接口,用来展示定时任务的运行情况

1、urls.py

from django.urls import path
from . import views
urlpatterns = [path('run_next/', views.JobNextRunTimeAPIView.as_view()),#定时任务下次运行path('run_history/',views.JobRunTimeHistory.as_view()),#定时任务运行历史path('run_error/',views.JobRunErrorHistory.as_view()), #定时任务最近运行错误
]

2、models.py

from django.db import models# Create your models here.
import sqlite3
from gs_scheduler.management.commands.config import MYSQL_HOST,MYSQL_NAME,MYSQL_PORT,MYSQL_PASSWORD,MYSQL_USER,MYSQL_CHARSET
from datetime import datetime,timedelta
from django.conf import settingsimport pymysql
from concurrent.futures import ThreadPoolExecutor#时间戳转时间字符串
def timestamp_to_time_str(timestamp):# 使用 datetime 模块将时间戳转换为 datetime 对象dt = datetime.fromtimestamp(timestamp)# 将 datetime 对象格式化为时间字符串time_str = dt.strftime('%Y-%m-%d %H:%M:%S')return time_strclass MysqlDB:DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'# 创建数据库连接池def __init__(self):self.conn = pymysql.connect(host=MYSQL_HOST,port=MYSQL_PORT,user=MYSQL_USER,password=MYSQL_PASSWORD,db=MYSQL_NAME,charset=MYSQL_CHARSET,cursorclass=pymysql.cursors.DictCursor)# 执行数据库增删改查def _execute_sql(self,query):sql,args = querytry:with self.conn.cursor() as cursor:# 执行sql语句if args:if isinstance(args,(tuple,list)):# args = (value1,value2)cursor.execute(sql,args)else:# args = value1cursor.execute(sql,(args,))else:cursor.execute(sql)# 不同类型sql,设置不同返回值if sql.strip().lower()[:6] == 'select':#查询语句rows = cursor.fetchall() or []return rowselif sql.strip().lower()[:4] == 'show':#查看表是否存在result = cursor.fetchall()return resultelse:#增删改语句self.conn.commit()return Trueexcept Exception as e:print('sql执行失败',e)pass# 线程池执行sql语句def start_sql_with_pool(self, sql:str,args=None):with ThreadPoolExecutor() as executor:result = executor.submit(self._execute_sql, (sql,args)).result()return result#创建记录定时任务历史表(调度器使用)def create_apscheduler_history(self):#创建表语句:表名不能是占位符sql = """CREATE TABLE apscheduler_history (id INTEGER AUTO_INCREMENT PRIMARY KEY,job_id VARCHAR(128),run_time DATETIME,is_error TINYINT,error_msg VARCHAR(256) NULL)"""#判断表存在不,不存在再创建results = self.start_sql_with_pool('SHOW TABLES')is_exist = Falsefor dic in results:if dic['Tables_in_{}'.format(MYSQL_NAME,)] == 'apscheduler_history':is_exist = True#表不存在才创建if not is_exist:# print('表不存在,执行创建表')create = self.start_sql_with_pool(sql)# 创建索引self.start_sql_with_pool(db.conn.cursor().execute("CREATE INDEX run_time_index ON apscheduler_history (run_time)"))return True# 将任务运行的结果记录到数据库中(调度器使用)def insert_into_apscheduler_history(self, data_list: list):if len(data_list) == 4:  # 异常执行的任务sql = """INSERT INTO apscheduler_history (job_id,run_time,is_error,error_msg) VALUES (%s,%s,%s,%s)"""else:  # 正常执行的任务sql = """INSERT INTO apscheduler_history (job_id,run_time,is_error) VALUES (%s,%s,%s)"""#使用线程池执行插入语句self.start_sql_with_pool(sql,data_list)# 删除8小时前的历史记录(调度器使用)def delete_8hour_before_history(self):before_8hour = (datetime.now() - timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')sql = '''DELETE FROM apscheduler_history WHERE run_time < %s'''self.start_sql_with_pool(sql,(before_8hour,))# 查询每个任务的下次运行时间(api展示)def fetch_all_next_run_time(self):sql = 'SELECT id,next_run_time FROM apscheduler_jobs'# 获取查询结果rows = self.start_sql_with_pool(sql)for row in rows:row['next_run_time'] = timestamp_to_time_str(row['next_run_time'])return rows# 查询所有任务最近10次运行记录(api展示)def fetch_all_run_history(self):''':param query:'SELECT id,next_run_time,job_state FROM apscheduler_jobs':return:'''#获取所有定时任务iddata_list = self.fetch_all_next_run_time()id_list = (dic.get('id') for dic in data_list)# 创建一个游标对象sql = """SELECT id,job_id,run_time FROM apscheduler_history WHERE is_error=0 AND job_id = %s ORDER BY run_time DESC LIMIT 10"""ret_list = []for id in id_list:# 执行查询rows = self.start_sql_with_pool(sql,(id,)) or []dic = {'id': id, 'last_run_time': [], 'msg': '最近10次运行时间'}for row in rows:run_time = row.get('run_time')run_time = run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_timedic['last_run_time'].append(run_time)ret_list.append(dic)return ret_list# 获取任务最近运行失败情况(api展示)def fetch_all_error_history(self):data_list = self.fetch_all_next_run_time()id_list = (dic.get('id') for dic in data_list)sql = """SELECT run_time,error_msg FROM apscheduler_history WHERE is_error=1 AND job_id = %s ORDER BY id DESC LIMIT 5"""ret_list = []for id in id_list:# 获取查询结果rows = self.start_sql_with_pool(sql, (id,)) or []dic = {'id': id, 'last_run_time': [], 'msg': '最近5次执行失败'}for row in rows:run_time = row.get('run_time')run_time = run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_timeerror = row.get('error_msg')dic['last_run_time'].append({'run_time': run_time, 'error': error})ret_list.append(dic)return ret_list# 关闭连接def close(self):self.conn.close()if __name__ == '__main__':db = MysqlDB()db.create_apscheduler_history()# for i in range(1,10):#     db.insert_into_apscheduler_history(['send_to_big_data','2024-05-01 13:{}:12'.format(str(i).zfill(2)),1,'执行失败了'])# db.delete_8hour_before_history()# ret = db.fetch_all_run_history()# print(ret,'history')# ret = db.fetch_all_next_run_time()# print(ret,'next')# ret = db.fetch_all_error_history()# print(ret,'error')db.close()

3、views.py

from django.shortcuts import render# Create your views here.
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import MysqlDB#任务下次运行时间
class JobNextRunTimeAPIView(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_next_run_time()db.close()ret = {'code':200,'status':'success','data':data,}return Response(ret)#任务最近运行历史
class JobRunTimeHistory(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_run_history()db.close()ret = {'code':200,'status':'success','data':data}return Response(ret)#任务最近运行错误
class JobRunErrorHistory(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_error_history()db.close()ret = {'code': 200,'status': 'success','data': data}return Response(ret)

3.2、在gs_scheduler创建

1、config.py 代码

该文件存放的是启动APScheduler调度器的一些配置数据

import os
from apscheduler.jobstores.memory import MemoryJobStore #内存做后端存储
#from apscheduler.jobstores.redis import RedisJobStore #redis做后端存储
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore #mysql等做后端存储
# from django.conf import settings
from study_apscheduler import settings
#mysql://root:ldc-root@127.0.0.1:3306/jobs?charset=utf8
MYSQL_CONFIG = settings.DATABASES.get('default')
MYSQL_USER = MYSQL_CONFIG.get('USER')
MYSQL_PASSWORD = MYSQL_CONFIG.get('PASSWORD')
MYSQL_HOST = MYSQL_CONFIG.get('HOST')
MYSQL_PORT = MYSQL_CONFIG.get('PORT')
MYSQL_NAME = MYSQL_CONFIG.get('NAME')
MYSQL_CHARSET = 'utf8mb4'
URL = 'mysql://{}:{}@{}:{}/{}?charset={}'.format(MYSQL_USER,MYSQL_PASSWORD,MYSQL_HOST,MYSQL_PORT,MYSQL_NAME,MYSQL_CHARSET)
#时区
TIME_ZONE = 'Asia/Shanghai'
#job的默认配置
JOB_DEFAULTS =  {'coalesce': True, #系统挂掉,任务积攒多次为执行,True是合并成一次执行,False是执行所有的次数。 持久化存储才有效'max_instances': 3 # 同一个任务同一时间最多只能有3个实例在运行。}
#job的存储后端
JOB_STORE = {'default': SQLAlchemyJobStore(url=URL)
}#监听事件对应的情况
LISTENER={1:'调度程序启动',2:'调度程序关闭',4:'调度程序中任务处理暂停',64:'将任务存储添加到调度程序中',8192:'任务在执行期间引发异常',4096:'任务执行成功',
}

2、task.py

所有的定时任务都存放在这里

import os
from datetime import datetime, timedelta, date
from gs_scheduler.models import MysqlDB
from utils.log_util import info_log
from utils.send_monitor_data import SendData#推送到数据仓的告警信息:5分钟执行一次
send_to_big_data = SendData().send#清除定时任务历史运行记录
def delete_apscheduler_history():db = MysqlDB()db.delete_8hour_before_history()if __name__ == '__main__':print(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

3、crontab.py

实例化调度器

# 导入所需的调度器类和触发器类
from apscheduler.schedulers.background import BackgroundScheduler #后台运行
from apscheduler.schedulers.blocking import BlockingScheduler  #主进程运行,需要单独运行
from apscheduler.triggers.interval import IntervalTrigger #时间间隔
from apscheduler.triggers.cron import CronTrigger #复杂的定时任务
from apscheduler.triggers.date import DateTrigger #一次性定时任务
from django.core.management.base import BaseCommand
from apscheduler import events
from pytz import timezone
from threading import RLock
from datetime import datetime, timedelta
from gs_scheduler.models import MysqlDB
#定时任务
from .task import delete_apscheduler_history
from .task import send_to_big_data
#日志
from utils.log_util import info_log
from .config import LISTENER
from .config import TIME_ZONE,JOB_DEFAULTS,JOB_STORE#脚本运行:python manage.py crontab
class Command(BaseCommand):TIME_FORMAT = '%Y-%m-%d %H:%M:%S'#初始化调度器def _scheduler_obj(self):scheduler = BlockingScheduler()scheduler.configure(timezone = TIME_ZONE, #时区job_defaults=JOB_DEFAULTS, #job的默认配置jobstores=JOB_STORE, #job的存储后端)return scheduler#添加任务def _add_job(self,scheduler:BlockingScheduler):#每5分钟执行一次推送告警到大数据仓scheduler.add_job(send_to_big_data,trigger=IntervalTrigger(minutes=1),id='send_to_big_data',replace_existing=True,coalesce=True,)#每隔8个小时,清除历史的记录scheduler.add_job(delete_apscheduler_history,trigger=IntervalTrigger(hours=8),id='delete_apscheduler_history',replace_existing=True,coalesce=True,)#添加监听器def _listener(self,event:events):code = event.coderun_time = datetime.now().strftime(self.TIME_FORMAT)msg = LISTENER.get(code)db = MysqlDB()if msg:if code == 4096:#成功运行job_id = event.job_id#记录到数据库中db.insert_into_apscheduler_history([job_id,run_time,0])elif code == 8192:#运行异常了job_id = event.job_id#记录到数据库中db.insert_into_apscheduler_history([job_id,run_time,1,msg])else:info_log(msg)db.close()def start(self):scheduler = self._scheduler_obj()# 创建记录运行记录db = MysqlDB()db.create_apscheduler_history()db.close()# 设置监听器scheduler.add_listener(self._listener)# 设置定时任务self._add_job(scheduler)try:# print('{},定时器启动成功,等待定时任务执行...'.format(datetime.now().strftime(self.TIME_FORMAT)))scheduler.start()except KeyboardInterrupt:scheduler.shutdown()# python manage.py crontab运行 就是调用该方法def handle(self, *args, **options):self.start()#伴随django,在后台运行的。 在wsgi.py 文件中,调用BackRunScheduler().start()
class BackRunScheduler(Command):# 初始化调度器def _scheduler_obj(self):scheduler = BackgroundScheduler()scheduler.configure(timezone=TIME_ZONE,  # 时区job_defaults=JOB_DEFAULTS,  # job的默认配置jobstores=JOB_STORE,  # job的存储后端)return scheduler

3.3、启动方式:

方式一: python manage.py  crontab    (生产环境,推荐使用此方式,单独运行)

方式二:在settings.py中,调用BackRunScheduler().start()  , 后台运行

四、测试

4.1、启动

启动django项目:python manage.py runserver 8080

启动定时器:python manage.py crontab

4.2、等待一段时间

1、查询任务下次执行时间

请求:http://127.0.0.1:8080/api/scheduler/run_next/

2、查询任务最近运行情况

请求:http://127.0.0.1:8080/api/scheduler/run_next/

3、查询任务最近异常情况

请求:http://127.0.0.1:8080/api/scheduler/run_error/

五、源代码下载

码云地址:

django应用定时器: django下使用定时器的方法icon-default.png?t=N7T8https://gitee.com/liuhaizhang/django-application-timer/

目前有两套代码:一个以mysql存储任务,一个用sqlite存储任务

这篇关于APScheduler定时器使用:django中使用apscheduler,使用mysql做存储后端的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/953723

相关文章

如何使用Docker部署FTP和Nginx并通过HTTP访问FTP里的文件

《如何使用Docker部署FTP和Nginx并通过HTTP访问FTP里的文件》本文介绍了如何使用Docker部署FTP服务器和Nginx,并通过HTTP访问FTP中的文件,通过将FTP数据目录挂载到N... 目录docker部署FTP和Nginx并通过HTTP访问FTP里的文件1. 部署 FTP 服务器 (

MySQL 日期时间格式化函数 DATE_FORMAT() 的使用示例详解

《MySQL日期时间格式化函数DATE_FORMAT()的使用示例详解》`DATE_FORMAT()`是MySQL中用于格式化日期时间的函数,本文详细介绍了其语法、格式化字符串的含义以及常见日期... 目录一、DATE_FORMAT()语法二、格式化字符串详解三、常见日期时间格式组合四、业务场景五、总结一、

mysql线上查询之前要性能调优的技巧及示例

《mysql线上查询之前要性能调优的技巧及示例》文章介绍了查询优化的几种方法,包括使用索引、避免不必要的列和行、有效的JOIN策略、子查询和派生表的优化、查询提示和优化器提示等,这些方法可以帮助提高数... 目录避免不必要的列和行使用有效的JOIN策略使用子查询和派生表时要小心使用查询提示和优化器提示其他常

Python中配置文件的全面解析与使用

《Python中配置文件的全面解析与使用》在Python开发中,配置文件扮演着举足轻重的角色,它们允许开发者在不修改代码的情况下调整应用程序的行为,下面我们就来看看常见Python配置文件格式的使用吧... 目录一、INI配置文件二、YAML配置文件三、jsON配置文件四、TOML配置文件五、XML配置文件

Go使用pprof进行CPU,内存和阻塞情况分析

《Go使用pprof进行CPU,内存和阻塞情况分析》Go语言提供了强大的pprof工具,用于分析CPU、内存、Goroutine阻塞等性能问题,帮助开发者优化程序,提高运行效率,下面我们就来深入了解下... 目录1. pprof 介绍2. 快速上手:启用 pprof3. CPU Profiling:分析 C

grom设置全局日志实现执行并打印sql语句

《grom设置全局日志实现执行并打印sql语句》本文主要介绍了grom设置全局日志实现执行并打印sql语句,包括设置日志级别、实现自定义Logger接口以及如何使用GORM的默认logger,通过这些... 目录gorm中的自定义日志gorm中日志的其他操作日志级别Debug自定义 Loggergorm中的

MySQL InnoDB引擎ibdata文件损坏/删除后使用frm和ibd文件恢复数据

《MySQLInnoDB引擎ibdata文件损坏/删除后使用frm和ibd文件恢复数据》mysql的ibdata文件被误删、被恶意修改,没有从库和备份数据的情况下的数据恢复,不能保证数据库所有表数据... 参考:mysql Innodb表空间卸载、迁移、装载的使用方法注意!此方法只适用于innodb_fi

mysql通过frm和ibd文件恢复表_mysql5.7根据.frm和.ibd文件恢复表结构和数据

《mysql通过frm和ibd文件恢复表_mysql5.7根据.frm和.ibd文件恢复表结构和数据》文章主要介绍了如何从.frm和.ibd文件恢复MySQLInnoDB表结构和数据,需要的朋友可以参... 目录一、恢复表结构二、恢复表数据补充方法一、恢复表结构(从 .frm 文件)方法 1:使用 mysq

mysql8.0无备份通过idb文件恢复数据的方法、idb文件修复和tablespace id不一致处理

《mysql8.0无备份通过idb文件恢复数据的方法、idb文件修复和tablespaceid不一致处理》文章描述了公司服务器断电后数据库故障的过程,作者通过查看错误日志、重新初始化数据目录、恢复备... 周末突然接到一位一年多没联系的妹妹打来电话,“刘哥,快来救救我”,我脑海瞬间冒出妙瓦底,电信火苲马扁.

Python中conda虚拟环境创建及使用小结

《Python中conda虚拟环境创建及使用小结》本文主要介绍了Python中conda虚拟环境创建及使用小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录0.前言1.Miniconda安装2.conda本地基本操作3.创建conda虚拟环境4.激活c