Redisson分布式锁原理解析

2024-06-08 00:52

本文主要是介绍Redisson分布式锁原理解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

首先Redis执行命令是单线程的,所以可以利用Redis实现分布式锁,而对于Redis单线程的问题,是其线程模型的问题,本篇重点是对目前流行的工具Redisson怎么去实现的分布式锁进行深入理解;开始之前,我们可以下你思考一个问题,Redisson的实现方式有何不同?为什么?

使用

引入依赖

 <dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.25.0</version>
</dependency>

添加配置

    @Autowiredprivate RedisProperties redisProperties;@Beanpublic RedissonClient redisClient() {Config config = new Config();config.setTransportMode(TransportMode.NIO);SingleServerConfig singleServerConfig = config.useSingleServer();singleServerConfig.setAddress(String.format("redis://%s:%s", redisProperties.getHost(), redisProperties.getPort()));singleServerConfig.setPassword(redisProperties.getPassword());return Redisson.create(config);}

加锁

private final RedissonClient redissonClient;public void lock() throws Exception {RLock lock = redissonClient.getLock(PRODUCT_LOCK_KEY + id);if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {// 业务} else {// 业务}
}

如上使用是最简单的方式,Redission它底层封装了很多逻辑,但如果说要redis客户端实现,你要怎么实现?

Redis中本就有支持分布式锁的命令:setnx,对应RedisTemplate中,使用如下:

redisTemplate.opsForValue().setIfAbsent("lockkey", "", 1, TimeUnit.SECONDS);

使用Redis实现分布式锁需要注意的是不要产生死锁,所以使用Redis实现分布式锁有两种方式:

  1. setnx命令
  2. lua脚本

两种方式都是一个操作完成key,value的设置以及过期时间的设置,你是否有相关,他们的实现是否都一样,或者说,这个实现可以有其他实现方式?

源码

原理

  1. lua脚本保证多个命令的原子性;
  2. 采用hash数据结构,key为锁的名称,field是线程对应的名称,也因为这个数据结构,也支持可重入锁;
  3. 定时延时的操作避免死锁(看门狗);

lock()

我们先看lock()方法,tryLock()lock()底层是一样的,所以我们只看lock方法;

   @Overridepublic void lock() {try {lock(-1, null, false);} catch (InterruptedException e) {throw new IllegalStateException();}}

位置:org.redisson.RedissonLock#lock(long, java.util.concurrent.TimeUnit, boolean)

传参是:(-1, null, false)

image-20240402175531643

/**
leaseTime: 过期时间
unit:过期时间单位
interruptibly: 信号量,是否打断线程
*/
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {// 取线程ID,后面作为锁的一个标志long threadId = Thread.currentThread().getId();// 加锁// 两个步骤:// 1. 利用lua脚本设置锁与过期时间(原子操作),过期时间lockWatchdogTimeout = 30 * 1000 = 30秒// 2. 执行过期时间刷新(这里的刷新时利用回调的方式 + 延迟执行实现的定时任务),lockWatchdogTimeout/3 = 10秒Long ttl = tryAcquire(-1, leaseTime, unit, threadId);// lua脚本中,加锁成功返回nil,对应redis中是null,加锁失败,则返回的是已存在的锁的过期时间// 所以这里返回null,就是加锁成功了,就不再往下走了if (ttl == null) {return;}// 加锁成功的在上面就已经结束了,所以下面的都是加锁失败时走的// 这里它订阅了一个channel,参数threadId无用,别被误导了,它订阅的名称时固定的// 为什么这里要订阅?可以思考一下CompletableFuture<RedissonLockEntry> future = subscribe(threadId);// 检查是否超时pubSub.timeout(future);RedissonLockEntry entry;// 这里的interruptibly应该时程序一次时,是否结束,而不是一直在执行中if (interruptibly) {entry = commandExecutor.getInterrupted(future);} else {entry = commandExecutor.get(future);}try {// 这里就是一个自旋 + 加锁while (true) {// 每次循环都进行加锁操作ttl = tryAcquire(-1, leaseTime, unit, threadId);// 同样,如果这里时null,那么就是加锁成功了if (ttl == null) {break;}// 加锁失败:返回了锁的过期时间if (ttl >= 0) {try {// 信号量 + unsafe.park // 信号量:本地更改线程共享变量状态达到加锁的目的// unsafe.park:利用park方法将加锁失败(信号量更改失败)的线程进行挂起// 为什么要挂起,这个问题应该不用多说entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {if (interruptibly) {throw e;}entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);}} else {// 这个分支时ttl < 0,也就是过期时间为负数,也就是锁失效了// 对本地加锁,之后在下一次循环redis加锁if (interruptibly) {entry.getLatch().acquire();} else {entry.getLatch().acquireUninterruptibly();}}}} finally {unsubscribe(future, threadId);}
//        get(lockAsync(leaseTime, unit));}

上面的步骤就是Redisson大致逻辑:

  1. lua脚本加锁,记录线程id,防止非本线程解锁
  2. 成功则退出
  3. 添加订阅对应的channel(这里的订阅是异步的)
    1. 唤醒:当收到channel的通知后,也就是上一个锁解锁了
    2. 从第6步醒来
  4. 自旋
  5. lua脚本加锁(与第一步一样),存在订阅后,上一个锁解锁了,就不用再挂起线程
  6. 失败挂起:unsafe.park
  7. 加锁成功后,取消订阅

lua脚本加锁

image-20240402175746999image-20240402175814387

image-20240403103101607

简单说一下这个脚本做了什么(lua脚本类似js):

redis.call是调用redis命令,第一个参数是redis的命令,第二个参数是参数;

它先是exists判断了key,以及hash数据结构了的field是否存在,

如果存在,对field递增,为什么要递增?思考一下;并设置过期时间,返回返回nil

如果不存在,调用redis命令pttl,获取key的过期时间,并返回;

那如果我们自己写lua脚本呢?

redis 2.6之后支持lua脚本,一个脚本,执行这个脚本是原子性的,所以脚本里的多个命令是原子性的。

如果说要执行批量的命令,可以使用piple,但是管道的话,它并不是原子性的,他只是一次性把批量的命令发给了redis。

LUA脚本格式:

eval "脚本 KEYS[1...N] ARGV[1...N]" count key[1...N] argv[1...N]

KEYS[1…N]:key的展位符,多个时,序号递增,从1开始

ARGV[1…N]:value的展位符,多个时,序号递增,从1开始

count:是对应key输入的个数

key[1…N]:对应KEYS[0…N]的key

argv[1…N]:对应ARGV[0…N]的value

这里就大概简单的说明了一下,使得看本篇的朋友能够理解,详细的还请百度;

看门狗

使用过Redisson的朋友应该都听过“看门狗”,为什么Redisson加锁要看门狗呢?

如果我们使用lock()方法,不设置过期时间,那么应该是永不过期;

好,如果说,加锁成功,在解锁时出了意外,如服务异常退出,或宕机,导致没有解锁,那么这个锁就需要人工干预了,这是有问题的。

所以,在Redisson中,它并不是永不过期,当我们使用lock()方法时,它的参数时-1,但在最终执行lua脚本时传入了默认参数:

位置:org.redisson.RedissonLock#tryAcquireAsync

image-20240402205409341

过期时间时:internalLockLeaseTime;

image-20240402205832964image-20240402205846003

image-20240402205902393

可以看到在初始化时,它便赋予了该值一个默认的30秒;

它并没有将锁设置为无限时间,而是30秒,又怎么保证锁的有效?

所以它还有一个续约的操作,对未使用unlock的锁进行时间延迟,这一操作是为了保证未来某一时刻如果出现服务或其他问题导致解锁失败,产生死锁这样的一个情况。

来看代码:

 private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;// leaseTime=-1表示永不过期if (leaseTime > 0) {// 对应方法:tryLock(过期时间, 时间单位)ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 对应方法:lock()// 注意它达到参数是:internalLockLeaseTime,上面说了他是30秒// internalLockLeaseTime = lockWatchdogTimeout = 30 * 1000;ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 这里是上面ttlRemainingFuture回调结果处理,// 如果出现异常,就unlock解锁CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);
// 这里是定时刷新任务CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// 执行成功返回的时null,所以这里以null为加锁成功的标志if (ttlRemaining == null) {if (leaseTime > 0) {internalLockLeaseTime = unit.toMillis(leaseTime);} else {scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}

image-20240403105722608

image-20240403105635897

handleNoSync它只是针对异常做了处理,正常情况下只是进行了封装,而unlocakInnerAysnc也是在异常时的一个回调;

return是结果封装;

image-20240403105756315

再回到刷新的部分:

它是在加锁完后的一个回调方法,ttlRemaining它就是上面执行的结果,null或者是过期时间

image-20240403110155068

 private RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {RFuture<Long> ttlRemainingFuture;// leaseTime=-1表示永不过期if (leaseTime > 0) {// 对应方法:tryLock(过期时间, 时间单位)ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);} else {// 对应方法:lock()// 注意它达到参数是:internalLockLeaseTime,上面说了他是30秒// internalLockLeaseTime = lockWatchdogTimeout = 30 * 1000;ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);}// 这里是上面ttlRemainingFuture回调结果处理,// 如果出现异常,就unlock解锁CompletionStage<Long> s = handleNoSync(threadId, ttlRemainingFuture);ttlRemainingFuture = new CompletableFutureWrapper<>(s);
// 这里是定时刷新任务CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {// 执行成功返回的时null,所以这里以null为加锁成功的标志if (ttlRemaining == null) {if (leaseTime > 0) {// leaseTime > 0 表示加锁时设置了过期时间// 而ternalLockLeaseTime存在默认值,这里就是取消了默认值internalLockLeaseTime = unit.toMillis(leaseTime);} else {// 那,这里执行 过期时间的定时刷新scheduleExpirationRenewal(threadId);}}return ttlRemaining;});return new CompletableFutureWrapper<>(f);}

image-20240403144946555

过期时间刷新的步骤:

  1. 创建ExpirationEntry,可以看作时一个上下文对象,他是延时的标识
  2. 原子操作,添加上下文ExpirationEntry
  3. 设置当前线程id到上下文
  4. 创建Timeout延时任务,延时10秒
  5. 延时任务执行(异步):
    1. 根据客户端id和锁获取上下文
    2. 通过上下文获取到线程id
    3. 异步执行延时lua脚本
    4. 执行回调:成功,回调本身,回调第四步,失败,移除上下文,取消延时
 protected void scheduleExpirationRenewal(long threadId) {// 1. 创建ExpirationEntry,可以看作时一个上下文对象,他是延时的标识ExpirationEntry entry = new ExpirationEntry();// 2. 原子操作,添加上下文ExpirationEntry// 要进行过期时间刷新,就要先添加这个对象,可以理解为一个刷新的标志ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);// 3. 设置当前线程id到上下文if (oldEntry != null) {// 不为空,说明这个对象已经存在,已经有其他线程执行了过期刷新oldEntry.addThreadId(threadId);} else {// 为空,则是不存在该映射对象(锁不存在)entry.addThreadId(threadId);try {// 执行过期时间刷新// 4. 创建Timeout延时任务,延时10秒// 5. 延时任务执行(异步)://    5.1 根据客户端id和锁获取上下文//    5.2 通过上下文获取到线程id//    5.3 异步执行延时lua脚本//    5.4 执行回调:成功,回调本身,回调第四步,失败,移除上下文,取消延时renewExpiration();} finally {if (Thread.currentThread().isInterrupted()) {cancelExpirationRenewal(threadId);}}}}

下面我们进入renewExpiration方法:

 private void renewExpiration() {// EXPIRATION_RENEWAL_MAP 在上一层方法中添加了刷新的上下文标志ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {// 如果这里为null,就说明已经有其他线程移除了,那么就是不需要刷新,锁不存在了return;}// 4. 创建Timeout延时任务,延时10秒// 注意这里的时间: internalLockLeaseTime / 3// 之前说过:internalLockLeaseTime = lockWatchdogTimeout = 30 * 1000;// 所以这里是延迟10秒Timeout task = getServiceManager().newTimeout(new TimerTask() {// 5. 延时任务执行(异步)@Overridepublic void run(Timeout timeout) throws Exception {// 5.1 根据客户端id和锁获取上下文// entryName是客户端id+线程id// 每次执行都要检查,刷新的标志存在,就说明锁还在,需要执行ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}// 5.2 通过上下文获取到线程idLong threadId = ent.getFirstThreadId();if (threadId == null) {return;}// 5.3 异步执行延时lua脚本CompletionStage<Boolean> future = renewExpirationAsync(threadId);// 当延时完成后,执行这个方法,也是个回调future.whenComplete((res, e) -> {if (e != null) {log.error("Can't update lock {} expiration", getRawName(), e);// 异常时,移除刷新的标志// 也是避免死锁的一个方式EXPIRATION_RENEWAL_MAP.remove(getEntryName());return;}// 5.4 成功:回调本身,回调第四步,失败:移除上下文,取消延时if (res) {// 延时成功,回调当前这个方法,以实现定时任务renewExpiration();} else {// 没有成功,也是移除刷新的标志// 这里没有成功的情况,是锁已经不存在了,对应的定时任务就应该停止;// 两个步骤:// 1. task.cancel():通过上下文获取到任务,然后取消// 2. EXPIRATION_RENEWAL_MAP.remove(getEntryName());cancelExpirationRenewal(null);}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);// 将定时任务放到entry中,也就是放到了定时任务的上下文中// 在下一次时,通过上下文获取到这个定时任务ee.setTimeout(task);}

其实他延时的逻辑也是一个lua脚本:

image-20240403154502286

漏了一点:

由Redisson实现JDK的Timeout类,加回调完成的一个定时任务;当当前任务执行完后,又执行本身方法,创建一个延迟任务,也就实现了一个定时任务

这篇关于Redisson分布式锁原理解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1040794

相关文章

Golang HashMap实现原理解析

《GolangHashMap实现原理解析》HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持高效的插入、查找和删除操作,:本文主要介绍GolangH... 目录HashMap是一种基于哈希表实现的键值对存储结构,它通过哈希函数将键映射到数组的索引位置,支持

Python使用getopt处理命令行参数示例解析(最佳实践)

《Python使用getopt处理命令行参数示例解析(最佳实践)》getopt模块是Python标准库中一个简单但强大的命令行参数处理工具,它特别适合那些需要快速实现基本命令行参数解析的场景,或者需要... 目录为什么需要处理命令行参数?getopt模块基础实际应用示例与其他参数处理方式的比较常见问http

Python利用ElementTree实现快速解析XML文件

《Python利用ElementTree实现快速解析XML文件》ElementTree是Python标准库的一部分,而且是Python标准库中用于解析和操作XML数据的模块,下面小编就来和大家详细讲讲... 目录一、XML文件解析到底有多重要二、ElementTree快速入门1. 加载XML的两种方式2.

Java的栈与队列实现代码解析

《Java的栈与队列实现代码解析》栈是常见的线性数据结构,栈的特点是以先进后出的形式,后进先出,先进后出,分为栈底和栈顶,栈应用于内存的分配,表达式求值,存储临时的数据和方法的调用等,本文给大家介绍J... 目录栈的概念(Stack)栈的实现代码队列(Queue)模拟实现队列(双链表实现)循环队列(循环数组

java解析jwt中的payload的用法

《java解析jwt中的payload的用法》:本文主要介绍java解析jwt中的payload的用法,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Java解析jwt中的payload1. 使用 jjwt 库步骤 1:添加依赖步骤 2:解析 JWT2. 使用 N

Python中__init__方法使用的深度解析

《Python中__init__方法使用的深度解析》在Python的面向对象编程(OOP)体系中,__init__方法如同建造房屋时的奠基仪式——它定义了对象诞生时的初始状态,下面我们就来深入了解下_... 目录一、__init__的基因图谱二、初始化过程的魔法时刻继承链中的初始化顺序self参数的奥秘默认

Java 正则表达式URL 匹配与源码全解析

《Java正则表达式URL匹配与源码全解析》在Web应用开发中,我们经常需要对URL进行格式验证,今天我们结合Java的Pattern和Matcher类,深入理解正则表达式在实际应用中... 目录1.正则表达式分解:2. 添加域名匹配 (2)3. 添加路径和查询参数匹配 (3) 4. 最终优化版本5.设计思

使用Java将DOCX文档解析为Markdown文档的代码实现

《使用Java将DOCX文档解析为Markdown文档的代码实现》在现代文档处理中,Markdown(MD)因其简洁的语法和良好的可读性,逐渐成为开发者、技术写作者和内容创作者的首选格式,然而,许多文... 目录引言1. 工具和库介绍2. 安装依赖库3. 使用Apache POI解析DOCX文档4. 将解析

Java字符串处理全解析(String、StringBuilder与StringBuffer)

《Java字符串处理全解析(String、StringBuilder与StringBuffer)》:本文主要介绍Java字符串处理全解析(String、StringBuilder与StringBu... 目录Java字符串处理全解析:String、StringBuilder与StringBuffer一、St

Spring Boot循环依赖原理、解决方案与最佳实践(全解析)

《SpringBoot循环依赖原理、解决方案与最佳实践(全解析)》循环依赖指两个或多个Bean相互直接或间接引用,形成闭环依赖关系,:本文主要介绍SpringBoot循环依赖原理、解决方案与最... 目录一、循环依赖的本质与危害1.1 什么是循环依赖?1.2 核心危害二、Spring的三级缓存机制2.1 三