五、分布式锁-redission

2024-03-25 02:36
文章标签 分布式 redission

本文主要是介绍五、分布式锁-redission,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

源码仓库地址:git@gitee.com:chuangchuang-liu/hm-dingping.git

1、redission介绍

目前基于redis的setnx特性实现的自定义分布式锁仍存在的问题:

问题描述
重入问题同一个线程无法多次获取统一把锁。当方法A成功获取锁后,调用方法B,方法B也要获取锁,此时由于锁是不可重入的,也就是被方法A占用着,此时就产生了死锁的问题
不可重试自定义分布式锁无失败重试机制
超时释放锁的超时释放虽然可以避免死锁问题,但确实也可能存在业务执行时间比较长的情况,那这种情况下就仍存在安全隐患问题
主从一致性如果Redis提供了主从集群,当我们向集群写数据时,主机需要异步的将数据同步给从机,而万一在同步过去之前,主机宕机了,就会出现死锁问题。

什么是Redission?
Redission是一个用于Java的Redis客户端,它提供了丰富的特性,包括内存数据网格的功能。它支持同步/异步/RxJava/Reactive API,拥有超过50种基于Redis的Java对象和服务。Redission的使用非常简单,没有学习曲线,您不需要了解任何Redis命令就可以开始使用。(GitHub - redisson/redisson, Redisson官网)
Redission可以让Java应用更方便地访问和操作Redis数据存储,适合于需要高性能和高并发的应用场景。

2、快速开始

  1. 导入依赖
<!--redission-->
<dependency><groupId>org.redisson</groupId><artifactId>redisson</artifactId><version>3.13.6</version>
</dependency>
  1. Redission配置客户端
@Configuration
public class RedisConfig {@Beanpublic RedissonClient redisClient(){Config config = new Config();// 可以用"rediss://"来启用SSL连接config.useSingleServer().setAddress("redis://192.168.224.128:6379").setPassword("123456");return Redisson.create(config);}
}
  1. 使用Redission分布式锁
@Resource
private RedissionClient redissonClient;@Test
void testRedisson() throws Exception{//获取锁(可重入),指定锁的名称RLock lock = redissonClient.getLock("anyLock");//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放时间,时间单位boolean isLock = lock.tryLock(1,10,TimeUnit.SECONDS);//判断获取锁成功if(isLock){try{System.out.println("执行业务");          }finally{//释放锁lock.unlock();}}
}

3、redission可重入锁原理

3.1、原理介绍

在Lock锁中,他是借助于底层的一个voaltile的一个state变量来记录重入的状态的,比如当前没有人持有这把锁,那么state=0,假如有人持有这把锁,那么state=1,如果持有这把锁的人再次持有这把锁,那么state就会+1 ,如果是对于synchronized而言,他在c语言代码中会有一个count,原理和state类似,也是重入一次就加一,释放一次就-1 ,直到减少成0 时,表示当前这把锁没有被人持有。 、

而在redission中,也支持这种可重入锁原理,是通过redis的hash数据结构实现的。其中key表示这把锁是否存在,field判断锁是被哪个线程持有,value则记录锁被持有次数。
image.png

3.2、源码剖析

  • 获取锁

其中各参数解释:
KEYS[1]:锁的名称
ARGV[1]:锁过期时间
ARGV[2]:id + “:” + threadId

"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);"

判断锁是否存在
如果不存在,则设置当前线程标识,计数器+1;设置过期时间;
如果存在。做二次判断,判断锁的持有线程是不是自己?
如果是,计数器+1,重置锁的过期时间;
如果不是,获取锁失败,返回锁的剩余过期时间

