改造JedisCluster使其支持pipeline操作

2024-06-02 15:32

本文主要是介绍改造JedisCluster使其支持pipeline操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


Redis 管道技术
Redis 管道技术可以在服务端未响应时,客户端可以继续向服务端发送请求,并最终一次性读取所有服务端的响应。

管道技术的优势
管道技术最显著的优势是提高了 redis 服务的性能。redis本身性能是很高的,单个redis命令的执行时间很短,大量的redis操作,网络IO的耗时非常大,而redis管道技术大大的减少了程序和redis的交互 次数,性能提升非常明显。

管道的使用----Jedis
public class PipelineTest {

    public static void main(String[] args){
        Jedis jedis = new Jedis("localhost",6060);
        Pipeline pipeline = jedis.pipelined();
        pipeline.set("xiajw","test");
        pipeline.set("hello","world");
        List<Object> result = pipeline.syncAndReturnAll();
        jedis.close();
    }
}

JedisCluster改造
在使用Jedis的时候,pipeline的使用非常简单。但是实际上,大家大部分情况会使用集群JedisCluster来进行操作,当我去JedisCluster类里面查找的时候发现,没有pipeline方法。JedisCluster本身是不支持管道操作的。

为什么不支持,以下仅代表个人观点。JedisCluster底层其实就是对Jedis的调用。redis集群中的每个主节点管理着各自的slot区间,而数据的存储,会计算key所在的slot,然后调用slot所在的redis实例去操作。JedisCluster本身如果支持管道的话,由于要分步调用不同的redis管道,redis本身对于事务的支持也基本没有,本身很难保证所有操作的一致性,因为是分批发送的,如果方法里提供,可能会让大家产生误解,以为它就是一致的,所以干脆不提供。

改造思路
1、改造JedisCluster,使其connectionHandler对外能够访问。因为JedisClusterConnectionHandler的成员变量JedisClusterInfoCache存储着redis集群的信息以及连接池。我们需要获取连接池。
2、改造JedisClusterConnectionHandler,使其能够根据slot返回JedisPool。
3、将大量的redis操作,根据key计算出相应所在slot,根据slot获取相应的redis实例,然后按照 Map<JedisPool,List>的结构存储起来。然后遍历调用。

JedisClusterConnectionHandler改造

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSlotBasedConnectionHandler;
import redis.clients.jedis.exceptions.JedisNoReachableClusterNodeException;

import java.util.Set;

public class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler {

    public JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int timeout) {
        super(nodes, poolConfig, timeout);
    }

    public JedisPool getJedisPoolFromSlot(int slot){
        JedisPool connectionPool = cache.getSlotPool(slot);
        if(connectionPool != null){
            return connectionPool;
        }else{
            renewSlotCache();
            connectionPool = cache.getSlotPool(slot);
            if(connectionPool != null){
                return connectionPool;
            }else{
                throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot "+slot);
            }
        }
    }
}

JedisCluster改造

import com.ai.ocbs.util.ConfigUtil;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

import java.util.Set;

public class JedisClusterPipeline extends JedisCluster{

    public JedisClusterPipeline(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig) {
        super(nodes, poolConfig);
        super.connectionHandler = new JedisSlotAdvancedConnectionHandler(nodes,poolConfig, ConfigUtil.m_timeout);
    }

    public JedisSlotAdvancedConnectionHandler getConnectionHandler(){
        return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
    }

    /**
     * 刷新集群信息,当集群信息发生变更时调用
     * @param
     * @return
     */
    public void refreshCluster() {
        connectionHandler.renewSlotCache();
    }
}

使用
以hget为例

