RedisCluster-Pipeline操作,提升10倍以上响应速度2021-03-15

2024-06-02 15:32

本文主要是介绍RedisCluster-Pipeline操作,提升10倍以上响应速度2021-03-15,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


文章目录
什么是pipeLine 为什么使用pipeLine ?
为什么RedisCluster无法使用pipeline?
如何基于JedisCluster扩展pipeline?
性能对比(提升10倍以上):
本文中的代码来自我正在写的分布式缓存框架(主要解决缓存使用中的各种痛点:缓存穿透\redis-cluster pipeline\注解使用等等)。后续内部推广使用后、成熟后会开源回馈大家。

什么是pipeLine 为什么使用pipeLine ?
管道(pipeline)将客户端 client 与服务器端的交互明确划分为单向的发送请求(Send Request)和接收响应(Receive Response):用户可以将多个操作连续发给服务器,但在此期间服务器端并不对每个操作命令发送响应数据;全部请求发送完毕后用户关闭请求,开始接收响应获取每个操作命令的响应结果。

管道(pipeline)在某些场景下非常有用,比如有多个操作命令需要被迅速提交至服务器端,但用户并不依赖每个操作返回的响应结果,对结果响应也无需立即获得,那么管道就可以用来作为优化性能的批处理工具。性能提升的原因主要是减少了 TCP 连接中交互往返的开销。

不过在程序中使用管道请注意,使用 pipeline 时客户端将独占与服务器端的连接,此期间将不能进行其他“非管道”类型操作,直至 pipeline 被关闭;如果要同时执行其他操作,可以为 pipeline 操作单独建立一个连接,将其与常规操作分离开来。

从原理上来看,pipeline就是用一个redis 的Socket连接 去多次执行redis命令(发送请求)而不必等待响应,当所有请求都执行完毕后再一次性的从这个socket中读取请求。期间减少了在网络上的无用等待,通常会有3-10倍以上的速度提升:

    //非pipeline
    [req1]
         [==waiting===]
                          [resp1]
                                [req2]
                                     [====waiting=====]
                                                      [resp2]

    //pipeline
    [req1][==waiting===]
         [req2][==waiting===]
                     [resp1] [resp2]

pipeline代码示例

@Test

  public void pipeline() throws UnsupportedEncodingException {

    Pipeline p = jedis.pipelined();

    p.set("foo", "bar");

    p.get("foo");
      
    for(int i=0;i<10;i++){
        
        p.set("foo"+i, "bar");
    }


    List<Object> results = p.syncAndReturnAll();

  }

为什么RedisCluster无法使用pipeline?
主要是因为redis-cluster的hash分片,如下图一个3master-3slave 的 redisCluster:

这里写图片描述

具体的redis命令,会根据key计算出一个槽位(slot),然后根据槽位去特定的节点redis上执行操作。

其中master1代表了 0~5460的槽位,master2代表了 5461~10922的槽位,master1代表了 10923~16383的槽位。

    master1(slave1): 0~5460

    master2(slave2):5461~10922

    master3(slave3):10923~16383


以以下代码为例:

        for(int i=0;i<10;i++){
            p.set("foo"+i, "bar");
        }

那么pipeline中每个单独的操作,需要根据“key”运算一个槽位(JedisClusterCRC16.getSlot(key)),然后根据槽位去特定的机器执行命令。也就是说一次pipeline操作会使用多个节点的redis连接,而目前JedisCluster是无法支持的。

如何基于JedisCluster扩展pipeline?
设计思路(ShardedJedis、redisson也可供参考,):

1.首先要根据key计算出此次pipeline会使用到的节点对于的连接(也就是jedis对象,通常每个节点对应一个Pool)。

2.相同槽位的key,使用同一个jedis.pipeline去执行 命令。

3.合并此次pipeline所有的response返回。

4.连接释放返回到池中。

也就是讲一个JedisCluster下的pipeline分解为每个单节点下独立的jedisPipeline操作,最后合并response返回。

分享以下部分核心代码:

/**
 * @author zhangshuo
 */
@Slf4j
public class JedisClusterPipeLine extends PipelineBase implements Closeable {

......
......

    private final Queue<Client> orderedClients = new LinkedList<Client>();

    /** 一次pipeline过程中使用到的jedis缓存 */
    private final Map<JedisPool, Jedis> poolToJedisMap = new HashMap<JedisPool, Jedis>();

    private final JedisSlotBasedConnectionHandler connectionHandler;
    private final JedisClusterInfoCache clusterInfoCache;

    public JedisClusterPipeLine(JedisCluster jedisCluster) {
        this.connectionHandler = ClassUtils.getValue(jedisCluster, SLOT_BASED__CONNECTION_HANDLER_FIELD);
        this.clusterInfoCache = ClassUtils.getValue(connectionHandler, CLUSTER_INFO_CACHE_FIELD);
    }

    @Override
    protected Client getClient(String key) {

        return getClient(SafeEncoder.encode(key));
    }