  • 释放锁

其中各参数解释:
KEYS[1]:锁的名称
KEYS[2]:订阅频道
ARGV[1]:是要发布的消息内容
ARGV[2]:锁过期时间
ARGV[3]:id + “:” + threadId

"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;"

判断锁是不是当前线程?
不是==>直接返回
是==>计数器–
二次判断,判断计数器是否大于0
大于0==>重置锁过期时间
否则==>真正释放锁

4、redission锁重试和WatchDog机制

4.1、redission是如何解决不可重试的?

源码剖析:
用户调用tryLock方法时,指定waitTime最大等待时间

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {long time = unit.toMillis(waitTime);long current = System.currentTimeMillis();long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}current = System.currentTimeMillis();RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {if (!subscribeFuture.cancel(false)) {subscribeFuture.onComplete((res, e) -> {if (e == null) {unsubscribe(subscribeFuture, threadId);}});}acquireFailed(waitTime, unit, threadId);return false;}try {time -= System.currentTimeMillis() - current;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}while (true) {long currentTime = System.currentTimeMillis();ttl = tryAcquire(waitTime, leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return true;}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}// waiting for messagecurrentTime = System.currentTimeMillis();if (ttl >= 0 && ttl < time) {subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);}time -= System.currentTimeMillis() - currentTime;if (time <= 0) {acquireFailed(waitTime, unit, threadId);return false;}}} finally {unsubscribe(subscribeFuture, threadId);}//        return get(tryLockAsync(waitTime, leaseTime, unit));
}
  1. 计算等待时间和获取当前时间:将用户指定的等待时间转换为毫秒,并记录方法调用时的当前时间。
  2. 尝试获取锁。如果ttl为空,则获取锁成功;否则,返回的是其他线程占用锁的剩余有效时间
  3. 检查剩余等待时间。如果time小于等于0,调用acquireFailed方法返回false
  4. 订阅锁。通过subscribe方法订阅相关的锁。如果在剩余时间内未能订阅成功,处理取消订阅并调用acquireFailed方法,返回false。
  5. 循环等待锁释放消息。等待过程中会调用tryAcquire方法获取锁,如果获取成功返回true
  6. 处理锁的ttl。如果ttl大于0,返回锁被其他线程占用的剩余过期时间(ttl)。更新剩余等待时间(time)。以time和ttl中较小的值继续等待再次尝试。
  7. 再次检查等于剩余等待时间。如果小于0,调用acquireFailed方法返回false
  8. 循环结束后(要么获取锁成功,要么超过最大等待时间了),最终调用unsubscribe方法取消订阅

结论1:redission不是获取锁失败后立即进行重试,而是等待“一定时间”后再进行重试,节省了一定的CPU资源,对服务器性能有一定提升;
结论2:一定要采取调用tryLock方法携带参数waitTime的重载方法,其他重载的tryLock方法底层是不具备重试机制的。

4.2、redission是如何解决锁超时释放的-看门狗机制?

自定义分布式锁仍存在的一个问题是:锁的超时释放虽然可以避免死锁问题,但确实也可能存在业务执行时间比较长的情况,那这种情况下业务还未执行完毕,锁就被释放了,存在一定的安全隐患。
源码剖析:

