本文主要是介绍改造JedisCluster使其支持pipeline操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Redis 管道技术
Redis 管道技术可以在服务端未响应时,客户端可以继续向服务端发送请求,并最终一次性读取所有服务端的响应。
管道技术的优势
管道技术最显著的优势是提高了 redis 服务的性能。redis本身性能是很高的,单个redis命令的执行时间很短,大量的redis操作,网络IO的耗时非常大,而redis管道技术大大的减少了程序和redis的交互 次数,性能提升非常明显。
管道的使用----Jedis
public class PipelineTest {
public static void main(String[] args){
Jedis jedis = new Jedis("localhost",6060);
Pipeline pipeline = jedis.pipelined();
pipeline.set("xiajw","test");
pipeline.set("hello","world");
List<Object> result = pipeline.syncAndReturnAll();
jedis.close();
}
}
JedisCluster改造
在使用Jedis的时候,pipeline的使用非常简单。但是实际上,大家大部分情况会使用集群JedisCluster来进行操作,当我去JedisCluster类里面查找的时候发现,没有pipeline方法。JedisCluster本身是不支持管道操作的。
为什么不支持,以下仅代表个人观点。JedisCluster底层其实就是对Jedis的调用。redis集群中的每个主节点管理着各自的slot区间,而数据的存储,会计算key所在的slot,然后调用slot所在的redis实例去操作。JedisCluster本身如果支持管道的话,由于要分步调用不同的redis管道,redis本身对于事务的支持也基本没有,本身很难保证所有操作的一致性,因为是分批发送的,如果方法里提供,可能会让大家产生误解,以为它就是一致的,所以干脆不提供。
改造思路
1、改造JedisCluster,使其connectionHandler对外能够访问。因为JedisClusterConnectionHandler的成员变量JedisClusterInfoCache存储着redis集群的信息以及连接池。我们需要获取连接池。
2、改造JedisClusterConnectionHandler,使其能够根据slot返回JedisPool。
3、将大量的redis操作,根据key计算出相应所在slot,根据slot获取相应的redis实例,然后按照 Map<JedisPool,List>的结构存储起来。然后遍历调用。
JedisClusterConnectionHandler改造
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;
import java.util.Set;
public class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler {
public JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int timeout) {
super(nodes, poolConfig, timeout);
}
public JedisPool getJedisPoolFromSlot(int slot){
JedisPool connectionPool = cache.getSlotPool(slot);
if(connectionPool != null){
return connectionPool;
}else{
renewSlotCache();
connectionPool = cache.getSlotPool(slot);
if(connectionPool != null){
return connectionPool;
}else{
throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot "+slot);
}
}
}
}
JedisCluster改造
import com.ai.ocbs.util.ConfigUtil;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;
import java.util.Set;
public class JedisClusterPipeline extends JedisCluster{
public JedisClusterPipeline(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig) {
super(nodes, poolConfig);
super.connectionHandler = new JedisSlotAdvancedConnectionHandler(nodes,poolConfig, ConfigUtil.m_timeout);
}
public JedisSlotAdvancedConnectionHandler getConnectionHandler(){
return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
}
/**
* 刷新集群信息,当集群信息发生变更时调用
* @param
* @return
*/
public void refreshCluster() {
connectionHandler.renewSlotCache();
}
}
使用
以hget为例
public static List<byte[]> hgetPipeline(JedisClusterPipeline jedisClusterPipeline,List<String> keyFields){
List<byte[]> result = new ArrayList<>();
Map<JedisPool,List<String>> poolKeys = new HashMap<>();
JedisSlotAdvancedConnectionHandler jedisSlotAdvancedConnectionHandler = jedisClusterPipeline.getConnectionHandler();
jedisClusterPipeline.refreshCluster();
for(int i=0;i<keyFields.size();i++){
String[] keyField = keyFields.get(i).split(",");
if(keyField.length!=2)
continue;
byte[] key = keyField[0].getBytes();
int slot = JedisClusterCRC16.getSlot(key);
JedisPool jedisPool = jedisSlotAdvancedConnectionHandler.getJedisPoolFromSlot(slot);
if(poolKeys.keySet().contains(jedisPool)){
List<String> keys = poolKeys.get(jedisPool);
keys.add(keyFields.get(i));
}else{
List<String> keys = new ArrayList<>();
keys.add(keyFields.get(i));
poolKeys.put(jedisPool,keys);
}
}
for(JedisPool jedisPool:poolKeys.keySet()){
Jedis jedis = jedisPool.getResource();
Pipeline pipeline = jedis.pipelined();
List<String> keys = poolKeys.get(jedisPool);
for(int i = 0; i < keys.size();){
for(int j=0;j<ConfigUtil.m_pipe_size&&i< keys.size();j++){
String[] keyField = keys.get(i).split(",");
if(keyField.length!=2)
continue;
byte[] key = keyField[0].getBytes();
byte[] field = keyField[1].getBytes();
pipeline.hget(key,field);
i++;
}
List<Object> responses = pipeline.syncAndReturnAll();
if(responses != null && !responses.isEmpty()){
for(Object response : responses){
if(!(response instanceof byte[])) {
//logger.info();
continue;
}
byte[] valueData = (byte[])response;
result.add(valueData);
}
}
}
jedis.close();
}
return result;
}
总结
redis管道可以极大的提升性能。批量操作的数据量越大,性能提升越明显。在改造之后,几百万数据量的情况下,性能提升相当恐怖。
这篇关于改造JedisCluster使其支持pipeline操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!