本文主要是介绍python-apscheduler+ThreadPool-多线程定时任务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
# -*- coding: utf-8 -*-
import re
import threading
import time
from multiprocessing.pool import ThreadPoolfrom apscheduler.schedulers.blocking import BlockingScheduler# 一个设备一个线程
# 一个设备多个定时任务def task_one(*args, **kwargs):for value in args:print(f"{threading.current_thread().ident}[{time.strftime('%Y-%m-%d %H:%M:%S')}]: task_one--------{value}")def task_two(*args, **kwargs):a = kwargs["a"]b = kwargs["b"]print(f"{threading.current_thread().ident}[{time.strftime('%Y-%m-%d %H:%M:%S')}]: task_two--------{a}+{b}={sum([a, b])}")def do_task(run_date: str, task: dict):date_pattern = re.compile(r"(.*)-(.*)-(.*) (.*):(.*):(.*)")year, month, day, hour, minute, second = date_pattern.search(run_date).groups()apscheduler = BlockingScheduler()for func_name, func_para_list in task.items():for func_para in func_para_list:args = func_para.get("args", ())func = globals()[f"task_{func_name}"]kwargs = func_para.get("kwargs", {})apscheduler.add_job(func=func, trigger="cron", args=args, kwargs=kwargs, year=year, month=month, day=day, hour=hour, minute=minute, second=second)apscheduler.start()if __name__ == '__main__':devices_map = {"one": [{"run_date": "*-*-* *:*:*","task": {"one": [{"args": ["A", "B"]}],"two": [{"kwargs": {"a": 1, "b": 2}},{"kwargs": {"a": 1, "b": 3}}]}}]}pool = ThreadPool(len(devices_map))for item_list in devices_map.values():for item in item_list:run_date = item["run_date"]task = item["task"]pool.apply_async(do_task, kwds={"run_date": run_date, "task": task})pool.close()pool.join()# task = {"one": [{"args": ["A", "B"]}],# "two": [{"kwargs": {"a": 1, "b": 2}},# {"kwargs": {"a": 1, "b": 3}}# ]# }# do_task(run_date="*-*-* *:*:*/10", task=task)
这篇关于python-apscheduler+ThreadPool-多线程定时任务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!