Curator分布式锁

2024-05-12 11:28
文章标签 分布式 curator

本文主要是介绍Curator分布式锁,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

系列文章目录


文章目录

  • 系列文章目录
  • 前言


前言

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站,这篇文章男女通用,看懂了就去分享给你的码吧。
在这里插入图片描述


分布式锁服务宕机,ZooKeeper一般是以集群部署,如果出现ZooKeeper宕机,那么只要当前正常的服务器超过集群的半数,依然可以正常提供服务持有锁资源服务器宕机,假如一台服务器获取锁之后就宕机了, 那么就会导致其他服务器无法再获取该锁. 就会造成死锁问题, 在Curator中, 锁的信息都是保存在临时节点上, 如果持有锁资源的服务器宕机, 那么ZooKeeper 就会移除它的信息, 这时其他服务器就能进行获取锁操作。
在这里插入图片描述
zookeeper安装单机模式

http://www.javacui.com/opensource/445.html

SpringBoot集成Curator实现Zookeeper基本操作

http://www.javacui.com/tool/615.html

SpringBoot集成Curator实现Watch事件监听

http://www.javacui.com/tool/616.html

Zookeeper实现分布式锁的机制

使用zk的临时节点和有序节点,每个线程获取锁就是在zk创建一个临时有序的节点,比如在/lock/目录下。

创建节点成功后,获取/lock目录下的所有临时节点,再判断当前线程创建的节点是否是所有的节点的序号最小的节点。

如果当前线程创建的节点是所有节点序号最小的节点,则认为获取锁成功。

比如当前线程获取到的节点序号为/lock/003,然后所有的节点列表为[/lock/001,/lock/002,/lock/003],则对/lock/002这个节点添加一个事件监听器。

如果锁释放了,会唤醒下一个序号的节点,然后重新执行第3步,判断是否自己的节点序号是最小。

比如/lock/001释放了,/lock/002监听到时间,此时节点集合为[/lock/002,/lock/003],则/lock/002为最小序号节点,获取到锁。

锁分类

InterProcessSemaphoreMutex:分布式不可重入排它锁

InterProcessMutex:分布式可重入排它锁

InterProcessReadWriteLock:分布式读写锁

InterProcessMultiLock:多重共享锁,将多个锁作为单个实体管理的容器

InterProcessSemaphoreV2:共享信号量

Shared Lock 分布式非可重入锁

官网地址:http://curator.apache.org/curator-recipes/shared-lock.html

InterProcessSemaphoreMutex是一种不可重入的互斥锁,也就意味着即使是同一个线程也无法在持有锁的情况下再次获得锁,所以需要注意,不可重入的锁很容易在一些情况导致死锁,比如你写了一个递归。

Shared Reentrant Lockf分布式可重入锁

官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-lock.html

此锁可以重入,但是重入几次需要释放几次。

InterProcessMutex通过在zookeeper的某路径节点下创建临时序列节点来实现分布式锁,即每个线程(跨进程的线程)获取同一把锁前,都需要在同样的路径下创建一个节点,节点名字由uuid+递增序列组成。而通过对比自身的序列数是否在所有子节点的第一位,来判断是否成功获取到了锁。当获取锁失败时,它会添加watcher来监听前一个节点的变动情况,然后进行等待状态。直到watcher的事件生效将自己唤醒,或者超时时间异常返回。

Shared Reentrant Read Write Lock可重入读写锁

官网地址:http://curator.apache.org/curator-recipes/shared-reentrant-read-write-lock.html

读写锁维护一对关联的锁,一个用于只读操作,一个用于写操作。只要没有写锁,读锁可以被多个用户同时持有,而写锁是独占的。

读写锁允许从写锁降级为读锁,方法是先获取写锁,然后就可以获取读锁。但是,无法从读锁升级到写锁。

Multi Shared Lock 多共享锁

官网地址:http://curator.apache.org/curator-recipes/multi-shared-lock.html

多个锁作为一个锁,可以同时在多个资源上加锁。一个维护多个锁对象的容器。当调用acquire()时,获取容器中所有的锁对象,请求失败时,释放所有锁对象。同样调用release()也会释放所有的锁。

Shared Semaphore共享信号量

官网地址:http://curator.apache.org/curator-recipes/shared-semaphore.html

一个计数的信号量类似JDK的Semaphore,所有使用相同锁定路径的jvm中所有进程都将实现进程间有限的租约。此外,这个信号量大多是“公平的” - 每个用户将按照要求的顺序获得租约。

有两种方式决定信号号的最大租约数。一种是由用户指定的路径来决定最大租约数,一种是通过SharedCountReader来决定。

如果未使用SharedCountReader,则不会进行内部检查比如A表现为有10个租约,进程B表现为有20个。因此,请确保所有进程中的所有实例都使用相同的numberOfLeases值。

