Zookeeper(五)Zokeeper 环境搭建与Curator使用

2024-03-20 14:28

本文主要是介绍Zookeeper(五)Zokeeper 环境搭建与Curator使用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

  • 一 环境搭建
    • 1.1 单机环境搭建
    • 1.2 可视化工具ZooKeeper Assistant
    • 1.3 集群环境搭建
  • 二 常用命令
    • 1.1 命令行语法
    • 1.2 数据节点信息
    • 1.3 节点类型
  • 三 CuratorAPI使用
    • 3.1 依赖
    • 3.1 创建会话
    • 3.2 基本使用增删改查
    • 3.3 ACL权限控制
    • 3.4 分布式锁
    • 3.5 分布式计数器
    • 3.6 分布式Barrier
    • 3.7 主从节点选举
    • 3.8 NodeCache监听
    • 3.9 PathChildrenCache监听
    • 3.10 TreeCache监听

  • 官网:Apache ZooKeeper

一 环境搭建

1.1 单机环境搭建

  • 必要环境:JDK
  • 下载地址:https://zookeeper.apache.org/
  • 历史版本:https://archive.apache.org/dist/zookeeper/

image.png
image.png
image.png

  • 我这里是本地环境,说名一下,无脑解压一下,放在本地环境目录

image.png

  • 复制配置文件一份zoo_sample未zoo

image.png

  • 修改配置文件

image.png

  • 启动

image.png

  • 查看

image.png

1.2 可视化工具ZooKeeper Assistant

  • 下载地址:http://www.redisant.cn/za

image.png

  • 查看状况

image.png

1.3 集群环境搭建

  • 这里我是伪集群环境搭建,注意集群环境只能是奇数

image.png

  • 这里我模拟三套服务器环境,一个主节点,两个从节点,主要是配置文件的变化
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
dataDir=D:\\Tools\\Zookeeper\\ServerA\\data
dataLogDir=D:\\Tools\\Zookeeper\\ServerA\\log
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
server.1=localhost:2886:3886
server.2=localhost:2887:3887
server.3=localhost:2888:3888

image.png
image.png
server.A=B:C:D;其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。如果是伪集群的配置方式,由于 B 都是一样,所以不同的 Zookeeper 实例通信端口号不能一样,所以要给它们分配不同的端口号。

  • myid 建立,依次写入1,2,3,id被称为Server ID,用来标识该机器在集群中的机器序号。同时,在每台ZooKeeper机器上,我们都需要在数据目录(即dataDir参数指定的那个目录)下创建一个 myid文件,该文件只有一行内容,并且是一个数字,即对应于每台机器的Server ID数字。

image.png

  • 后面的B,C一样的,一键启动脚本
@echo off
start cmd /k "cd /d D:\Tools\Zookeeper\ServerA\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerA...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerB\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerB...
start cmd /k "cd /d D:\Tools\Zookeeper\ServerC\bin && call zkServer.cmd"
echo Starting ZooKeeper ServerC...
echo All ZooKeeper servers have been started.
  • 查看启动日志可以看到主从节点情况

image.png

二 常用命令

1.1 命令行语法

命令行语法功能描述
help显示所有操作命令
ls path使用ls命令来查看当前znode的子节点[可监听], -w 监听子节点变化, -s 附加次级信息
create普通创建, -s 含有序列, -e 临时(重启或超时消失)
get path获得节点的值[可监听] -w 监听节点内容变化, -s 附加次级信息
set设置节点的具体值
stat查看节点的状态
delete删除节点
deleteall递归删除节点

1.2 数据节点信息

[zk: bigdata01:2181(CONNECTED) 5] ls -s /
[zookeeper]cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
  1. cZxid: 创建的事务zxid
    每次修改Zookeeper状态都会产生一个zookeeper事务ID, 事务ID是Zookeeper中所有修改总的次序. 每次修改都有唯一的zxid, 如果zxid1小于zxid2, 那么zxid1在zxid2之前发生.
  2. ctime: znode被创建的毫秒数(从1970开始)
  3. mzxid: znode最后更新的事务zxid
  4. mtime: znode最后修改的毫秒数(从1970开始)
  5. pZxid: znode最后更新的子节点zxid
  6. cversion: znode子节点变化号, znode子节点修改次数
  7. dataversion:znode 数据变化号
  8. aclVersion:znode 访问控制列表的变化号
  9. ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。
  10. dataLength:znode 的数据长度
  11. numChildren:znode 子节点数量

