celery-redbeat方案(动态定时任务、异步任务)

2024-06-11 17:12

本文主要是介绍celery-redbeat方案(动态定时任务、异步任务),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 为什么选择 RedBeat?
    • 方案
    • 坑事项记录

记一次工作上的问题

问题:项目上当前定时任务框架和服务端耦合,容易出现加载定时任务时间很长,影响后端服务启动,容易改动引发定时任务的问题。且能方便的动态的增加或删除定时任务而无需重启服务。
需求:将定时任务框架解耦,可以独立部署启动,不影响原来服务的功能,不会出现重复执行的定时任务和丢失定时任务
RedBeat是一个 Celery Beat 调度程序 ,它将计划任务和运行时元数据存储在Redis中。

为什么选择 RedBeat?

官方文档:https://redbeat.readthedocs.io/en/latest/intro.html
git仓库:https://github.com/sibson/redbeat

  1. 动态实时任务创建和修改,无需长时间停机
  2. 使用 Redis 绑定从任何语言外部管理任务
  3. 共享数据存储;Beat 不依赖于单个驱动器或机器
  4. 即使任务数量很多也能快速启动
  5. 防止意外运行多个 Beat 服务器
    用Celery-redbeat重构当前定时任务逻辑: Celery-redbeat是基于redis作为celery的中间件(broker)时的一个任务调度器(scheduler),有效提高调度器(scheduler)健壮性。

它最大的变化就是,它将调度器从原来Celery进程中的定时任务调度器(默认作为守护进程),改存在了Redis上。
因此,当任务创建或者发生变化的时候,调度器不再需要暂停和重载。利用Redis数据库的特性,我们只需要更新Redis中的键名,调度器就会随之发生改变。

方案

  1. 另起一个celery服务,不再跟随主服务启动
  2. 有一张数据库表用来针对动态定时任务做持久化存储以及做增删改查状态管理等。celery beat启动时会自动读取这张表的数据,将定时任务加载到redis中(每次会先清空再加载)
  3. 初始化完成后的redis数据, celery beat会根据redis里的scedule数据,如果满足执行条件,会将其推送到broker中,等待worker来消费
  4. 定时任务执行原理和之前可以保持一致,但不需要再往celery队列里插任务了,相当于直接执行celery队列中已有的任务,之前的锁逻辑还可以直接沿用
  5. 当用户对定时任务有任何更删改操作时,可以通过封装好的方法,先更新数据库中的值,然后直接更新redis中的值,从而同步更新队列中的定时任务,无需重启服务(此处需要保障数据库和redis的数据一致性,最后能有兜底or事务机制,要么都成功要么都失败)

坑事项记录

背景:发现线上服务会偶尔出现定时任务全部失效的情况,或者定时任务多次执行的情况
环境:k8s多pod部署服务来增加服务健壮性
分析线上日志,就是在某一个时间点突然报错:
LockNotOwnedError异常,表示无法释放或延长一个已经不再拥有的锁
所以一开始我们怀疑是redbeat存在bug,分析源码:
1、redbeat在启动时会触发 @beat_init.connect 装饰的 acquire_distributed_beat_lock 函数。使用 redis_client.lock() 方法创建一个分布式锁对象,通过 lock.acquire() 方法获取锁,此处会进行阻塞,直到成功获取到锁为止。最后,将获取到的锁对象赋值给调度器的 lock 属性。代码中生成的 time_task_:lock 锁用于在 Beat 启动时获取分布式锁,以实现在多个 Beat 进程或服务器之间协调任务调度的目的。这个锁对于 Beat 工作的用途是确保在分布式环境下只有一个 Beat 进程或服务器在执行任务调度。当多个 Beat 进程或服务器同时运行时,通过获取这个锁,只有获取到锁的进程或服务器可以执行任务调度,其他进程或服务器会被阻塞等待锁的释放。同时如果这个锁存在会给锁续期。(为了避免主进程 Readbeat 在执行任务时出现故障或长时间没有释放锁(例如进程崩溃、网络中断等情况),所以可能会有多个beat进程,虽然正常情况下永远只会有一个在工作。我们平台目前就只有一个beat服务)
2、然后Beat 服务会进入一个无限循环的进程,它会以一定的时间间隔进行轮询,检查是否需要执行任务。并且对time_task_:lock 锁进行续期。只有在进程终止时会释放