acuquire()方法返回的是Lease对象,客户端在使用完后必须要关闭该lease对象(一般在finally中进行关闭),否则该对象会丢失。如果进程session丢失(如崩溃),该客户端拥有的所有lease会被自动关闭,此时其他端能够使用这些lease。

编码测试

package com.example.springboot;import com.example.springboot.tool.ZkConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.*;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;/*** @Auther: Java小强* @Date: 2022/2/4 - 19:33* @Decsription: com.example.springboot* @Version: 1.0*/
@SpringBootTest(classes = Application.class)
public class CuratorTest {@Autowiredprivate ZkConfiguration zk;// 共享信号量,多个信号量@Testpublic void testInterProcessSemaphoreV22() throws Exception {CuratorFramework client = zk.curatorFramework();// 创建一个信号量, Curator 以公平锁的方式进行实现final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 3);new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {String threadName = Thread.currentThread().getName();// 获取2个许可Collection<Lease> acquire = semaphore.acquire(2);System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2 * 1000);semaphore.returnAll(acquire);System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {String threadName = Thread.currentThread().getName();// 获取一个许可Lease lease = semaphore.acquire();System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2 * 1000);semaphore.returnLease(lease);System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}}// 共享信号量@Testpublic void testInterProcessSemaphoreV2() throws Exception {CuratorFramework client = zk.curatorFramework();// 创建一个信号量, Curator 以公平锁的方式进行实现final InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/lock", 1);new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {String threadName = Thread.currentThread().getName();// 获取一个许可Lease lease = semaphore.acquire();System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2 * 1000);semaphore.returnLease(lease);System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {String threadName = Thread.currentThread().getName();// 获取一个许可Lease lease = semaphore.acquire();System.out.println(threadName + "获取信号量>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2 * 1000);semaphore.returnLease(lease);System.out.println(threadName + "释放信号量>>>>>>>>>>>>>>>>>>>>>");} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}}// 多重共享锁@Testpublic void testInterProcessMultiLock() throws Exception {CuratorFramework client = zk.curatorFramework();// 可重入锁final InterProcessLock interProcessLock1 = new InterProcessMutex(client, "/lock");// 不可重入锁final InterProcessLock interProcessLock2 = new InterProcessSemaphoreMutex(client, "/lock");// 创建多重锁对象final InterProcessLock lock = new InterProcessMultiLock(Arrays.asList(interProcessLock1, interProcessLock2));new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();// 获取参数集合中的所有锁lock.acquire();// 因为存在一个不可重入锁, 所以整个 InterProcessMultiLock 不可重入System.out.println(threadName + "----->" + lock.acquire(2, TimeUnit.SECONDS));// interProcessLock1 是可重入锁, 所以可以继续获取锁System.out.println(threadName + "----->" + interProcessLock1.acquire(2, TimeUnit.SECONDS));// interProcessLock2 是不可重入锁, 所以获取锁失败System.out.println(threadName + "----->" + interProcessLock2.acquire(2, TimeUnit.SECONDS));} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}}// 分布式读写锁@Testpublic void testReadWriteLock() throws Exception {CuratorFramework client = zk.curatorFramework();// 创建共享可重入读写锁final InterProcessReadWriteLock locl1 = new InterProcessReadWriteLock(client, "/lock");final InterProcessReadWriteLock lock2 = new InterProcessReadWriteLock(client, "/lock");new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();locl1.writeLock().acquire(); // 获取锁对象System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);locl1.readLock().acquire(); // 获取读锁,锁降级System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);locl1.readLock().release();System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");locl1.writeLock().release();System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();lock2.writeLock().acquire(); // 获取锁对象System.out.println(threadName + "获取写锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);lock2.readLock().acquire(); // 获取读锁,锁降级System.out.println(threadName + "获取读锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);lock2.readLock().release();System.out.println(threadName + "释放读锁<<<<<<<<<<<<<<<<<<<<<");lock2.writeLock().release();System.out.println(threadName + "释放写锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}}// 分布式可重入排它锁@Testpublic void testInterProcessMutex() throws Exception {CuratorFramework client = zk.curatorFramework();// 分布式可重入排它锁final InterProcessLock lock = new InterProcessMutex(client, "/lock");final InterProcessLock lock2 = new InterProcessMutex(client, "/lock");new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();lock.acquire(); // 获取锁对象System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");lock.acquire(); // 测试锁重入System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);lock.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");lock.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();lock.acquire(); // 获取锁对象System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");lock.acquire(); // 测试锁重入System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(1 * 1000);lock.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");lock.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}
//        顺序不一定,但是同一个线程可以多次获取,获取几次就必须释放几次,其他线程才能获取到锁}// 分布式不可重入排它锁@Testvoid testInterProcessSemaphoreMutex() throws Exception {CuratorFramework client = zk.curatorFramework();// 分布式不可重入排它锁final InterProcessLock lock = new InterProcessSemaphoreMutex(client, "/lock");final InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, "/lock");new Thread(new Runnable() {@Overridepublic void run() {try {String threadName = Thread.currentThread().getName();lock.acquire(); // 获取锁对象System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");// 测试锁重入Thread.sleep(2 * 1000);lock.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();new Thread(new Runnable() {@Overridepublic void run() {// 获取锁对象try {String threadName = Thread.currentThread().getName();lock2.acquire();System.out.println(threadName + "获取锁>>>>>>>>>>>>>>>>>>>>>");Thread.sleep(2 * 1000);lock2.release();System.out.println(threadName + "释放锁<<<<<<<<<<<<<<<<<<<<<");} catch (Exception e) {e.printStackTrace();}}}).start();while (true) {}
//        顺序不一定,但是必须是获取后再释放其他线程才能获取到锁}
}