public static List<byte[]> hgetPipeline(JedisClusterPipeline jedisClusterPipeline,List<String> keyFields){
        List<byte[]> result = new ArrayList<>();
        Map<JedisPool,List<String>> poolKeys = new HashMap<>();
        JedisSlotAdvancedConnectionHandler jedisSlotAdvancedConnectionHandler = jedisClusterPipeline.getConnectionHandler();
        jedisClusterPipeline.refreshCluster();
        for(int i=0;i<keyFields.size();i++){
            String[] keyField = keyFields.get(i).split(",");
            if(keyField.length!=2)
                continue;
            byte[] key = keyField[0].getBytes();
            int slot = JedisClusterCRC16.getSlot(key);
            JedisPool jedisPool = jedisSlotAdvancedConnectionHandler.getJedisPoolFromSlot(slot);
            if(poolKeys.keySet().contains(jedisPool)){
                List<String> keys = poolKeys.get(jedisPool);
                keys.add(keyFields.get(i));
            }else{
                List<String> keys = new ArrayList<>();
                keys.add(keyFields.get(i));
                poolKeys.put(jedisPool,keys);
            }
        }
        for(JedisPool jedisPool:poolKeys.keySet()){
            Jedis jedis = jedisPool.getResource();
            Pipeline pipeline = jedis.pipelined();
            List<String> keys = poolKeys.get(jedisPool);
            for(int i = 0; i < keys.size();){
                for(int j=0;j<ConfigUtil.m_pipe_size&&i< keys.size();j++){
                    String[] keyField = keys.get(i).split(",");
                    if(keyField.length!=2)
                        continue;
                    byte[] key = keyField[0].getBytes();
                    byte[] field = keyField[1].getBytes();
                    pipeline.hget(key,field);
                    i++;
                }
                List<Object> responses = pipeline.syncAndReturnAll();
                if(responses != null && !responses.isEmpty()){
                    for(Object response : responses){
                        if(!(response instanceof byte[])) {
                            //logger.info();
                            continue;
                        }
                        byte[] valueData = (byte[])response;
                        result.add(valueData);
                    }
                }
            }
            jedis.close();
        }
        return result;
    }

总结
redis管道可以极大的提升性能。批量操作的数据量越大,性能提升越明显。在改造之后,几百万数据量的情况下,性能提升相当恐怖。
 

这篇关于改造JedisCluster使其支持pipeline操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1024361

相关文章

Python 中的 with open文件操作的最佳实践

《Python中的withopen文件操作的最佳实践》在Python中,withopen()提供了一个简洁而安全的方式来处理文件操作,它不仅能确保文件在操作完成后自动关闭,还能处理文件操作中的异... 目录什么是 with open()?为什么使用 with open()?使用 with open() 进行

Linux ls命令操作详解

《Linuxls命令操作详解》通过ls命令,我们可以查看指定目录下的文件和子目录,并结合不同的选项获取详细的文件信息,如权限、大小、修改时间等,:本文主要介绍Linuxls命令详解,需要的朋友可... 目录1. 命令简介2. 命令的基本语法和用法2.1 语法格式2.2 使用示例2.2.1 列出当前目录下的文

Mysql表的简单操作(基本技能)

《Mysql表的简单操作(基本技能)》在数据库中,表的操作主要包括表的创建、查看、修改、删除等,了解如何操作这些表是数据库管理和开发的基本技能,本文给大家介绍Mysql表的简单操作,感兴趣的朋友一起看... 目录3.1 创建表 3.2 查看表结构3.3 修改表3.4 实践案例:修改表在数据库中,表的操作主要

C# WinForms存储过程操作数据库的实例讲解

《C#WinForms存储过程操作数据库的实例讲解》:本文主要介绍C#WinForms存储过程操作数据库的实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、存储过程基础二、C# 调用流程1. 数据库连接配置2. 执行存储过程(增删改)3. 查询数据三、事务处

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

Java使用Curator进行ZooKeeper操作的详细教程

《Java使用Curator进行ZooKeeper操作的详细教程》ApacheCurator是一个基于ZooKeeper的Java客户端库,它极大地简化了使用ZooKeeper的开发工作,在分布式系统... 目录1、简述2、核心功能2.1 CuratorFramework2.2 Recipes3、示例实践3

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

Python使用DrissionPage中ChromiumPage进行自动化网页操作

《Python使用DrissionPage中ChromiumPage进行自动化网页操作》DrissionPage作为一款轻量级且功能强大的浏览器自动化库,为开发者提供了丰富的功能支持,本文将使用Dri... 目录前言一、ChromiumPage基础操作1.初始化Drission 和 ChromiumPage

利用Go语言开发文件操作工具轻松处理所有文件

《利用Go语言开发文件操作工具轻松处理所有文件》在后端开发中,文件操作是一个非常常见但又容易出错的场景,本文小编要向大家介绍一个强大的Go语言文件操作工具库,它能帮你轻松处理各种文件操作场景... 目录为什么需要这个工具?核心功能详解1. 文件/目录存javascript在性检查2. 批量创建目录3. 文件

Redis中管道操作pipeline的实现

《Redis中管道操作pipeline的实现》RedisPipeline是一种优化客户端与服务器通信的技术,通过批量发送和接收命令减少网络往返次数,提高命令执行效率,本文就来介绍一下Redis中管道操... 目录什么是pipeline场景一:我要向Redis新增大批量的数据分批处理事务( MULTI/EXE