然后我们怀疑是,在某次续期时,这个锁会莫名其妙突然的消失,所以导致了报错问题
根据官方文档分析。time_task_:lock 锁仅仅是使用分布式锁来防止多个实例同时运行。我们当前仅设置一个实例服务,可以直接设置redbeat_lock_key = None关闭该功能。

本以为解决了问题,结果过了几天问题又再次出现,最后在偶然的灵机一动下发现
我们使用的k8s多pod服务,每个pod服务中的项目代码都是同一套一样的(不管是server还是celery还是beat),发现如果某个pod重启了,会将前面方案中的第二点中的预加载init函数(为了重置所有定时任务,防止异常数据残留)执行一次,就是会清空redis关于redbeat的数据然后重新加载数据,这就导致了所有数据和锁消失了~~

这篇关于celery-redbeat方案(动态定时任务、异步任务)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1051786

相关文章

一文详解SpringBoot中控制器的动态注册与卸载

《一文详解SpringBoot中控制器的动态注册与卸载》在项目开发中,通过动态注册和卸载控制器功能,可以根据业务场景和项目需要实现功能的动态增加、删除,提高系统的灵活性和可扩展性,下面我们就来看看Sp... 目录项目结构1. 创建 Spring Boot 启动类2. 创建一个测试控制器3. 创建动态控制器注

Knife4j+Axios+Redis前后端分离架构下的 API 管理与会话方案(最新推荐)

《Knife4j+Axios+Redis前后端分离架构下的API管理与会话方案(最新推荐)》本文主要介绍了Swagger与Knife4j的配置要点、前后端对接方法以及分布式Session实现原理,... 目录一、Swagger 与 Knife4j 的深度理解及配置要点Knife4j 配置关键要点1.Spri

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

SQLite3 在嵌入式C环境中存储音频/视频文件的最优方案

《SQLite3在嵌入式C环境中存储音频/视频文件的最优方案》本文探讨了SQLite3在嵌入式C环境中存储音视频文件的优化方案,推荐采用文件路径存储结合元数据管理,兼顾效率与资源限制,小文件可使用B... 目录SQLite3 在嵌入式C环境中存储音频/视频文件的专业方案一、存储策略选择1. 直接存储 vs

Golang如何对cron进行二次封装实现指定时间执行定时任务

《Golang如何对cron进行二次封装实现指定时间执行定时任务》:本文主要介绍Golang如何对cron进行二次封装实现指定时间执行定时任务问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录背景cron库下载代码示例【1】结构体定义【2】定时任务开启【3】使用示例【4】控制台输出总结背景

在Golang中实现定时任务的几种高效方法

《在Golang中实现定时任务的几种高效方法》本文将详细介绍在Golang中实现定时任务的几种高效方法,包括time包中的Ticker和Timer、第三方库cron的使用,以及基于channel和go... 目录背景介绍目的和范围预期读者文档结构概述术语表核心概念与联系故事引入核心概念解释核心概念之间的关系

springboot如何通过http动态操作xxl-job任务

《springboot如何通过http动态操作xxl-job任务》:本文主要介绍springboot如何通过http动态操作xxl-job任务的问题,具有很好的参考价值,希望对大家有所帮助,如有错... 目录springboot通过http动态操作xxl-job任务一、maven依赖二、配置文件三、xxl-

一文详解MySQL如何设置自动备份任务

《一文详解MySQL如何设置自动备份任务》设置自动备份任务可以确保你的数据库定期备份,防止数据丢失,下面我们就来详细介绍一下如何使用Bash脚本和Cron任务在Linux系统上设置MySQL数据库的自... 目录1. 编写备份脚本1.1 创建并编辑备份脚本1.2 给予脚本执行权限2. 设置 Cron 任务2

SpringBoot服务获取Pod当前IP的两种方案

《SpringBoot服务获取Pod当前IP的两种方案》在Kubernetes集群中,SpringBoot服务获取Pod当前IP的方案主要有两种,通过环境变量注入或通过Java代码动态获取网络接口IP... 目录方案一:通过 Kubernetes Downward API 注入环境变量原理步骤方案二:通过

Springboot3+将ID转为JSON字符串的详细配置方案

《Springboot3+将ID转为JSON字符串的详细配置方案》:本文主要介绍纯后端实现Long/BigIntegerID转为JSON字符串的详细配置方案,s基于SpringBoot3+和Spr... 目录1. 添加依赖2. 全局 Jackson 配置3. 精准控制(可选)4. OpenAPI (Spri