对Redis锁延期的一些讨论与思考

2024-02-27 10:44
文章标签 思考 redis 讨论 延期

本文主要是介绍对Redis锁延期的一些讨论与思考,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

上一篇文章提到使用针对不同的业务场景如何合理使用Redis分布式锁,并引入了一个新的问题

若定义锁的过期时间是10s,此时A线程获取了锁然后执行业务代码,但是业务代码消耗时间花费了15s。这就会导致A线程还没有执行完业务代码,A线程却释放了锁(因为10s到了),第11s B线程发现锁已经释放,重新获取锁也开始执行业务代码。
此时多个线程同时执行业务代码,我们使用锁就是为了保证仅有一个线程执行这一块业务代码,说明这个锁是失效的!

本文将尝试探讨如何处理这个问题!

在这里插入图片描述

下面这个图解释了重置超时时间是什么意思,写一个定时任务,并单独使用一个线程每3s去检查一下是否到终点(任务是否执行完毕),第3s时发现没到终点,重置时间。 假设任务执行完毕需要花费11s。那么锁一共会延期3次,第11s之后,锁被手动释放,如果没释放。等到第19s时,会被自动释放。
在这里插入图片描述
如何实现锁的延期

伪代码:定义锁的结构
key:uuid
value:订单服务if key(锁的唯一标识)是否存在存在,if 锁是否被修改未修改,重置超时时间

这部分有一点需要解释:

  1. 为什么判断锁是否被修改?
    A线程获取了锁之后,B线程修改锁的value为 “文件下载服务”,不加一层校验,A线程就会对修改后的锁操作,而不是原始的锁。

此时你会直接写一个定时任务去实现,会有什么问题吗?
锁延期分为2步(第一步:判断锁;第二步:重置锁),这2步之间是存在间隙的,完全可以在判断锁后,重置锁前发生一些事情(例如恰巧在重置时间前锁被其他线程修改了)。如何才能避免这个间隙不发生意外?

使用lua脚本:使用lua语法实现锁的延期,然后执行这个脚本。lua语法将这两个步骤绑定成一个操作。这也就是为什么提到锁延期的实现,基本都是采用lua实现的根本原因。redis分布式锁自身是有局限性的,不能满足我们的需求,所以我们提出了锁延期。

巧在Redis很支持lua语法,我们只需要按照lua语法要求写好命令,调用Redis提供的方法入口传进去,Redis会自动解析这些命令。更巧在lua语法实现锁延期解决了上面的隐患。。。

        /*** 锁续期*/if (redis.call('exists', KEYS[1]) == 1) then // 锁还存在if (redis.call('get', KEYS[1]) == ARGV[1]) then redis.call('pexpire', KEYS[1], ARGV[2]) ");// 重置超时时间return 1endendreturn 0

接下来完整的看一下如何使用Redis锁延期