private RFuture<Boolean> tryAcquireOnceAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);}RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(waitTime,commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);ttlRemainingFuture.onComplete((ttlRemaining, e) -> {if (e != null) {return;}// lock acquiredif (ttlRemaining) {// 开启任务更新过期时间scheduleExpirationRenewal(threadId);}});return ttlRemainingFuture;
}
private void renewExpiration() {ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ee == null) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (ent == null) {return;}Long threadId = ent.getFirstThreadId();if (threadId == null) {return;}RFuture<Boolean> future = renewExpirationAsync(threadId);future.onComplete((res, e) -> {if (e != null) {log.error("Can't update lock " + getName() + " expiration", e);return;}if (res) {// reschedule itselfrenewExpiration();}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.singletonList(getName()),internalLockLeaseTime, getLockName(threadId));}
  1. 如果没有指定leaseTime,那么底层会默认传入commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()看门狗时间
  2. 在leasetime的1/3处时间,会创建一个任务renewExpirationAsync方法来异步地更新重置锁过期时间
  3. 递归地调用自身来更新锁过期时间,直到业务处理完毕。

至此redission解决了因业务阻塞而导致锁提前释放的问题

业务执行完毕,释放锁源码剖析:

public RFuture<Void> unlockAsync(long threadId) {RPromise<Void> result = new RedissonPromise<Void>();// 释放锁RFuture<Boolean> future = unlockInnerAsync(threadId);future.onComplete((opStatus, e) -> {// 取消锁失效时间更新重置任务cancelExpirationRenewal(threadId);if (e != null) {result.tryFailure(e);return;}if (opStatus == null) {IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "+ id + " thread-id: " + threadId);result.tryFailure(cause);return;}result.trySuccess(null);});return result;
}void cancelExpirationRenewal(Long threadId) {ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());if (task == null) {return;}if (threadId != null) {task.removeThreadId(threadId);}if (threadId == null || task.hasNoThreads()) {Timeout timeout = task.getTimeout();if (timeout != null) {timeout.cancel();}// 删除递归更新锁时间任务EXPIRATION_RENEWAL_MAP.remove(getEntryName());}
}

当业务执行完毕且锁正常释放后,删除递归更新锁时间任务,避免redission一直递归创建任务更新锁过期时间

5、redission锁的MultiLock原理

为了提高redis的可用性,我们会搭建集群或者主从,现在以主从为例
此时我们去写命令,写在主机上, 主机会将数据同步给从机,但是假设在主机还没有来得及把数据写入到从机去的时候,此时主机宕机,哨兵会发现主机宕机,并且选举一个slave变成master,而此时新的master中实际上并没有锁信息,此时锁信息就已经丢掉了。
1653553998403.png
为了解决该问题,redission的方案是去掉redis集群主从关系,每一个节点都是平等的。加锁逻辑是需要写入到每一个节点上才算加锁成功。这样,当某一台机器宕机了,这台机器的slave节点变为master节点,此时另一个线程趁虚而入,虽然可以正常写入,但其它机器仍会写入失败,最终结果仍是获取锁失败,从而保证了获取锁的可靠性。
1653554055048.png
MulitLock源码剖析:

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {//        try {//            return tryLockAsync(waitTime, leaseTime, unit).get();//        } catch (ExecutionException e) {//            throw new IllegalStateException(e);//        }long newLeaseTime = -1;if (leaseTime != -1) {if (waitTime == -1) {newLeaseTime = unit.toMillis(leaseTime);} else {newLeaseTime = unit.toMillis(waitTime)*2;}}long time = System.currentTimeMillis();long remainTime = -1;if (waitTime != -1) {remainTime = unit.toMillis(waitTime);}long lockWaitTime = calcLockWaitTime(remainTime);int failedLocksLimit = failedLocksLimit();List<RLock> acquiredLocks = new ArrayList<>(locks.size());for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {RLock lock = iterator.next();boolean lockAcquired;try {if (waitTime == -1 && leaseTime == -1) {lockAcquired = lock.tryLock();} else {long awaitTime = Math.min(lockWaitTime, remainTime);lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);}} catch (RedisResponseTimeoutException e) {unlockInner(Arrays.asList(lock));lockAcquired = false;} catch (Exception e) {lockAcquired = false;}if (lockAcquired) {acquiredLocks.add(lock);} else {if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {break;}if (failedLocksLimit == 0) {unlockInner(acquiredLocks);if (waitTime == -1) {return false;}failedLocksLimit = failedLocksLimit();acquiredLocks.clear();// reset iteratorwhile (iterator.hasPrevious()) {iterator.previous();}} else {failedLocksLimit--;}}if (remainTime != -1) {remainTime -= System.currentTimeMillis() - time;time = System.currentTimeMillis();if (remainTime <= 0) {unlockInner(acquiredLocks);return false;}}}if (leaseTime != -1) {List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());for (RLock rLock : acquiredLocks) {RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);futures.add(future);}for (RFuture<Boolean> rFuture : futures) {rFuture.syncUninterruptibly();}}return true;}
  1. 遍历锁集合,调用lock.tryLock尝试获取锁。将获取结果传给变量lockAcquired
  2. 如果获取成功,将当前锁存放到acquiredLocks集合中
  3. 获取成功后,如果此时的剩余等待时间小于等于0,释放自己已获取的锁,返回false
  4. 如果获取失败,判断是否具备重试机制
    1. 没有重试,则直接返回false
    2. 有重试机制,将acquiredLocks集合清空,将iterator指针前移,重新遍历尝试。

6、结论

目前已接触的分布式锁有:

  • 可不重入锁/自定义分布式锁:

原理: 利用setnx特性、expire避免死锁、添加线程标识避免锁误删
缺点: 仍存在不可重入、失败不可重试、锁超时失效等问题

  • 可重入锁:

原理: 利用hash数据结构存储线程标识和重入次数、利用看门狗机制延续锁失效时间、利用信号量机制控制等待重试时间
缺点: 仍存在集群模式下redis宕机导致的锁失效问题

  • MulitLock

原理: 利用多个平等的redis节点,所有redis都写入才算获取锁成功
缺点: 维护成本高,实现相对复杂

这篇关于五、分布式锁-redission的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/843588

相关文章

集中式版本控制与分布式版本控制——Git 学习笔记01

什么是版本控制 如果你用 Microsoft Word 写过东西,那你八成会有这样的经历: 想删除一段文字,又怕将来这段文字有用,怎么办呢?有一个办法,先把当前文件“另存为”一个文件,然后继续改,改到某个程度,再“另存为”一个文件。就这样改着、存着……最后你的 Word 文档变成了这样: 过了几天,你想找回被删除的文字,但是已经记不清保存在哪个文件了,只能挨个去找。真麻烦,眼睛都花了。看

开源分布式数据库中间件

转自:https://www.csdn.net/article/2015-07-16/2825228 MyCat:开源分布式数据库中间件 为什么需要MyCat? 虽然云计算时代,传统数据库存在着先天性的弊端,但是NoSQL数据库又无法将其替代。如果传统数据易于扩展,可切分,就可以避免单机(单库)的性能缺陷。 MyCat的目标就是:低成本地将现有的单机数据库和应用平滑迁移到“云”端

laravel框架实现redis分布式集群原理

在app/config/database.php中配置如下: 'redis' => array('cluster' => true,'default' => array('host' => '172.21.107.247','port' => 6379,),'redis1' => array('host' => '172.21.107.248','port' => 6379,),) 其中cl

基于MySQL实现的分布式锁

概述 在单机时代,虽然不需要分布式锁,但也面临过类似的问题,只不过在单机的情况下,如果有多个线程要同时访问某个共享资源的时候,我们可以采用线程间加锁的机制,即当某个线程获取到这个资源后,就立即对这个资源进行加锁,当使用完资源之后,再解锁,其它线程就可以接着使用了。例如,在JAVA中,甚至专门提供了一些处理锁机制的一些API(synchronize/Lock等)。 但是到了分布式系统的时代,这种

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

Spring Cloud整合Seata实现分布式事务

文章目录 1.Seata1.1 官网1.2 下载1.3 通过安装包运行seata1.3.1 解压seata-server-1.3.0.zip1.3.2 修改 conf/file.conf 配置文件1.3.3 修改conf/registry.conf配置文件1.3.4 添加seata配置信息到nacos1.3.5 配置seata服务端数据库表结构1.3.6 启动seata 2.Spring

ELK+Spring Cloud搭建分布式日志中心

ELK+Spring Cloud搭建分布式日志中心 1.ELK简介2.资源包下载3.Elasticsearch安装3.1 解压Elasticsearch3.2 修改Elasticsearch的配置文件3.3 修改系统配置3.4 启动Elasticsearch 4.ElasticSearch-head插件安装5.Logstash安装6.Kibana安装7.SpringCloud集成logsta

Redis进阶(七):分布式锁

在分布式系统下,涉及到多个节点访问同一个公共资源的情况,此时需要通过 锁 进行互斥控制:避免出现 线程安全问题。 1.分布式锁的基本实现 超卖问题: 解决: 采用redis实现分布式锁 可用采取:在购票的时候,操作过程中需要先加锁。在redis上设置一个key - value,完成上述买票操作,再把key - value 删掉。如果发现key - value 存在,就加锁失败,无法进

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

分布式 事务的几种实现方案

背景 四月初,去面试了本市的一家之前在做办公室无人货架的公司,虽然他们现在在面临着转型,但是对于我这种想从传统企业往互联网行业走的孩子来说,还是比较有吸引力的。 在面试过程中就提到了分布式事务问题。我又一次在没有好好整理的问题上吃了亏,记录一下,还是长记性 !!! 先看面试过程 面试官先是在纸上先画了这样一张图: 让我看这张图按照上面的流程走,有没有什么问题?面试官并没有直接说出来这里面