1.3 节点类型


这个命令就需要自己去练了

三 CuratorAPI使用

3.1 依赖

        <dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>4.0.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-test</artifactId><version>4.0.0</version><scope>test</scope></dependency>

3.1 创建会话

使用CuratorFrameworkFactory这个工厂类的两个静态方法来创建一个客户端
image.png
image.png
image.png

package com.shu;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;import java.util.ArrayList;
import java.util.List;/*** @author 31380* @description* @create 2024/3/16 18:39*/
public class CuratorUtils {/*** 创建连接** @param connectionString  连接地址* @param sessionTimeout    会话超时时间* @param connectionTimeout 连接超时时间* @return*/public static CuratorFramework createCuratorFramework(String connectionString, int sessionTimeout, int connectionTimeout) {return CuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).retryPolicy(new ExponentialBackoffRetry(1000, 3)).connectionTimeoutMs(connectionTimeout).build();}/*** 创建连接** @param connectionString  连接地址* @param sessionTimeout    会话超时时间* @param connectionTimeout 连接超时时间* @param retryPolicy       重试策略* @return*/public static CuratorFramework createCuratorFrameworkWithRetry(String connectionString,int sessionTimeout,int connectionTimeout,RetryPolicy retryPolicy) {return CuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout).retryPolicy(retryPolicy).build();}/*** 创建一个隔离的命名空间*/public static CuratorFramework createNamespaceCuratorFramework(String connectionString, int sessionTimeout, int connectionTimeout, String namespace) {return CuratorFrameworkFactory.builder().connectString(connectionString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(connectionTimeout).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();}/*** ZooDefs.Perms.READ:读权限* ZooDefs.Perms.WRITE:写权限* ZooDefs.Perms.CREATE:创建子节点权限* ZooDefs.Perms.DELETE:删除权限* ZooDefs.Perms.ADMIN:管理权限* ZooDefs.Perms.ALL:所有权限* 以下是一些常用的身份验证方案:* Ids.ANYONE_ID_UNSAFE:表示任何人都可以访问* Ids.AUTH_IDS:表示使用已验证的用户身份* Ids.OPEN_ACL_UNSAFE:表示开放的ACL,任何人都可以访问*  ACL acl = new ACL(ZooDefs.Perms.READ, new Id("myUser", "myPassword"));* @return*/public static List<ACL> getAclList() {ArrayList<ACL> acls = new ArrayList<>();// 权限设置ACL acl = new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE);// 添加权限acls.add(acl);return acls;}}

3.2 基本使用增删改查

