我的物联网项目(八)简单分布式调度

2024-08-21 11:48

本文主要是介绍我的物联网项目(八)简单分布式调度,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

定时调度基本在任何平台或多或少的要用到,实现定时调度的功能很简单,我做过的项目中用到更多的是spring quartz或者spring task,它们在单机上使用定时任务配置是非常简单的,但是在集群环境中就需要面临一个必须解决的问题:如何限定只有一台机器在执行定时任务?

其实spring quartz也可以实现此功能,它是由数据库的数据来确定调度任务是否正在执行, 正在执行则其他服务器就不能去执行该行调度数据,所以需要数据库的11张表来执行此种功能,总的来说成本较高,操作起来也比较复杂。另外一些开源的分布式调度平台也有一些,如当当网的elastic-job,淘宝的TBSchedule,包括阿里云也有SchedulerX,这些分布式调度平台在一定程度上也可以满足集群环境下的功能需求。我当初在技术选型上,只想简单无门槛,不想有太多的学习成本在里面,尤其针对目前阶段的项目,想用一种简单的方式来实现目的就行,所以尽量基于目前的代码和技术。

一 实现思路

主要利用Redis的(Redis用的云集群,暂时不需要考虑单点故障或者不稳定的情况)函数setNX()来实现分布式锁,大概流程是首先是某个集群环境的单边服务器将某一任务标识名(简单来说就是key)作为键存到redis里,并为其设个过期时间,如果这个时候另外的单边服务器也请求过来,先是通过setNX()看看是否能将任务标识名(同一个标识名)插入到redis里,可以的话就返回true,不可以就返回false,如果返回false,说明这次的任务调度别的服务器已经在做了,不需要执行这次任务。如果返回true,说明这次任何调度是由自己来执行。

这个里面由于集群环境下的每台服务器到了时间点都会去执行一遍,当然肯定只有一台才能执行成功,这个里面需要注意两个事情:

  1. 定时调度的策略应该上一个任务完成到下一个任务开始的时间间隔,这样的话才能保证集群环境下其它的服务器下次抢占锁的机率,如spring task的fixedDelay。
  2. 调度时间循环间隔设置当然以具体业务场景为准,但最好算好大概的每次业务执行的时间长短,然后根据这个时间长短来设置定时调度的循环间隔时间。比如说如果小于1s的调用,由于使用redis会有10几毫秒的运算耗费,因此不能保证在1s以下的时间间隔比较均匀。所以尽量保证每台服务器的均匀分布来执行计划任务。
二 代码实现

1.锁对象