/*** redis分布式锁* 为了文件拉取加的,可能存在拉取任务耗时很久的情况,增加锁延时操作* @author lixinyu*/
public class RedisDistributeLock {private static final Logger log = LoggerFactory.getLogger(RedisDistributeLock.class);// 默认30秒后自动释放锁private static long defaultExpireTime = 10 * 60 * 1000; // 默认10分钟// 用于锁延时任务的执行private static ScheduledThreadPoolExecutor renewExpirationExecutor;// 加锁和解锁的lua脚本 重入和不可重入两种private static String lockScript;private static String unlockScript;private static String renewScript;// 锁延时脚本private static String lockScript_reentrant;private static String unlockScript_reentrant;private static String renewScript_reentrant;// 锁延时脚本static {/*** 如果指定的锁键(KEYS[1])不存在,则通过set命令设置锁的值(ARGV[1])和超时时间(ARGV[2])。* 如果锁键已存在,则通过pttl命令返回锁的剩余超时时间。*/StringBuilder sb = new StringBuilder();sb.setLength(0);sb.append(" if (redis.call('exists', KEYS[1]) == 0) then ");// 如果不存在这个lockKey锁sb.append("     redis.call('set', KEYS[1], ARGV[1]) ");// 设置锁 ,key-value结构sb.append("     redis.call('pexpire', KEYS[1], ARGV[2]) ");// 设置锁超时时间sb.append("     return nil ");sb.append(" end ");sb.append(" return redis.call('pttl', KEYS[1]) ");// 如果别的线程已经加锁,返回剩余时间lockScript = sb.toString();/*** 如果锁存在,则删除锁*/sb.setLength(0);sb.append(" if (redis.call('get', KEYS[1]) == ARGV[1]) then ");sb.append("      return redis.call('del', KEYS[1]) ");sb.append(" else return 0 ");sb.append(" end");unlockScript = sb.toString();/*** 可重入锁主要解决的是同一个线程能够多次获取锁的问题,而不是防止多个线程同时获取锁* 这通常发生在方法递归调用、嵌套调用或者同一个方法内部多次执行加锁操作的情况下*/sb.setLength(0);sb.append(" if (redis.call('exists', KEYS[1]) == 0) then ");// 如果不存在这个lockKey锁sb.append("     redis.call('hset', KEYS[1], ARGV[1], 1) ");// 设置锁 ,hash结构,hashkey为当前线程id,加锁数为1sb.append("     redis.call('pexpire', KEYS[1], ARGV[2]) ");// 设置锁超时时间sb.append("     return nil ");sb.append(" end ");sb.append(" if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then ");// 如果当前线程已经加锁sb.append("     redis.call('hincrby', KEYS[1], ARGV[1], 1) ");// 可重入,增加锁计数sb.append("     redis.call('pexpire', KEYS[1], ARGV[2]) ");// 重设置锁超时时间sb.append("     return nil ");sb.append(" end ");sb.append(" return redis.call('pttl', KEYS[1]) ");// 如果别的线程已经加锁,返回剩余时间lockScript_reentrant = sb.toString();/*** 释放锁,通过判断锁的存在、当前线程是否是加锁的线程、以及锁的计数器等条件来实现解锁的操作*/sb.setLength(0);sb.append(" if (redis.call('exists', KEYS[1]) == 0) then ");// 不存在锁,返回1表示解锁成功sb.append("     return 1 ");sb.append(" end ");sb.append(" if (redis.call('hexists', KEYS[1], ARGV[1]) == 0) then ");// 存在锁,不是本人加的,返回0失败sb.append("     return 0 ");sb.append(" end ");sb.append(" local counter = redis.call('hincrby', KEYS[1], ARGV[1], -1) ");// 存在自己加的锁,锁计数减一sb.append(" if (counter > 0) then ");// 判断是否要删除锁,或重置超时时间sb.append("     redis.call('pexpire', KEYS[1], ARGV[2]) ");sb.append("     return 0 ");sb.append(" else ");sb.append("     redis.call('del', KEYS[1]) ");sb.append("     return 1 ");sb.append(" end ");sb.append(" return nil ");unlockScript_reentrant = sb.toString();/*** 锁续期*/sb.setLength(0);sb.append(" if (redis.call('exists', KEYS[1]) == 1) then ");// 锁还存在sb.append("     if (redis.call('get', KEYS[1]) == ARGV[1]) then ");sb.append("        redis.call('pexpire', KEYS[1], ARGV[2]) ");// 重置超时时间sb.append("        return 1");sb.append("     end ");sb.append(" end ");sb.append(" return 0 ");renewScript = sb.toString();/*** 可重入锁续期*/sb.setLength(0);sb.append(" if (redis.call('hexists', KEYS[1], ARGV[1]) == 1) then ");// 锁还存在sb.append("     redis.call('pexpire', KEYS[1], ARGV[2]) ");// 重置超时时间sb.append("     return 1 ");sb.append(" end ");sb.append(" return 0 ");renewScript_reentrant = sb.toString();renewExpirationExecutor = new ScheduledThreadPoolExecutor(2);}private String uuid;// 当前锁对象标识private boolean reentrant;// 当前锁是可重入还是不可重入private RedisUtils redisUtils;public RedisDistributeLock(boolean reentrant) {this.uuid = UUIDUtils.randomUUID8();this.reentrant = reentrant;this.redisUtils = SpringApplicationUtils.getBean(RedisUtils.class);}/*** 尝试对lockKey加锁* @author: lixinyu 2023/4/25**/public boolean tryLock(String lockKey) {String script = lockScript;if (reentrant) {script = lockScript_reentrant;}Object result = redisUtils.evalScript(script, ReturnType.INTEGER, 1, lockKey, uuid, String.valueOf(defaultExpireTime));boolean isSuccess = result == null;if (isSuccess) {// 若成功,增加延时任务scheduleExpirationRenew(lockKey, uuid, reentrant);}return isSuccess;}/*** 解锁* @author: lixinyu 2023/4/25**/public void unlock(String lockKey){if (reentrant) {redisUtils.evalScript(unlockScript_reentrant, ReturnType.INTEGER, 1, lockKey, uuid, String.valueOf(defaultExpireTime));} else {redisUtils.evalScript(unlockScript, ReturnType.INTEGER, 1, lockKey, uuid);}}/*** 锁延时,定时任务队列,定时判断一次是否续期*/private void scheduleExpirationRenew(String lockKey, String lockValue, boolean reentrant) {Runnable renewTask = new Runnable(){@Overridepublic void run() {try {String script = renewScript;if (reentrant) {script = renewScript_reentrant;}// 将lua语法传给redis解析Object result = evalScript(script, ReturnType.INTEGER, 1, lockKey, lockValue, String.valueOf(defaultExpireTime));if (result != null && !result.equals(false) && result.equals(Long.valueOf(1))) {// 延时成功,再定时执行scheduleExpirationRenew(lockKey, lockValue, reentrant);log.info("redis锁【" + lockKey + "】延时成功!");}} catch (Exception e) {log.error("scheduleExpirationRenew run异常", e);}}};renewExpirationExecutor.schedule(renewTask, defaultExpireTime / 3, TimeUnit.MILLISECONDS);}
}
 /***  将lua语法传给redis*/ public Object evalScript(String script, ReturnType returnType, int numKeys,String... keysAndArgs){Object value = false;try{value = redisTemplate.execute((RedisCallback<Object>)conn -> {try{byte[][] keysAndArgsByte = new byte[keysAndArgs.length][];for (int i = 0; i < keysAndArgs.length; i++ ){keysAndArgsByte[i] = redisTemplate.getStringSerializer().serialize(keysAndArgs[i]);}return conn.eval(redisTemplate.getStringSerializer().serialize(script), returnType, numKeys,keysAndArgsByte);}catch (SerializationException e){log.error("异常", e);return false;}});}catch (Exception e){log.error("异常", e);}return value;}