    @Override
    protected Client getClient(byte[] key) {

        Client client;
        log.debug("size of orderedClients : {} , size of poolToJedis : {} ", orderedClients.size(),
                poolToJedisMap.size());

        int slot = JedisClusterCRC16.getSlot(key);

        JedisPool pool = clusterInfoCache.getSlotPool(slot);

        Jedis borrowedJedis = poolToJedisMap.get(pool);

        if (null == borrowedJedis) {
            borrowedJedis = pool.getResource();
            poolToJedisMap.put(pool, borrowedJedis);
        }

        client = borrowedJedis.getClient();

        orderedClients.add(client);

        return client;
    }

    @Override
    public void close() {
        for (Jedis jedis : poolToJedisMap.values()) {
            jedis.close();
        }

        clean();
        orderedClients.clear();
        poolToJedisMap.clear();
    }

    public void sync() {
        for (Client client : orderedClients) {
            generateResponse(client.getOne());
        }
    }

    /**
     * go through all the responses and generate the right response type (warning :
     * usually it is a waste of time).
     * 
     * @return A list of all the responses in the order
     */
    public List<Object> syncAndReturnAll() {
        List<Object> formatted = new ArrayList<Object>();
        for (Client client : orderedClients) {
            formatted.add(generateResponse(client.getOne()).get());
        }
        return formatted;
    }

    public void refreshNodesInfo() {
        connectionHandler.renewSlotCache();
    }
......
......
}

性能对比(提升10倍以上):
    @Test
    public void jedisTest() throws UnsupportedEncodingException {
 
        long start2 = System.currentTimeMillis();
 
        try (JedisClusterClient<Object> jc = jedisClusterClient) {
            for (int i = 0; i < 100; i++) {
                jc.set("NO." + i, "value" + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - start2);// 5688ms
 
    }
 
    /**
     *
     */
    @Test
    public void clusterPipeline() {
        long start = System.currentTimeMillis();
        try (JedisClusterPipeLine pipeline = jedisClusterClient.pipelined()) {
            for (int i = 0; i < 100; i++) {
 
                pipeline.set("NO." + i, "value" + i);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis() - start);// 174ms
    }
}

结论:对于批量操作,响应提升明显:如上本机测试中,提升了约50倍。
 

这篇关于RedisCluster-Pipeline操作,提升10倍以上响应速度2021-03-15的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1024362

相关文章

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

cross-plateform 跨平台应用程序-03-如果只选择一个框架,应该选择哪一个?

跨平台系列 cross-plateform 跨平台应用程序-01-概览 cross-plateform 跨平台应用程序-02-有哪些主流技术栈? cross-plateform 跨平台应用程序-03-如果只选择一个框架,应该选择哪一个? cross-plateform 跨平台应用程序-04-React Native 介绍 cross-plateform 跨平台应用程序-05-Flutte

java学习,进阶,提升

http://how2j.cn/k/hutool/hutool-brief/1930.html?p=73689

动手学深度学习【数据操作+数据预处理】

import osos.makedirs(os.path.join('.', 'data'), exist_ok=True)data_file = os.path.join('.', 'data', 'house_tiny.csv')with open(data_file, 'w') as f:f.write('NumRooms,Alley,Price\n') # 列名f.write('NA

线程的四种操作

所属专栏:Java学习        1. 线程的开启 start和run的区别: run:描述了线程要执行的任务,也可以称为线程的入口 start:调用系统函数,真正的在系统内核中创建线程(创建PCB,加入到链表中),此处的start会根据不同的系统,分别调用不同的api,创建好之后的线程,再单独去执行run(所以说,start的本质是调用系统api,系统的api

JAVA用最简单的方法来构建一个高可用的服务端,提升系统可用性

一、什么是提升系统的高可用性 JAVA服务端,顾名思义就是23体验网为用户提供服务的。停工时间,就是不能向用户提供服务的时间。高可用,就是系统具有高度可用性,尽量减少停工时间。如何用最简单的方法来搭建一个高效率可用的服务端JAVA呢? 停工的原因一般有: 服务器故障。例如服务器宕机,服务器网络出现问题,机房或者机架出现问题等;访问量急剧上升,导致服务器压力过大导致访问量急剧上升的原因;时间和

Java IO 操作——个人理解

之前一直Java的IO操作一知半解。今天看到一个便文章觉得很有道理( 原文章),记录一下。 首先,理解Java的IO操作到底操作的什么内容,过程又是怎么样子。          数据来源的操作: 来源有文件,网络数据。使用File类和Sockets等。这里操作的是数据本身,1,0结构。    File file = new File("path");   字

FreeRTOS内部机制学习03(事件组内部机制)

文章目录 事件组使用的场景事件组的核心以及Set事件API做的事情事件组的特殊之处事件组为什么不关闭中断xEventGroupSetBitsFromISR内部是怎么做的? 事件组使用的场景 学校组织秋游,组长在等待: 张三:我到了 李四:我到了 王五:我到了 组长说:好,大家都到齐了,出发! 秋游回来第二天就要提交一篇心得报告,组长在焦急等待:张三、李四、王五谁先写好就交谁的

MySQL——表操作

目录 一、创建表 二、查看表 2.1 查看表中某成员的数据 2.2 查看整个表中的表成员 2.3 查看创建表时的句柄 三、修改表 alter 3.1 重命名 rename 3.2 新增一列 add 3.3 更改列属性 modify 3.4 更改列名称 change 3.5 删除某列 上一篇博客介绍了库的操作,接下来来看一下表的相关操作。 一、创建表 create