这篇关于Curator分布式锁的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/982494

相关文章

ZooKeeper 中的 Curator 框架解析

Apache ZooKeeper 是一个为分布式应用提供一致性服务的软件。它提供了诸如配置管理、分布式同步、组服务等功能。在使用 ZooKeeper 时,Curator 是一个非常流行的客户端库,它简化了 ZooKeeper 的使用,提供了高级的抽象和丰富的工具。本文将详细介绍 Curator 框架,包括它的设计哲学、核心组件以及如何使用 Curator 来简化 ZooKeeper 的操作。 1

集中式版本控制与分布式版本控制——Git 学习笔记01

什么是版本控制 如果你用 Microsoft Word 写过东西,那你八成会有这样的经历: 想删除一段文字,又怕将来这段文字有用,怎么办呢?有一个办法,先把当前文件“另存为”一个文件,然后继续改,改到某个程度,再“另存为”一个文件。就这样改着、存着……最后你的 Word 文档变成了这样: 过了几天,你想找回被删除的文字,但是已经记不清保存在哪个文件了,只能挨个去找。真麻烦,眼睛都花了。看

开源分布式数据库中间件

转自:https://www.csdn.net/article/2015-07-16/2825228 MyCat:开源分布式数据库中间件 为什么需要MyCat? 虽然云计算时代,传统数据库存在着先天性的弊端,但是NoSQL数据库又无法将其替代。如果传统数据易于扩展,可切分,就可以避免单机(单库)的性能缺陷。 MyCat的目标就是:低成本地将现有的单机数据库和应用平滑迁移到“云”端

laravel框架实现redis分布式集群原理

在app/config/database.php中配置如下: 'redis' => array('cluster' => true,'default' => array('host' => '172.21.107.247','port' => 6379,),'redis1' => array('host' => '172.21.107.248','port' => 6379,),) 其中cl

基于MySQL实现的分布式锁

概述 在单机时代,虽然不需要分布式锁,但也面临过类似的问题,只不过在单机的情况下,如果有多个线程要同时访问某个共享资源的时候,我们可以采用线程间加锁的机制,即当某个线程获取到这个资源后,就立即对这个资源进行加锁,当使用完资源之后,再解锁,其它线程就可以接着使用了。例如,在JAVA中,甚至专门提供了一些处理锁机制的一些API(synchronize/Lock等)。 但是到了分布式系统的时代,这种

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

Spring Cloud整合Seata实现分布式事务

文章目录 1.Seata1.1 官网1.2 下载1.3 通过安装包运行seata1.3.1 解压seata-server-1.3.0.zip1.3.2 修改 conf/file.conf 配置文件1.3.3 修改conf/registry.conf配置文件1.3.4 添加seata配置信息到nacos1.3.5 配置seata服务端数据库表结构1.3.6 启动seata 2.Spring

ELK+Spring Cloud搭建分布式日志中心

ELK+Spring Cloud搭建分布式日志中心 1.ELK简介2.资源包下载3.Elasticsearch安装3.1 解压Elasticsearch3.2 修改Elasticsearch的配置文件3.3 修改系统配置3.4 启动Elasticsearch 4.ElasticSearch-head插件安装5.Logstash安装6.Kibana安装7.SpringCloud集成logsta

Redis进阶(七):分布式锁

在分布式系统下,涉及到多个节点访问同一个公共资源的情况,此时需要通过 锁 进行互斥控制:避免出现 线程安全问题。 1.分布式锁的基本实现 超卖问题: 解决: 采用redis实现分布式锁 可用采取:在购票的时候,操作过程中需要先加锁。在redis上设置一个key - value,完成上述买票操作,再把key - value 删掉。如果发现key - value 存在,就加锁失败,无法进

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事