  • 新增
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;import java.util.List;/*** @author 31380* @description* @create 2024/3/16 18:43*/
public class CuratorCreatTest {/*** 总结:curator创建节点方法* 1.创建节点,如果节点已经存在则抛出异常 create().forPath()* 2.withMode():节点类型: CreateMode.EPHEMERAL 临时节点,CreateMode.PERSISTENT 永久节点* 3.递归创建节点 creatingParentsIfNeeded()* 4.查询所有子节点 getChildren().forPath()* 5.删除节点 delete().forPath()* 6.判断节点是否存在 checkExists().forPath()* 7.关闭连接 close()* @param args* @throws Exception*/public static void main(String[] args) throws Exception {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 创建节点,如果节点已经存在则抛出异常try {curatorClint.create().forPath("/test");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}// 删除节点try {curatorClint.delete().forPath("/test");System.out.println("删除节点成功!");} catch (Exception e) {System.out.println("删除节点失败!"+e.getMessage());}/*** 临时节点(EPHEMERAL):临时创建的,会话结束节点自动被删除,也可以手动删除,临时节点不能拥有子节点.* 持久节点(PERSISTENT):创建后永久存在,除非主动删除。*/// 临时节点,当会话结束后,节点自动删除curatorClint.create().withMode(CreateMode.EPHEMERAL).forPath("/secondPath", "hello,word".getBytes());System.out.println("临时节点:"+new String(curatorClint.getData().forPath("/secondPath")));// 永久节点curatorClint.create().withMode(CreateMode.PERSISTENT).forPath("/thirdPath", "hello,word".getBytes());System.out.println("永久节点:"+new String(curatorClint.getData().forPath("/thirdPath")));// 递归创建节点curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/parent/child", "hello,word".getBytes());System.out.println("递归创建节点:"+new String(curatorClint.getData().forPath("/parent/child")));// 查询所有子节点List<String> list= curatorClint.getChildren().forPath("/");System.out.println(list);// 关闭连接curatorClint.close();}
}
  • 读取
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;import java.util.Date;/*** @author 31380* @description 读取节点数据* @create 2024/3/17 11:28*/
public class CuratorReadTest {/*** 总结:* 1. 读取单个节点数据:curatorClint.getData().forPath("/base/test")* 2. 读取多个节点数据:curatorClint.getChildren().forPath("/test").forEach(System.out::println)* 3. 读取节点数据并获取 stat:curatorClint.getData().storingStatIn(stat).forPath("/base/test")* 4:Stat:节点状态,包含节点的版本、数据长度、子节点数量、创建时间、修改时间、最近一次修改的事务 ID、数据版本、ACL 版本、临时节点* @param args*/public static void main(String[] args) {// 地址String connectString = "127.0.0.1:2181"; // 确保连接字符串正确CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 读取单个节点数据try {byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(new String(bytes));System.out.println("读取节点数据成功!");} catch (Exception e) {System.out.println("读取节点数据失败!"+e.getMessage());}// 读取多个节点数据try {curatorClint.getChildren().forPath("/test").forEach(System.out::println);System.out.println("读取多个节点数据成功!");} catch (Exception e) {System.out.println("读取多个节点数据失败!"+e.getMessage());}try {Stat stat = new Stat();byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test");String dataString = new String(data);System.out.println("节点数据:" + dataString);System.out.println("节点状态:");System.out.println("  节点创建版本:" + stat.getCversion());System.out.println("  数据长度:" + stat.getDataLength());System.out.println("  子节点数量:" + stat.getNumChildren());System.out.println("  创建时间:" + new Date(stat.getCtime()));System.out.println("  修改时间:" + new Date(stat.getMtime()));System.out.println("  最近一次修改的事务 ID:" + stat.getMzxid());System.out.println("  数据版本:" + stat.getVersion());System.out.println("  ACL 版本:" + stat.getAversion());System.out.println("  临时节点:" + stat.getEphemeralOwner());System.out.println("读取节点数据并获取 stat 成功!");} catch (Exception e) {System.out.println("读取节点数据并获取 stat 失败:" + e.getMessage());}// 关闭连接curatorClint.close();}
}
  • 删除
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;/*** @author 31380* @description* @create 2024/3/16 19:25*/
public class CuratorDeleteTest {/*** 总结:* 1. 删除节点:delete().forPath("/test")* 2. 如果存在子节点,删除子节点:delete().deletingChildrenIfNeeded().forPath("/parent")* 3. 递归删除节点:delete().deletingChildrenIfNeeded().forPath("/secondPath")* 4. 判断节点是否存在:checkExists().forPath("/secondPath")* 5. 关闭连接:close()* @param args* @throws Exception*/public static void main(String[] args) throws Exception {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 删除节点try {curatorClint.delete().forPath("/test");System.out.println("删除节点成功!");} catch (Exception e) {System.out.println("删除节点失败!"+e.getMessage());}// 如果存在子节点,删除子节点try {curatorClint.delete().deletingChildrenIfNeeded().forPath("/parent");System.out.println("删除节点成功!");} catch (Exception e) {System.out.println("删除节点失败!"+e.getMessage());}// 递归删除节点curatorClint.delete().deletingChildrenIfNeeded().forPath("/secondPath");// 判断节点是否存在if (curatorClint.checkExists().forPath("/secondPath") == null) {System.out.println("节点不存在!");} else {System.out.println("节点存在!");}// 关闭连接curatorClint.close();}
}
  • 修改
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.data.Stat;/*** @author 31380* @description* @create 2024/3/17 11:35*/
public class CuratorUpdateTest {/*** 总计* 1. 更新节点:setData().forPath("/test", "hello,word".getBytes())* 2. 指定版本更新节点:setData().withVersion(1).forPath("/test", "hello,word".getBytes())* @param args*/public static void main(String[] args) throws Exception {// 地址String connectString = "127.0.0.1:2181";//创建节点CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 更新节点try {curatorClint.setData().forPath("/base/test", "hello,word1111".getBytes());// 获取节点数据byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(new String(bytes));System.out.println("更新节点成功!");} catch (Exception e) {System.out.println("更新节点失败!"+e.getMessage());}// 先获取节点的版本号Stat stat = new Stat();byte[] data = curatorClint.getData().storingStatIn(stat).forPath("/base/test");String dataString = new String(data);System.out.println("节点数据:" + dataString);System.out.println("节点状态:");System.out.println("  数据版本:" + stat.getVersion());// 指定版本更新节点:CAS 机制try {curatorClint.setData().withVersion(stat.getVersion()).forPath("/base/test", "hello,word2222".getBytes());// 获取节点数据byte[] bytes = curatorClint.getData().forPath("/base/test");System.out.println(new String(bytes));System.out.println("更新节点成功!");} catch (Exception e) {System.out.println("更新节点失败!"+e.getMessage());}}
}
  • 异步创建
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;/*** @author 31380* @description Curator异步操作* @create 2024/3/17 11:42*/
public class CuratorAyncTest {/*** 总结:* 1 异步操作:inBackground()* 2.创建节点,如果节点已经存在则抛出异常 create().forPath()* 3.递归创建节点 creatingParentsIfNeeded()* @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 地址String connectString = "127.0.0.1:2181";//创建节点CuratorFramework curatorClint = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);System.out.println("连接成功!");curatorClint.start();CountDownLatch cdl = new CountDownLatch(2);ExecutorService executorService = Executors.newFixedThreadPool(2);curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event) -> {System.out.println("Code:" + event.getResultCode());System.out.println("Type:" + event.getType());System.out.println("Path:" + event.getPath());cdl.countDown();}, executorService).forPath("/test1", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((client, event) -> {System.out.println("Code:" + event.getResultCode());System.out.println("Type:" + event.getType());System.out.println("Path:" + event.getPath());cdl.countDown();}).forPath("/test2", "hello,word".getBytes());cdl.await();executorService.shutdown();curatorClint.close();}/*** 事件类型* CREATE, // 创建* DELETE, // 删除* EXISTS, // 存在* GET_DATA, // 获取数据* SET_DATA, // 设置数据* CHILDREN, // 子节点* SYNC, // 同步* GET_ACL, // 获取ACL* SET_ACL, // 设置ACL* TRANSACTION, // 事务* GET_CONFIG, // 获取配置* RECONFIG, // 重新配置* WATCHED, // 监听* REMOVE_WATCHES, // 移除监听* CLOSING; // 关闭* @param args*//*** 响应码* OK(0), // OK* CONNECTIONLOSS(-4), // 连接丢失* MARSHALLINGERROR(-7), // 编组错误* UNIMPLEMENTED(-9), // 未实现* OPERATIONTIMEOUT(-10), // 操作超时* BADARGUMENTS(-8), // 错误参数* APIERROR(-100), // API错误* NONODE(-101), // 无节点·*/}
  • 不同的顺序节点
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;/*** @author 31380* @description* @create 2024/3/17 11:03*/
public class CuratorSEQCreat {/*** 临时顺序节点(EPHEMERAL_SEQUENTIAL):具有临时节点特征,但是它会有序列号。* 持久顺序节点(PERSISTENT_SEQUENTIAL):具有持久节点特征,但是它会有序列号。* @param args*/public static void main(String[] args) {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 创建一个持久顺序节点A-1,A-2,A-3try {curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/test/A", "hello,word".getBytes());System.out.println("创建节点成功!");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}// 创建一个临时顺序节点B-1,B-2,B-3try {curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());curatorClint.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/test/B", "hello,word".getBytes());System.out.println("创建节点成功!");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}// 关闭连接curatorClint.close();}
}
  • 事务
package com.shu.base;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;import java.util.Collection;/*** @author 31380* @description TODO* @create 2024/3/16 19:28*/
public class CuratorTransactionTest {public static void main(String[] args) throws Exception {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();Collection<CuratorTransactionResult> commit = curatorClint.inTransaction().create().forPath("/xiao", "456".getBytes()).and().setData().forPath("/xiao", "123".getBytes()).and().commit();for (CuratorTransactionResult result : commit) {System.out.println(result.getForPath() + "--->" + result.getType());}curatorClint.close();}
}

3.3 ACL权限控制

package com.shu.acl;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;/*** @author 31380* @description* @create 2024/3/16 19:56*/
public class CuratorAclTest {public static void main(String[] args) {CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();// 创建节点,ACL为ip:try {curatorClint.create().withACL(CuratorUtils.getAclList()).forPath("/test");System.out.println("创建节点成功!");} catch (Exception e) {System.out.println("创建节点失败!"+e.getMessage());}}
}/*** @description* @author 31380* @create 2024/3/17 11:12* Schema 代表权限控制模式,分别为:* ● World 任何人* ● Auth 不需要ID* ● Digest 用户名和密码方式的认证* ● IP Address IP地址方式的认证* perms(权限),ZooKeeper支持如下权限* ● CREATE: 创建子节点* ● READ: 获取子节点与自身节点的数据信息* ● WRITE:在Znode节点上写数据* ● DELETE:删除子节点* ● ADMIN:设置ACL权限* ————————————————*/
package com.shu.acl;

3.4 分布式锁

package com.shu.lock;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;import java.text.SimpleDateFormat;
import java.util.concurrent.CountDownLatch;/*** @author 31380* @description 分布式锁* @create 2024/3/17 13:12*/
public class LockTest {/*** 分布式锁:InterProcessMutex* 1: 获取锁,acquire()* 2: 释放锁,release()* 3: 创建 InterProcessMutex 对象* 4: 调用 acquire() 方法获取锁* 5: 业务操作* 6: 调用 release() 方法释放锁* @param args*/public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(10);String connectString = "127.0.0.1:2181";String lockPath = "/lock";CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);curatorFramework.start();InterProcessMutex lock = new InterProcessMutex(curatorFramework, lockPath);for (int i = 0; i < 10; i++) {new Thread(() -> {try {latch.await();lock.acquire();System.out.println(Thread.currentThread().getName() + "获取到锁");// 模拟业务操作,生成订单号Thread.sleep(1000);SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");String orderNo = sdf.format(System.currentTimeMillis());System.out.println("生成的订单号:" + orderNo);} catch (Exception e) {e.printStackTrace();} finally {try {lock.release();} catch (Exception e) {e.printStackTrace();}}}, "Thread-" + i).start();latch.countDown();}}
}

3.5 分布式计数器

package com.shu.lock;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;/*** @author 31380* @description 分布式计数器* @create 2024/3/17 13:20*/
public class RecipeDisAtomicIntTest {/*** 分布式计数器:DistributedAtomicInteger* 1、创建DistributedAtomicInteger对象* 2、调用add方法* 3、获取当前值** @param args*/public static void main(String[] args) {String connectString = "127.0.0.1:2181";String connectString2 = "127.0.0.1:2182";String connectString3 = "127.0.0.1:2183";CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);curatorFramework.start();DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(curatorFramework, "/atomic", null);try {AtomicValue<Integer> added = atomicInteger.add(1);System.out.println("1Result: " + added.succeeded());// 获取当前值System.out.println("2Result: " + added.postValue());} catch (Exception e) {throw new RuntimeException(e);}// 客户端2CuratorFramework curatorFramework2 = CuratorUtils.createCuratorFramework(connectString2, 1000, 1000);curatorFramework2.start();DistributedAtomicInteger atomicInteger2 = new DistributedAtomicInteger(curatorFramework2, "/atomic", null);try {AtomicValue<Integer> added = atomicInteger2.add(1);System.out.println("2Result: " + added.succeeded());// 获取当前值System.out.println("2Result: " + added.postValue());} catch (Exception e) {throw new RuntimeException(e);}// 客户端3CuratorFramework curatorFramework3 = CuratorUtils.createCuratorFramework(connectString3, 1000, 1000);curatorFramework3.start();DistributedAtomicInteger atomicInteger3 = new DistributedAtomicInteger(curatorFramework3, "/atomic", null);try {AtomicValue<Integer> added = atomicInteger3.add(1);System.out.println("3Result: " + added.succeeded());// 获取当前值System.out.println("3Result: " + added.postValue());} catch (Exception e) {throw new RuntimeException(e);}}}

3.6 分布式Barrier

package com.shu.lock;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;/*** @author 31380* @description* 分布式Barrier:分布式 Barrier 是一种常见的同步原语,用于在分布式系统中协调多个进程或线程的执行顺序。* 它可以用来实现诸如等待直到所有参与者都准备好,然后一起执行某项任务,或者等待直到某些条件达成后再继续执行的场景。* @create 2024/3/17 13:27*/
public class CycliBarrierTest {static DistributedBarrier barrier;public static void main(String[] args) {String connectString = "127.0.0.1:2181";String path="/barrier";CuratorFramework curatorFramework = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);curatorFramework.start();// 等待所有的线程到达barrier 10个线程for (int i = 0; i < 10; i++) {new Thread(() -> {try {barrier = new DistributedBarrier(curatorFramework, path);System.out.println(Thread.currentThread().getName() + "号barrier设置");barrier.setBarrier();barrier.waitOnBarrier();System.out.println("启动...");} catch (Exception e) {e.printStackTrace();}}, "Thread-" + i).start();}try {Thread.sleep(2000);barrier.removeBarrier();} catch (Exception e) {e.printStackTrace();}}
}

3.7 主从节点选举

package com.shu.master;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;/*** @author 31380* @description 主节点选举* @create 2024/3/17 13:03*/
public class MasterSelectTest {/*** 主节点选举:LeaderSelector* 1、创建LeaderSelector对象* 2、调用start方法* 3、添加监听器* 4、关闭连接* @param args*/public static void main(String[] args) {// 地址String connectString = "127.0.0.1:2181";String connectString2 = "127.0.0.1:2182";String connectString3 = "127.0.0.1:2183";// 创建并连接 CuratorFramework 实例CuratorFramework curatorFramework1 = CuratorUtils.createCuratorFramework(connectString, 1000, 1000);CuratorFramework curatorFramework2 = CuratorUtils.createCuratorFramework(connectString2, 1000, 1000);CuratorFramework curatorFramework3 = CuratorUtils.createCuratorFramework(connectString3, 1000, 1000);curatorFramework1.start();curatorFramework2.start();curatorFramework3.start();// 第一个节点LeaderSelector leaderSelector1 = new LeaderSelector(curatorFramework1, "/master1", new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("节点1成为master节点");Thread.sleep(10000);System.out.println("节点1完成master操作,释放master权利");}});leaderSelector1.autoRequeue();leaderSelector1.start();// 第二个节点LeaderSelector leaderSelector2 = new LeaderSelector(curatorFramework2, "/master1", new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("节点2成为master节点");Thread.sleep(10000);System.out.println("节点2完成master操作,释放master权利");}});leaderSelector2.autoRequeue();leaderSelector2.start();// 第三个节点LeaderSelector leaderSelector3 = new LeaderSelector(curatorFramework3, "/master1", new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework curatorFramework) throws Exception {System.out.println("节点3成为master节点");Thread.sleep(10000);System.out.println("节点3完成master操作,释放master权利");}});leaderSelector3.autoRequeue();leaderSelector3.start();try {Thread.sleep(Integer.MAX_VALUE);} catch (InterruptedException e) {e.printStackTrace();}}}

3.8 NodeCache监听

package com.shu.watch;import com.shu.CuratorUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;/*** @author 31380* @description* @create 2024/3/16 19:38*/
public class CuratorNodeCacheTest {/*** NodeCache:监听节点的新增、修改操作* 1、创建NodeCache对象* 2、调用start方法* 3、添加监听器* 4、关闭连接* @param args* @throws Exception*/public static void main(String[] args) throws Exception {String path = "/test";CuratorFramework curatorClint = CuratorUtils.createCuratorFramework("127.0.0.1:2181", 1000, 1000);System.out.println("连接成功!");curatorClint.start();final NodeCache nodeCache = new NodeCache(curatorClint, path);nodeCache.start();nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("监听事件触发");System.out.println("重新获得节点内容为:" + new String(nodeCache.getCurrentData().getData()));}});curatorClint.setData().forPath(path,"456".getBytes());curatorClint.setData().forPath(path,"789".getBytes());curatorClint.setData().forPath(path,"123".getBytes());curatorClint.setData().forPath(path,"222".getBytes());curatorClint.setData().forPath(path,"333".getBytes());curatorClint.setData().forPath(path,"444".getBytes());Thread.sleep(15000);}
}

3.9 PathChildrenCache监听

package com.shu.watch;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;/*** @author 31380* @description* @create 2024/3/16 19:43*/
public class CuratorPathChildrenCacheTest {/*** PathChildrenCache:监听子节点的新增、修改、删除操作* @param args* @throws Exception*/public static void main(String[] args) throws Exception {CuratorFramework client = getClient();String parentPath = "/p1";PathChildrenCache pathChildrenCache = new PathChildrenCache(client,parentPath,false);/* * StartMode:初始化方式* POST_INITIALIZED_EVENT:异步初始化。初始化后会触发事件* NORMAL:异步初始化* BUILD_INITIAL_CACHE:同步初始化* */pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println("事件类型:"  + event.getType() + ";操作节点:" + event.getData().getPath());switch(event.getType()){case CHILD_ADDED:System.out.println("新增子节点:" + event.getData().getPath());break;case CHILD_UPDATED:System.out.println("更新子节点:" + event.getData().getPath());break;case CHILD_REMOVED:System.out.println("删除子节点:" + event.getData().getPath());break;default:break;}}});String path = "/p1/c1";client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path);Thread.sleep(1000); // 此处需留意,如果没有现成睡眠则无法触发监听事件client.delete().forPath(path);Thread.sleep(15000);}private static CuratorFramework getClient(){RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").retryPolicy(retryPolicy).sessionTimeoutMs(6000).connectionTimeoutMs(3000).namespace("demo").build();client.start();return client;}
}

3.10 TreeCache监听

package com.shu.watch;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;/*** @author 31380* @description* @create 2024/3/16 19:44*/
public class CuratorWatcher3 {private static final String CONNECT_ADDR = "127.0.0.1:2181";private static final int SESSION_TIMEOUT = 5000;public static void main(String[] args) throws Exception {RetryPolicy policy = new ExponentialBackoffRetry(1000, 10);CuratorFramework curator = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR).sessionTimeoutMs(SESSION_TIMEOUT).retryPolicy(policy).build();curator.start();TreeCache treeCache = new TreeCache(curator, "/treeCache");treeCache.start();treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {switch (treeCacheEvent.getType()) {case NODE_ADDED:System.out.println("NODE_ADDED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;case NODE_UPDATED:System.out.println("NODE_UPDATED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;case NODE_REMOVED:System.out.println("NODE_REMOVED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;default:break;}});curator.create().forPath("/treeCache", "123".getBytes());curator.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/treeCache/c1", "456".getBytes());curator.setData().forPath("/treeCache", "789".getBytes());curator.setData().forPath("/treeCache/c1", "910".getBytes());curator.delete().forPath("/treeCache/c1");curator.delete().forPath("/treeCache");Thread.sleep(5000);curator.close();}
}

详细介绍参考书籍《从Paxos到Zookeeper:分布式一致性原理与实践》

这篇关于Zookeeper(五)Zokeeper 环境搭建与Curator使用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/829742

相关文章

Zookeeper安装和配置说明

一、Zookeeper的搭建方式 Zookeeper安装方式有三种,单机模式和集群模式以及伪集群模式。 ■ 单机模式:Zookeeper只运行在一台服务器上,适合测试环境; ■ 伪集群模式:就是在一台物理机上运行多个Zookeeper 实例; ■ 集群模式:Zookeeper运行于一个集群上,适合生产环境,这个计算机集群被称为一个“集合体”(ensemble) Zookeeper通过复制来实现

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

阿里开源语音识别SenseVoiceWindows环境部署

SenseVoice介绍 SenseVoice 专注于高精度多语言语音识别、情感辨识和音频事件检测多语言识别: 采用超过 40 万小时数据训练,支持超过 50 种语言,识别效果上优于 Whisper 模型。富文本识别:具备优秀的情感识别,能够在测试数据上达到和超过目前最佳情感识别模型的效果。支持声音事件检测能力,支持音乐、掌声、笑声、哭声、咳嗽、喷嚏等多种常见人机交互事件进行检测。高效推

pdfmake生成pdf的使用

实际项目中有时会有根据填写的表单数据或者其他格式的数据,将数据自动填充到pdf文件中根据固定模板生成pdf文件的需求 文章目录 利用pdfmake生成pdf文件1.下载安装pdfmake第三方包2.封装生成pdf文件的共用配置3.生成pdf文件的文件模板内容4.调用方法生成pdf 利用pdfmake生成pdf文件 1.下载安装pdfmake第三方包 npm i pdfma

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

搭建Kafka+zookeeper集群调度

前言 硬件环境 172.18.0.5        kafkazk1        Kafka+zookeeper                Kafka Broker集群 172.18.0.6        kafkazk2        Kafka+zookeeper                Kafka Broker集群 172.18.0.7        kafkazk3