public class Lock {private String name;private String value;public Lock(String name, String value) {this.name = name;this.value = value;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getValue() {return value;}public void setValue(String value) {this.value = value;}}

2.分布式锁工具类

public class DistributedLockService {private final Logger log = LoggerFactory.getLogger(getClass());private final static long LOCK_EXPIRE = 10;//单个业务持有锁的时间10s,防止死锁private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");@Autowiredprivate RedisTemplate<String, String> redisTemplate;/*** 尝试获取全局锁*/public boolean tryLock(Lock lock) {return getLock(lock,LOCK_EXPIRE);}/*** 操作redis获取全局锁*/public boolean getLock(Lock lock,long lockExpireTime){if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) {return false;}// SETNX成功,则成功获取一个锁if (setNX(lock.getName(), lock.getValue(),lockExpireTime)) {return true;}else {// SETNX失败,说明锁仍然被其他对象保持log.info(lock.getName()+" lock is exist!" + dateFormat.format(new Date()) + "###");return false;}}/*** @Title: setNX * @Description: 设置锁*/private boolean setNX(final String key, final String value, final long expire) {return (Boolean) redisTemplate.execute(new RedisCallback<Boolean>() {@SuppressWarnings("unchecked")public Boolean doInRedis(RedisConnection connection) {byte[] keyBytes = ((RedisSerializer<String>) redisTemplate.getKeySerializer()).serialize(key);boolean locked = connection.setNX(keyBytes, ((RedisSerializer<String>)redisTemplate.getValueSerializer()).serialize(value));if(locked){connection.expire(keyBytes, expire);}return locked;}});}/*** @Title: get * @Description: 根据key获取value*/public Object get(final String key) {return redisTemplate.execute(new RedisCallback<Object>() {@SuppressWarnings("unchecked")public Object doInRedis(RedisConnection connection) throws DataAccessException {byte[] bs = connection.get(((RedisSerializer<String>)redisTemplate.getKeySerializer()).serialize(key));return redisTemplate.getDefaultSerializer().deserialize(bs);}});}/*** 释放锁*/public void releaseLock(Lock lock) {if (!StringUtils.isEmpty(lock.getName())) {redisTemplate.delete(lock.getName());}log.info(lock.getName()+" lock is unchecked!" + dateFormat.format(new Date()) + "###");}}

 

3.定时调度实现

public class ScheduledTasks {@Autowiredprivate DistributedLockService distributedLockService;private final static Logger log= Logger.getLogger(ScheduledTasks.class);private static final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");//5秒执行一次@Scheduled(fixedDelay = 5000)public void doJob() {log.info("###sync start:"+ dateFormat.format(new Date()) + "###");Lock lock = new Lock("xxlock" , "xxx");if(distributedLockService.tryLock(lock)){log.info("Gets the lock!" + dateFormat.format(new Date()) + "###");       //做具体业务......distributedLockService.releaseLock(lock);}log.info("###sync end:"+ dateFormat.format(new Date()) + "###");}}

 

三 继续优化

上面将做具体业务的代码耦合到了定时调度ScheduledTasks里面,这块需要优化下,后面我们将具体的业务代码单独抽离出来做成一个rest服务,ScheduledTasks里面通过接口请求去执行业务逻辑即可。

定时调度这块后续我们还在继续优化,主要有如下:

1.将调度时间间隔,调度http请求接口,调度的动态开启和关闭,查看目前的调度任务和执行的日志做成后台可视化界面,方便统一管理和运维。

2.集群环境下的服务器做到分片,负载均衡。其实现在的并没有严格做到负载均衡,其实集群环境下每台服务器都在执行,只是没有执行具体业务而已,所以后续这块自己将用代码实现。

这篇关于我的物联网项目(八)简单分布式调度的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1093078

相关文章

基于Qt开发一个简单的OFD阅读器

《基于Qt开发一个简单的OFD阅读器》这篇文章主要为大家详细介绍了如何使用Qt框架开发一个功能强大且性能优异的OFD阅读器,文中的示例代码讲解详细,有需要的小伙伴可以参考一下... 目录摘要引言一、OFD文件格式解析二、文档结构解析三、页面渲染四、用户交互五、性能优化六、示例代码七、未来发展方向八、结论摘要

javafx 如何将项目打包为 Windows 的可执行文件exe

《javafx如何将项目打包为Windows的可执行文件exe》文章介绍了三种将JavaFX项目打包为.exe文件的方法:方法1使用jpackage(适用于JDK14及以上版本),方法2使用La... 目录方法 1:使用 jpackage(适用于 JDK 14 及更高版本)方法 2:使用 Launch4j(

Docker集成CI/CD的项目实践

《Docker集成CI/CD的项目实践》本文主要介绍了Docker集成CI/CD的项目实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录一、引言1.1 什么是 CI/CD?1.2 docker 在 CI/CD 中的作用二、Docke

SpringBoot项目引入token设置方式

《SpringBoot项目引入token设置方式》本文详细介绍了JWT(JSONWebToken)的基本概念、结构、应用场景以及工作原理,通过动手实践,展示了如何在SpringBoot项目中实现JWT... 目录一. 先了解熟悉JWT(jsON Web Token)1. JSON Web Token是什么鬼

手把手教你idea中创建一个javaweb(webapp)项目详细图文教程

《手把手教你idea中创建一个javaweb(webapp)项目详细图文教程》:本文主要介绍如何使用IntelliJIDEA创建一个Maven项目,并配置Tomcat服务器进行运行,过程包括创建... 1.启动idea2.创建项目模板点击项目-新建项目-选择maven,显示如下页面输入项目名称,选择

Jenkins中自动化部署Spring Boot项目的全过程

《Jenkins中自动化部署SpringBoot项目的全过程》:本文主要介绍如何使用Jenkins从Git仓库拉取SpringBoot项目并进行自动化部署,通过配置Jenkins任务,实现项目的... 目录准备工作启动 Jenkins配置 Jenkins创建及配置任务源码管理构建触发器构建构建后操作构建任务

MyBatis框架实现一个简单的数据查询操作

《MyBatis框架实现一个简单的数据查询操作》本文介绍了MyBatis框架下进行数据查询操作的详细步骤,括创建实体类、编写SQL标签、配置Mapper、开启驼峰命名映射以及执行SQL语句等,感兴趣的... 基于在前面几章我们已经学习了对MyBATis进行环境配置,并利用SqlSessionFactory核

Nginx、Tomcat等项目部署问题以及解决流程

《Nginx、Tomcat等项目部署问题以及解决流程》本文总结了项目部署中常见的four类问题及其解决方法:Nginx未按预期显示结果、端口未开启、日志分析的重要性以及开发环境与生产环境运行结果不一致... 目录前言1. Nginx部署后未按预期显示结果1.1 查看Nginx的启动情况1.2 解决启动失败的

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

如何用Docker运行Django项目

本章教程,介绍如何用Docker创建一个Django,并运行能够访问。 一、拉取镜像 这里我们使用python3.11版本的docker镜像 docker pull python:3.11 二、运行容器 这里我们将容器内部的8080端口,映射到宿主机的80端口上。 docker run -itd --name python311 -p