默认ForkJoinPool引发的Redis lettuceP99升高

2024-01-01 10:38

本文主要是介绍默认ForkJoinPool引发的Redis lettuceP99升高,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景:

推荐系统升级RedisCluster4的SDK后,与之前的redis2.8的jedis客户端相比性能下降,具体表现在对应接口P99升高

问题原因:

在项目中使用了parallelStream的并行执行,其和lettuce的异步获取结果的CompletableFuture线程共用了一个ForkJoinPool

解决方案:

去除对于parallelStream的依赖,使用单独的线程池

通过排查堆栈,发现parallelStream产生大量的ForkJoin线程,怀疑其和lettuce的future线程之间产生资源竞争,将parallelStream去掉之后,P99明显改善

这个是接口的P99

 

这个是REDIS4Client的hmget P99

 

这个是REDIS4Client的firstResponse的 P99

 

http://matrix.snowballfinance.com/d/RGsiCO7Zz/recommend-recall?orgId=1&from=1616569005047&to=1616583596889

另外CPU和线程总数也不再出现大的波动

 

 

原理分析:

ParallelStream的执行线程池

对应forEach流

ForEachOps::compute方法打个断点,

或者直接forEach方法的输出语句打个断点,找到ForkJoinWorkerThread类

public class ForkJoinWorkerThread extends Thread {

   final ForkJoinPool pool;                // the pool this thread works in

   final ForkJoinPool.WorkQueue workQueue; // work-stea

   public void run() {

       ....

       pool.runWorker(workQueue);

       ....

    }

 }

completableFuture的执行线程池

private static final Executor asyncPool = useCommonPool ?

    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

//useCommonPool是什么?

private static final boolean useCommonPool =

    (ForkJoinPool.getCommonPoolParallelism() > 1);

public static int getCommonPoolParallelism() {

    return commonParallelism;

}

 

 

private static ForkJoinPool makeCommonPool() {

    int parallelism = -1;  //这个并发的线程数默认是-1

    ForkJoinWorkerThreadFactory factory = null;

  。。。。。。

    if (parallelism < 0 &&

        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)  //看到了吧,线程池中的处理线程数=电脑核数-1

        parallelism = 1;

    if (parallelism > MAX_CAP)

        parallelism = MAX_CAP;

    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,

                            "ForkJoinPool.commonPool-worker-");  //指定线程的名字

}

而lettuce中对于结果的返回使用的LettuceFutures--awaitOrCancel(RedisFuture<T> cmd, long timeout, TimeUnit unit)获取执行结果,

其中RedisFuture的awite实现AsyncCommand类的就是靠CompletableFuture完成的,就会和上面的parallelStream共用一个ForkJoinPool

/**

 * Wait until futures are complete or the supplied timeout is reached. Commands are canceled if the timeout is reached but

 * the command is not finished.

 *

 * @param cmd Command to wait for

 * @param timeout Maximum time to wait for futures to complete

 * @param unit Unit of time for the timeout

 * @param <T> Result type

 *

 * @return Result of the command.

 */

