Zookeeper 进阶之——Zookeeper编程示例(如何使用Zookeeper实现屏障Barriers和队列Queues)...

本文主要是介绍Zookeeper 进阶之——Zookeeper编程示例(如何使用Zookeeper实现屏障Barriers和队列Queues)...,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

[quote]原文:http://www.cnblogs.com/haippy/archive/2012/07/26/2609769.html
[/quote]
[b]引言[/b]

本文将告诉你如何使用 Zookeeper 实现两种常用的分布式数据结构,屏障(barriers) 和队列(queues),我们为此还分别实现了两个类:Barrier and Queue. 本文中的例子假设你已经成功运行了Zookeeper服务器。

上述两种最基本的原语都使用了下面的常见编码规范:

static ZooKeeper zk = null;
static Integer mutex;

String root;

SyncPrimitive(String address) {
if(zk == null){
try {
System.out.println("Starting ZK:");
zk = new ZooKeeper(address, 3000, this);
mutex = new Integer(-1);
System.out.println("Finished starting ZK: " + zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
}

synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
mutex.notify();
}
}

以上两个类都扩展了 SyncPrimitive. In this way, we execute steps that are common to all primitives in the constructor of SyncPrimitive. To keep the examples simple, we create a ZooKeeper object the first time we instantiate either a barrier object or a queue object, and we declare a static variable that is a reference to this object. The subsequent instances of Barrier and Queue check whether a ZooKeeper object exists. Alternatively, we could have the application creating a ZooKeeper object and passing it to the constructor of Barrier and Queue.

We use the process() method to process notifications triggered due to watches. In the following discussion, we present code that sets watches. A watch is internal structure that enables ZooKeeper to notify a client of a change to a node. For example, if a client is waiting for other clients to leave a barrier, then it can set a watch and wait for modifications to a particular node, which can indicate that it is the end of the wait. This point becomes clear once we go over the examples.

Barriers

A barrier is a primitive that enables a group of processes to synchronize the beginning and the end of a computation. The general idea of this implementation is to have a barrier node that serves the purpose of being a parent for individual process nodes. Suppose that we call the barrier node "/b1". Each process "p" then creates a node "/b1/p". Once enough processes have created their corresponding nodes, joined processes can start the computation.

In this example, each process instantiates a Barrier object, and its constructor takes as parameters:

the address of a ZooKeeper server (e.g., "zoo1.foo.com:2181")

the path of the barrier node on ZooKeeper (e.g., "/b1")

the size of the group of processes

The constructor of Barrier passes the address of the Zookeeper server to the constructor of the parent class. The parent class creates a ZooKeeper instance if one does not exist. The constructor of Barrier then creates a barrier node on ZooKeeper, which is the parent node of all process nodes, and we call root (Note: This is not the ZooKeeper root "/").


/**
* Barrier constructor
*
* @param address
* @param root
* @param size
*/
Barrier(String address, String root, int size) {
super(address);
this.root = root;
this.size = size;

// Create barrier node
if (zk != null) {
try {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}

// My node name
try {
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
} catch (UnknownHostException e) {
System.out.println(e.toString());
}

}


To enter the barrier, a process calls enter(). The process creates a node under the root to represent it, using its host name to form the node name. It then wait until enough processes have entered the barrier. A process does it by checking the number of children the root node has with "getChildren()", and waiting for notifications in the case it does not have enough. To receive a notification when there is a change to the root node, a process has to set a watch, and does it through the call to "getChildren()". In the code, we have that "getChildren()" has two parameters. The first one states the node to read from, and the second is a boolean flag that enables the process to set a watch. In the code the flag is true.

/**
* Join barrier
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/

boolean enter() throws KeeperException, InterruptedException{
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);

if (list.size() < size) {
mutex.wait();
} else {
return true;
}
}
}
}



Note that enter() throws both KeeperException and InterruptedException, so it is the reponsability of the application to catch and handle such exceptions.

Once the computation is finished, a process calls leave() to leave the barrier. First it deletes its corresponding node, and then it gets the children of the root node. If there is at least one child, then it waits for a notification (obs: note that the second parameter of the call to getChildren() is true, meaning that ZooKeeper has to set a watch on the the root node). Upon reception of a notification, it checks once more whether the root node has any child.

/**
* Wait until all reach barrier
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/

boolean leave() throws KeeperException, InterruptedException{
zk.delete(root + "/" + name, 0);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() > 0) {
mutex.wait();
} else {
return true;
}
}
}
}
}



Producer-Consumer Queues

A producer-consumer queue is a distributed data estructure thata group of processes use to generate and consume items. Producer processes create new elements and add them to the queue. Consumer processes remove elements from the list, and process them. In this implementation, the elements are simple integers. The queue is represented by a root node, and to add an element to the queue, a producer process creates a new node, a child of the root node.

The following excerpt of code corresponds to the constructor of the object. As with Barrier objects, it first calls the constructor of the parent class, SyncPrimitive, that creates a ZooKeeper object if one doesn't exist. It then verifies if the root node of the queue exists, and creates if it doesn't.

/**
* Constructor of producer-consumer queue
*
* @param address
* @param name
*/
Queue(String address, String name) {
super(address);
this.root = name;
// Create ZK node name
if (zk != null) {
try {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}


A producer process calls "produce()" to add an element to the queue, and passes an integer as an argument. To add an element to the queue, the method creates a new node using "create()", and uses the SEQUENCE flag to instruct ZooKeeper to append the value of the sequencer counter associated to the root node. In this way, we impose a total order on the elements of the queue, thus guaranteeing that the oldest element of the queue is the next one consumed.

/**
* Add element to the queue.
*
* @param i
* @return
*/

boolean produce(int i) throws KeeperException, InterruptedException{
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;

// Add child with value i
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);

return true;
}



To consume an element, a consumer process obtains the children of the root node, reads the node with smallest counter value, and returns the element. Note that if there is a conflict, then one of the two contending processes won't be able to delete the node and the delete operation will throw an exception.

A call to getChildren() returns the list of children in lexicographic order. As lexicographic order does not necessary follow the numerical order of the counter values, we need to decide which element is the smallest. To decide which one has the smallest counter value, we traverse the list, and remove the prefix "element" from each one.


/**
* Remove first element from the queue.
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
int consume() throws KeeperException, InterruptedException{
int retvalue = -1;
Stat stat = null;

// Get the first element available
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
} else {
Integer min = new Integer(list.get(0).substring(7));
for(String s : list){
Integer tempValue = new Integer(s.substring(7));
//System.out.println("Temporary value: " + tempValue);
if(tempValue < min) min = tempValue;
}
System.out.println("Temporary value: " + root + "/element" + min);
byte[] b = zk.getData(root + "/element" + min,
false, stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();

return retvalue;
}
}
}
}
}


[b]完整例子[/b]

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class SyncPrimitive implements Watcher {

static ZooKeeper zk = null;
static Integer mutex;

String root;

SyncPrimitive(String address) {
if(zk == null){
try {
System.out.println("Starting ZK:");
zk = new ZooKeeper(address, 3000, this);
mutex = new Integer(-1);
System.out.println("Finished starting ZK: " + zk);
} catch (IOException e) {
System.out.println(e.toString());
zk = null;
}
}
//else mutex = new Integer(-1);
}

synchronized public void process(WatchedEvent event) {
synchronized (mutex) {
//System.out.println("Process: " + event.getType());
mutex.notify();
}
}

/**
* Barrier
*/
static public class Barrier extends SyncPrimitive {
int size;
String name;

/**
* Barrier constructor
*
* @param address
* @param root
* @param size
*/
Barrier(String address, String root, int size) {
super(address);
this.root = root;
this.size = size;

// Create barrier node
if (zk != null) {
try {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}

// My node name
try {
name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
} catch (UnknownHostException e) {
System.out.println(e.toString());
}

}

/**
* Join barrier
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/

boolean enter() throws KeeperException, InterruptedException{
zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.EPHEMERAL_SEQUENTIAL);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);

if (list.size() < size) {
mutex.wait();
} else {
return true;
}
}
}
}

/**
* Wait until all reach barrier
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/

boolean leave() throws KeeperException, InterruptedException{
zk.delete(root + "/" + name, 0);
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() > 0) {
mutex.wait();
} else {
return true;
}
}
}
}
}

/**
* Producer-Consumer queue
*/
static public class Queue extends SyncPrimitive {

/**
* Constructor of producer-consumer queue
*
* @param address
* @param name
*/
Queue(String address, String name) {
super(address);
this.root = name;
// Create ZK node name
if (zk != null) {
try {
Stat s = zk.exists(root, false);
if (s == null) {
zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
} catch (KeeperException e) {
System.out
.println("Keeper exception when instantiating queue: "
+ e.toString());
} catch (InterruptedException e) {
System.out.println("Interrupted exception");
}
}
}

/**
* Add element to the queue.
*
* @param i
* @return
*/

boolean produce(int i) throws KeeperException, InterruptedException{
ByteBuffer b = ByteBuffer.allocate(4);
byte[] value;

// Add child with value i
b.putInt(i);
value = b.array();
zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT_SEQUENTIAL);

return true;
}


/**
* Remove first element from the queue.
*
* @return
* @throws KeeperException
* @throws InterruptedException
*/
int consume() throws KeeperException, InterruptedException{
int retvalue = -1;
Stat stat = null;

// Get the first element available
while (true) {
synchronized (mutex) {
List<String> list = zk.getChildren(root, true);
if (list.size() == 0) {
System.out.println("Going to wait");
mutex.wait();
} else {
Integer min = new Integer(list.get(0).substring(7));
for(String s : list){
Integer tempValue = new Integer(s.substring(7));
//System.out.println("Temporary value: " + tempValue);
if(tempValue < min) min = tempValue;
}
System.out.println("Temporary value: " + root + "/element" + min);
byte[] b = zk.getData(root + "/element" + min,
false, stat);
zk.delete(root + "/element" + min, 0);
ByteBuffer buffer = ByteBuffer.wrap(b);
retvalue = buffer.getInt();

return retvalue;
}
}
}
}
}

public static void main(String args[]) {
if (args[0].equals("qTest"))
queueTest(args);
else
barrierTest(args);

}

public static void queueTest(String args[]) {
Queue q = new Queue(args[1], "/app1");

System.out.println("Input: " + args[1]);
int i;
Integer max = new Integer(args[2]);

if (args[3].equals("p")) {
System.out.println("Producer");
for (i = 0; i < max; i++)
try{
q.produce(10 + i);
} catch (KeeperException e){

} catch (InterruptedException e){

}
} else {
System.out.println("Consumer");

for (i = 0; i < max; i++) {
try{
int r = q.consume();
System.out.println("Item: " + r);
} catch (KeeperException e){
i--;
} catch (InterruptedException e){

}
}
}
}

public static void barrierTest(String args[]) {
Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
try{
boolean flag = b.enter();
System.out.println("Entered barrier: " + args[2]);
if(!flag) System.out.println("Error when entering the barrier");
} catch (KeeperException e){

} catch (InterruptedException e){

}

// Generate random integer
Random rand = new Random();
int r = rand.nextInt(100);
// Loop for rand iterations
for (int i = 0; i < r; i++) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {

}
}
try{
b.leave();
} catch (KeeperException e){

} catch (InterruptedException e){

}
System.out.println("Left barrier");
}
}

这篇关于Zookeeper 进阶之——Zookeeper编程示例(如何使用Zookeeper实现屏障Barriers和队列Queues)...的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1114438

相关文章

Python中顺序结构和循环结构示例代码

《Python中顺序结构和循环结构示例代码》:本文主要介绍Python中的条件语句和循环语句,条件语句用于根据条件执行不同的代码块,循环语句用于重复执行一段代码,文章还详细说明了range函数的使... 目录一、条件语句(1)条件语句的定义(2)条件语句的语法(a)单分支 if(b)双分支 if-else(

Python itertools中accumulate函数用法及使用运用详细讲解

《Pythonitertools中accumulate函数用法及使用运用详细讲解》:本文主要介绍Python的itertools库中的accumulate函数,该函数可以计算累积和或通过指定函数... 目录1.1前言:1.2定义:1.3衍生用法:1.3Leetcode的实际运用:总结 1.1前言:本文将详

Java深度学习库DJL实现Python的NumPy方式

《Java深度学习库DJL实现Python的NumPy方式》本文介绍了DJL库的背景和基本功能,包括NDArray的创建、数学运算、数据获取和设置等,同时,还展示了如何使用NDArray进行数据预处理... 目录1 NDArray 的背景介绍1.1 架构2 JavaDJL使用2.1 安装DJL2.2 基本操

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

浅析如何使用Swagger生成带权限控制的API文档

《浅析如何使用Swagger生成带权限控制的API文档》当涉及到权限控制时,如何生成既安全又详细的API文档就成了一个关键问题,所以这篇文章小编就来和大家好好聊聊如何用Swagger来生成带有... 目录准备工作配置 Swagger权限控制给 API 加上权限注解查看文档注意事项在咱们的开发工作里,API

java父子线程之间实现共享传递数据

《java父子线程之间实现共享传递数据》本文介绍了Java中父子线程间共享传递数据的几种方法,包括ThreadLocal变量、并发集合和内存队列或消息队列,并提醒注意并发安全问题... 目录通过 ThreadLocal 变量共享数据通过并发集合共享数据通过内存队列或消息队列共享数据注意并发安全问题总结在 J

SpringBoot+MyBatis-Flex配置ProxySQL的实现步骤

《SpringBoot+MyBatis-Flex配置ProxySQL的实现步骤》本文主要介绍了SpringBoot+MyBatis-Flex配置ProxySQL的实现步骤,文中通过示例代码介绍的非常详... 目录 目标 步骤 1:确保 ProxySQL 和 mysql 主从同步已正确配置ProxySQL 的

Java数字转换工具类NumberUtil的使用

《Java数字转换工具类NumberUtil的使用》NumberUtil是一个功能强大的Java工具类,用于处理数字的各种操作,包括数值运算、格式化、随机数生成和数值判断,下面就来介绍一下Number... 目录一、NumberUtil类概述二、主要功能介绍1. 数值运算2. 格式化3. 数值判断4. 随机

JS 实现复制到剪贴板的几种方式小结

《JS实现复制到剪贴板的几种方式小结》本文主要介绍了JS实现复制到剪贴板的几种方式小结,包括ClipboardAPI和document.execCommand这两种方法,具有一定的参考价值,感兴趣的... 目录一、Clipboard API相关属性方法二、document.execCommand优点:缺点:

nginx部署https网站的实现步骤(亲测)

《nginx部署https网站的实现步骤(亲测)》本文详细介绍了使用Nginx在保持与http服务兼容的情况下部署HTTPS,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值... 目录步骤 1:安装 Nginx步骤 2:获取 SSL 证书步骤 3:手动配置 Nginx步骤 4:测