使用锁

 private void demo() {RedisDistributeLock lock = new RedisDistributeLock(false);String lockKey = redisSeqPrefix + "lock:" + seqName;try {if (lock.tryLock(lockKey)) {String redisValue = redisUtils.get(redisSeqPrefix + seqName);// 加锁之后再次判断是否超出规定长度,防止并发时重置多次if (redisValue != null && redisValue.length() > seqLength) {redisUtils.set(redisSeqPrefix + seqName, "1");}}} catch (Exception e) {logger.error("resetSeqValue异常", e);} finally {lock.unlock(lockKey);}}

不仅仅是锁延期需要两步(判断锁是否存在、重置时间),删除锁也需要两步(判断锁是否存在、删除锁)这也需要保证原子性,所以建议使用lua脚本实现。

你干脆想到:要不然我获取锁、解锁、获取可重入锁、解可重入锁,锁延期等等都写成lua脚本吧,但是工作量好像就变多了。

Redisson 提供了高级的分布式对象和服务,使用起来非常简单,不需要手动编写复杂的 Lua 脚本。只需要引入Redisson 的依赖库
Redisson 提供了许多高级功能,如分布式集合、分布式锁、分布式队列等。这些功能是为了解决常见的分布式应用场景而设计的,使用 Redisson 可以更轻松地集成这些功能

如果你只是一些简单的功能,可以自定义lua脚本实现,毕竟引入新的依赖库,就需要维护这个库,看怎么考虑了。

这篇关于对Redis锁延期的一些讨论与思考的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/752135

相关文章

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

【编程底层思考】垃圾收集机制,GC算法,垃圾收集器类型概述

Java的垃圾收集(Garbage Collection,GC)机制是Java语言的一大特色,它负责自动管理内存的回收,释放不再使用的对象所占用的内存。以下是对Java垃圾收集机制的详细介绍: 一、垃圾收集机制概述: 对象存活判断:垃圾收集器定期检查堆内存中的对象,判断哪些对象是“垃圾”,即不再被任何引用链直接或间接引用的对象。内存回收:将判断为垃圾的对象占用的内存进行回收,以便重新使用。

Redis中使用布隆过滤器解决缓存穿透问题

一、缓存穿透(失效)问题 缓存穿透是指查询一个一定不存在的数据,由于缓存中没有命中,会去数据库中查询,而数据库中也没有该数据,并且每次查询都不会命中缓存,从而每次请求都直接打到了数据库上,这会给数据库带来巨大压力。 二、布隆过滤器原理 布隆过滤器(Bloom Filter)是一种空间效率很高的随机数据结构,它利用多个不同的哈希函数将一个元素映射到一个位数组中的多个位置,并将这些位置的值置

Lua 脚本在 Redis 中执行时的原子性以及与redis的事务的区别

在 Redis 中,Lua 脚本具有原子性是因为 Redis 保证在执行脚本时,脚本中的所有操作都会被当作一个不可分割的整体。具体来说,Redis 使用单线程的执行模型来处理命令,因此当 Lua 脚本在 Redis 中执行时,不会有其他命令打断脚本的执行过程。脚本中的所有操作都将连续执行,直到脚本执行完成后,Redis 才会继续处理其他客户端的请求。 Lua 脚本在 Redis 中原子性的原因

laravel框架实现redis分布式集群原理

在app/config/database.php中配置如下: 'redis' => array('cluster' => true,'default' => array('host' => '172.21.107.247','port' => 6379,),'redis1' => array('host' => '172.21.107.248','port' => 6379,),) 其中cl

Redis的rehash机制

在Redis中,键值对(Key-Value Pair)存储方式是由字典(Dict)保存的,而字典底层是通过哈希表来实现的。通过哈希表中的节点保存字典中的键值对。我们知道当HashMap中由于Hash冲突(负载因子)超过某个阈值时,出于链表性能的考虑,会进行Resize的操作。Redis也一样。 在redis的具体实现中,使用了一种叫做渐进式哈希(rehashing)的机制来提高字典的缩放效率,避

【吊打面试官系列-Redis面试题】说说 Redis 哈希槽的概念?

大家好,我是锋哥。今天分享关于 【说说 Redis 哈希槽的概念?】面试题,希望对大家有帮助; 说说 Redis 哈希槽的概念? Redis 集群没有使用一致性 hash,而是引入了哈希槽的概念,Redis 集群有 16384 个哈希槽,每个 key 通过 CRC16 校验后对 16384 取模来决定放置哪个槽, 集群的每个节点负责一部分 hash 槽。

Redis地理数据类型GEO

通常要计算两个地理位置的距离不是很方便,这里可以直接通过Redis提供的GEO操作来完成地理位置相关的计算 1)添加地理位置 语法:geoadd key longitude latitude member [longitude latitude member] ...字段说明:key:存放地理位置的集合名称longitude:地理坐标的经度latitude:地理坐标的纬度member:表示这

Redis-主从集群

主从架构 单节点Redis的并发能力是有上限的,要进一步提高Redis的并发能力,就需要搭建主从集群,实现读写分离。 主从数据同步原理 全量同步 主从第一次建立连接时,会执行全量同步,将master节点的所有数据都拷贝给slave节点,流程: 判断是否是第一次同步,如果是,返回版本信息(replication id 和offset),将salve节点的版本信息变为master的

Redis安装使用总结

一、下载安装 从 github 下载:https://github.com/MSOpenTech/redis/releases 或者 https://github.com/ServiceStack/redis-windows 解压缩,如图: 二、配置 打开redis.windows-sevice.conf文件, 2.1 绑定ip:搜索127.0.0.1 ,发现bind 127.0.0.