public static <T> T awaitOrCancel(RedisFuture<T> cmd, long timeout, TimeUnit unit) {

 

    try {

        if (!cmd.await(timeout, unit)) {

            cmd.cancel(true);

            throw ExceptionFactory.createTimeoutException(Duration.ofNanos(unit.toNanos(timeout)));

        }

        return cmd.get();

    catch (RuntimeException e) {

        throw e;

    catch (ExecutionException e) {

 

        if (e.getCause() instanceof RedisCommandExecutionException) {

            throw ExceptionFactory.createExecutionException(e.getCause().getMessage(), e.getCause());

        }

 

        if (e.getCause() instanceof RedisCommandTimeoutException) {

            throw new RedisCommandTimeoutException(e.getCause());

        }

 

        throw new RedisException(e.getCause());

    catch (InterruptedException e) {

 

        Thread.currentThread().interrupt();

        throw new RedisCommandInterruptedException(e);

    catch (Exception e) {

        throw ExceptionFactory.createExecutionException(null, e);

    }

}

隔离了这种线程池的资源,这样对redis这种快速的线程就不会被队列中慢的线程影响获取时间片

这里留下一个问题:并发和并行的区别是?

测试代码:

1.使用parallelStream

RedisCluster redisCluster = RedisClusterImpl.create("192.168.64.169:8056,192.168.64.169:8053"4);

Thread thread1 = new Thread(() -> {

    int i = 0;

    while (true) {

        try {

            redisCluster.setex("k" + i, 10000"v" + i);

            Long start = System.currentTimeMillis();

            logger.info("RedisCluster4 info key:{}, value:{}""k" + i, redisCluster.get("k" + i));

            Long costTime = System.currentTimeMillis() - start;

            if (costTime > 10) {

                logger.info("RedisCluster4 slowlog :{}", costTime);

            }

            i++;

        catch (Exception ex) {

            logger.error("RedisCluster4 error, {}", ex.getMessage(), ex);

        }

    }

});

thread1.start();

 

Thread thread2 = new Thread(() -> {

    while (true) {

        try {

            List<Integer> list = new ArrayList<>();

            for (int j = 0; j < 10000; j++) {

                list.add(j);

            }

            list.parallelStream().forEach(f-> {

                logger.info("parallelStream log :{}", f);

                for (int j = 0; j < 10000; j++) {

                }

            });

        catch (Exception ex) {

            logger.error("RedisCluster4 error, {}", ex.getMessage(), ex);

        }

    }

});

 

thread2.start();

打印的监控日志:平均P99≈100ms

2021-03-25 11:38:33.976|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.get||{"count":7509,"delta":7509,"min":0.22,"max":183.88,"mean":20.53,"stddev":27.56,"median":20.53,"p50":5.44,"p75":38.77,"p95":59.57,"p98":104.12,"p99":150.59,"p999":177.77,"mean_rate":739.0,"m1":660.86,"m5":647.65,"m15":645.36,"ratio":7.33,"rate_unit":"events/second","duration_unit":"milliseconds"}
2021-03-25 11:38:33.979|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.setex||{"count":275,"delta":275,"min":0.27,"max":215.22,"mean":19.2,"stddev":30.82,"median":19.2,"p50":3.88,"p75":38.08,"p95":56.55,"p98":107.48,"p99":176.4,"p999":215.22,"mean_rate":27.3,"m1":21.87,"m5":21.02,"m15":20.87,"ratio":9.19,"rate_unit":"events/second","duration_unit":"milliseconds"}

2.使用多线程,但是得做到控制的和ForkJoinPool一样 

RedisCluster redisCluster = RedisClusterImpl.create("192.168.64.169:8056,192.168.64.169:8053"4);

Thread thread1 = new Thread(() -> {

    int i = 0;

    while (true) {

        try {

            redisCluster.setex("k" + i, 10000"v" + i);

            Long start = System.currentTimeMillis();

            logger.info("RedisCluster4 info key:{}, value:{}""k" + i, redisCluster.get("k" + i));

            Long costTime = System.currentTimeMillis() - start;

            if (costTime > 10) {

                logger.info("RedisCluster4 slowlog :{}", costTime);

            }

            i++;

        catch (Exception ex) {

            logger.error("RedisCluster4 error, {}", ex.getMessage(), ex);

        }

    }

});

thread1.start();

 

ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() - 1, ForkJoinPool.defaultForkJoinWorkerThreadFactory, nulltrue);

forkJoinPool.submit(new Runnable() {

    @Override

    public void run() {

        while (true) {

            try {

                for (int j = 0; j < 10000; j++) {

                    logger.info("parallelStream log :{}", j);

                    redisCluster.get("k" + j);

                }

            catch (Exception ex) {

                logger.error("RedisCluster4 error, {}", ex.getMessage(), ex);

            }

        }

    }

});

}

打印的监控日志:平均P99≈36ms

2021-03-25 11:43:58.565|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.get||{"count":22924,"delta":4670,"min":0.2,"max":91.88,"mean":3.45,"stddev":9.19,"median":3.45,"p50":0.8,"p75":1.24,"p95":34.76,"p98":35.57,"p99":36.17,"p999":40.72,"mean_rate":456.99,"m1":445.99,"m5":433.02,"m15":430.13,"ratio":10.5,"rate_unit":"events/second","duration_unit":"milliseconds"}
2021-03-25 11:43:58.575|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.setex||{"count":7421,"delta":1510,"min":0.22,"max":152.41,"mean":3.36,"stddev":9.85,"median":3.36,"p50":0.88,"p75":1.3,"p95":34.92,"p98":36.06,"p99":37.13,"p999":152.41,"mean_rate":147.83,"m1":145.05,"m5":141.57,"m15":140.81,"ratio":11.06,"rate_unit":"events/second","duration_unit":"milliseconds"}

与forkJoin一起在池里面的那个线程栈

java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1695)
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1775)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:83)
io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:112)

3.最单纯redis4Client查询

RedisCluster redisCluster = RedisClusterImpl.create("192.168.64.169:8056,192.168.64.169:8053"4);

Thread thread1 = new Thread(() -> {

    int i = 0;

    while (true) {

        try {

            redisCluster.setex("k" + i, 10000"v" + i);

            Long start = System.currentTimeMillis();

            logger.info("RedisCluster4 info key:{}, value:{}""k" + i, redisCluster.get("k" + i));

            Long costTime = System.currentTimeMillis() - start;

            if (costTime > 10) {

                logger.info("RedisCluster4 slowlog :{}", costTime);

            }

            i++;

        catch (Exception ex) {

            logger.error("RedisCluster4 error, {}", ex.getMessage(), ex);

        }

    }

});

thread1.start();

打印的监控日志:平均P99≈35ms

2021-03-25 13:47:05.137|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.get||{"count":12846,"delta":2362,"min":0.21,"max":85.23,"mean":2.27,"stddev":7.77,"median":2.27,"p50":0.54,"p75":0.71,"p95":2.55,"p98":34.67,"p99":35.12,"p999":85.23,"mean_rate":213.46,"m1":195.49,"m5":164.06,"m15":156.74,"ratio":15.48,"rate_unit":"events/second","duration_unit":"milliseconds"}
2021-03-25 13:47:05.146|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.setex||{"count":12847,"delta":2362,"min":0.22,"max":84.04,"mean":1.96,"stddev":6.91,"median":1.96,"p50":0.62,"p75":0.79,"p95":1.9,"p98":34.53,"p99":35.03,"p999":84.04,"mean_rate":213.32,"m1":195.42,"m5":163.9,"m15":156.56,"ratio":17.9,"rate_unit":"events/second","duration_unit":"milliseconds"}

所有的监控日志文件: text fileMyselfRedis4Test.java

从三者的对比可以验证上面的那个结论,就是做了资源隔离,是有一定的帮助

建议:

不要在高并发的接口中使用并行流,有i/o操作的一定不要使用并行流,有线程休眠的也一定不要使用并行流,如果有需要,那就全局创建一个Fork-Join线程池自己切分任务来执行。

彩蛋:

对上面的遗留小问题解答:

并行同时执行,并发可以交替执行

 

这篇关于默认ForkJoinPool引发的Redis lettuceP99升高的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/559000

相关文章

禁止平板,iPad长按弹出默认菜单事件

通过监控按下抬起时间差来禁止弹出事件,把以下代码写在要禁止的页面的页面加载事件里面即可     var date;document.addEventListener('touchstart', event => {date = new Date().getTime();});document.addEventListener('touchend', event => {if (new

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

Redis中使用布隆过滤器解决缓存穿透问题

一、缓存穿透(失效)问题 缓存穿透是指查询一个一定不存在的数据,由于缓存中没有命中,会去数据库中查询,而数据库中也没有该数据,并且每次查询都不会命中缓存,从而每次请求都直接打到了数据库上,这会给数据库带来巨大压力。 二、布隆过滤器原理 布隆过滤器(Bloom Filter)是一种空间效率很高的随机数据结构,它利用多个不同的哈希函数将一个元素映射到一个位数组中的多个位置,并将这些位置的值置

Lua 脚本在 Redis 中执行时的原子性以及与redis的事务的区别

在 Redis 中,Lua 脚本具有原子性是因为 Redis 保证在执行脚本时,脚本中的所有操作都会被当作一个不可分割的整体。具体来说,Redis 使用单线程的执行模型来处理命令,因此当 Lua 脚本在 Redis 中执行时,不会有其他命令打断脚本的执行过程。脚本中的所有操作都将连续执行,直到脚本执行完成后,Redis 才会继续处理其他客户端的请求。 Lua 脚本在 Redis 中原子性的原因

android系统源码12 修改默认桌面壁纸--SRO方式

1、aosp12修改默认桌面壁纸 代码路径 :frameworks\base\core\res\res\drawable-nodpi 替换成自己的图片即可,不过需要覆盖所有目录下的图片。 由于是静态修改,则需要make一下,重新编译。 2、方法二Overlay方式 由于上述方法有很大缺点,修改多了之后容易遗忘自己修改哪些文件,为此我们采用另外一种方法,使用Overlay方式。

laravel框架实现redis分布式集群原理

在app/config/database.php中配置如下: 'redis' => array('cluster' => true,'default' => array('host' => '172.21.107.247','port' => 6379,),'redis1' => array('host' => '172.21.107.248','port' => 6379,),) 其中cl

Redis的rehash机制

在Redis中,键值对(Key-Value Pair)存储方式是由字典(Dict)保存的,而字典底层是通过哈希表来实现的。通过哈希表中的节点保存字典中的键值对。我们知道当HashMap中由于Hash冲突(负载因子)超过某个阈值时,出于链表性能的考虑,会进行Resize的操作。Redis也一样。 在redis的具体实现中,使用了一种叫做渐进式哈希(rehashing)的机制来提高字典的缩放效率,避

【吊打面试官系列-Redis面试题】说说 Redis 哈希槽的概念?

大家好,我是锋哥。今天分享关于 【说说 Redis 哈希槽的概念?】面试题,希望对大家有帮助; 说说 Redis 哈希槽的概念? Redis 集群没有使用一致性 hash,而是引入了哈希槽的概念,Redis 集群有 16384 个哈希槽,每个 key 通过 CRC16 校验后对 16384 取模来决定放置哪个槽, 集群的每个节点负责一部分 hash 槽。

Redis地理数据类型GEO

通常要计算两个地理位置的距离不是很方便,这里可以直接通过Redis提供的GEO操作来完成地理位置相关的计算 1)添加地理位置 语法:geoadd key longitude latitude member [longitude latitude member] ...字段说明:key:存放地理位置的集合名称longitude:地理坐标的经度latitude:地理坐标的纬度member:表示这