Python 全栈系列246 任务调度对象WFlaskAPS

2024-05-14 09:12

本文主要是介绍Python 全栈系列246 任务调度对象WFlaskAPS,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

说明

之前已经完全跑通了任务调度,实现了S2S的流转Python 全栈系列243 S2S flask_celery。由于request请求用起来比较别扭,所以创建一个对象来进行便捷操作。

内容

1 功能

WFlaskAPS包含管理定时任务的必要功能

from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel
import requests as req class WFlaskAPS(BaseModel):flask_aps_agent:str = 'IP:Port'# 获取当前的任务列表def get_jobs(self):url = 'http://%s/get_jobs' % self.flask_aps_agentreturn req.get(url).json()# 删除某个任务def remove_a_task(self, task_name = None):url = 'http://%s/remove_a_task/' % self.flask_aps_agentdata_dict ={}data_dict['task_id'] = task_namereturn req.post(url, json=data_dict).json()# 获取任务状态def get_jobs_status(self):url = 'http://%s/get_jobs_status/' % self.flask_aps_agentreturn req.get(url).json()# 发布一个任务(task)# task_name 其实是job type# task_id 任务的唯一编号# task_type 原来的服务只实现了cron方式,因为cron方式可以实现其他两种方式。'''date:在指定的日期和时间运行一次interval:在指定时间间隔内运行cron:使用Cron表达式运行'''def publish_a_task(self,task_id= None ,task_name = None, task_type ='cron', task_kwargs = {},year = None, month = None, day = None, week = None, day_of_week = None, hour = None,minute = None, second = None, start_date = None, end_date = None,):url = 'http://%s/publish_a_task/' % self.flask_aps_agentdata_dict = {'task_id':task_id,'task_name':task_name,'task_type':task_type,'task_kwargs':task_kwargs,'year':year,'month':month,'day':day,'week':week,'day_of_week':day_of_week,'hour':hour,'minute':minute,'second':second,'start_date':start_date,'end_date':end_date}return req.post(url, json=data_dict).json()def pause_a_task(self, task_id = None):data_dict = {}url =  'http://%s/pause_a_task/' % self.flask_aps_agentdata_dict['task_id'] = task_idreturn req.post(url, json=data_dict).json()def resume_a_task(self, task_id = None):data_dict = {}data_dict['task_id'] = task_idurl =  'http://%s/resume_a_task/' % self.flask_aps_agentreturn req.post(url, json=data_dict).json()# task_namedef add_a_job(self, fpath = None, func_name = None):url = 'http://%s/add_task_type/' % self.flask_aps_agentdata = {}data['func_name'] = func_namewith open(fpath, 'r') as f: data['func_body'] = f.read()return req.post(url, json=data).json()

从逻辑上,首先需要创建任务类型(add_a_job),之后就可以依据这个job发布n个task,发布任务时的参数是最复杂的。原始的flask apscheduler其实提供了三种触发类型:

  • 1 date 一次性触发
  • 2 interval 周期触发
  • 3 cron 触发

由于cron触发可以涵盖前两种的变化,所以在之前的服务中,只创建了cron格式的定时任务。与linux系统里的cron不同,系统里最小的周期是分钟,而这里是秒。

默认情况下,周期任务是每秒执行的。如果需要改为n秒执行,可以用 '*/5’的方式,指定重复执行的周期。通过start_date和end_date可以钳制任务的周期跨度(所以当然也包含了一次性的任务)。

使用方法比直接请求接口简洁多了

from Basefuncs import * 
# flask_aps_agent = 'IP:PORT'
wf = WFlaskAPS()# 1 获取任务列表
wf.get_jobs()
# 2 删除一个任务
wf.remove_a_task('my_test1')
# 3 获取任务状态
wf.get_jobs_status()
# 4 发布一个任务
cur_dt_str = get_time_str1()
wf.publish_a_task(task_id ='my_test1', task_name= 'hello',start_date=cur_dt_str,second ='*/5')
# 5 暂停一个任务
wf.pause_a_task(task_id = 'my_test1')
# 6 恢复任务
wf.resume_a_task(task_id = 'my_test1')# 7 增加一个job(task_name)
# wf.add_a_job()

2 Next

2.1 flask_aps_job_table

创建一张表,里面存储了job的元信息,表可以存在mymeta.flask_aps下面。

字段解释
job_namejob 名称
description描述
para_dict参数样例

2.2 flask_aps_task_table

字段解释
machine机器名, m1,m2
task_id任务id
job_name任务类型
set_to_status被设定的状态
running_status当前状态
start_dt开始时间
end_dt结束时间
interval_params周期时间

机器名是重要的,因为同样的任务可能在不同的机器上执行。

2.3 assure_tasks.py

这个脚本伴随服务的启动会执行一次:

  • 1 根据本机名称,去flask_aps_task_table寻找哪些被设定为运行的任务
  • 2 获取当前的任务列表,根据任务名,运行状态的差集执行:
    • 1 发布任务。这种情况一般是服务重启之后,运行任务丢失了。

其他类型的操作,可以后续通过前端来交互(这里又会用一下MongoEngine).

2.4 S2S Work Mode

创建一个S2S,然后让Worker根据这个来执行一些任务。

这篇关于Python 全栈系列246 任务调度对象WFlaskAPS的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/988359

相关文章

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

nudepy,一个有趣的 Python 库!

更多资料获取 📚 个人网站:ipengtao.com 大家好,今天为大家分享一个有趣的 Python 库 - nudepy。 Github地址:https://github.com/hhatto/nude.py 在图像处理和计算机视觉应用中,检测图像中的不适当内容(例如裸露图像)是一个重要的任务。nudepy 是一个基于 Python 的库,专门用于检测图像中的不适当内容。该

pip-tools:打造可重复、可控的 Python 开发环境,解决依赖关系,让代码更稳定

在 Python 开发中,管理依赖关系是一项繁琐且容易出错的任务。手动更新依赖版本、处理冲突、确保一致性等等,都可能让开发者感到头疼。而 pip-tools 为开发者提供了一套稳定可靠的解决方案。 什么是 pip-tools? pip-tools 是一组命令行工具,旨在简化 Python 依赖关系的管理,确保项目环境的稳定性和可重复性。它主要包含两个核心工具:pip-compile 和 pip

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0