This post describes how the default ForkJoinPool caused a P99 latency increase in the Redis lettuce client, and how we tracked it down.
Background:
After the recommendation system upgraded to the RedisCluster 4 SDK, performance dropped compared with the previous redis 2.8 Jedis client. Concretely, the P99 latency of the affected endpoints rose.
Root cause:
The project used parallelStream for parallel execution, and it shares a single ForkJoinPool (the common pool) with the CompletableFuture threads that lettuce uses to deliver async results.
Solution:
Remove the dependency on parallelStream and use a dedicated thread pool instead.
Inspecting thread dumps showed that parallelStream spawned a large number of ForkJoin threads, which suggested resource contention with lettuce's future threads. After removing parallelStream, the P99 improved noticeably.
This is the endpoint's P99:
This is the REDIS4Client hmget P99:
This is the REDIS4Client firstResponse P99:
http://matrix.snowballfinance.com/d/RGsiCO7Zz/recommend-recall?orgId=1&from=1616569005047&to=1616583596889
In addition, CPU usage and the total thread count stopped fluctuating heavily.
Analysis:
The thread pool behind parallelStream
To see which pool a parallel stream runs on, set a breakpoint in the `ForEachOps::compute` method (for a `forEach` terminal operation), or simply in the `forEach` body's output statement, and you will land in the `ForkJoinWorkerThread` class:

```java
public class ForkJoinWorkerThread extends Thread {
    final ForkJoinPool pool;                // the pool this thread works in
    final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics

    public void run() {
        ....
        pool.runWorker(workQueue);
        ....
    }
}
```
The thread pool behind CompletableFuture
```java
// CompletableFuture: the default executor for the *Async methods
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

// What is useCommonPool? It is true whenever the common pool's parallelism is > 1
private static final boolean useCommonPool =
    (ForkJoinPool.getCommonPoolParallelism() > 1);

public static int getCommonPoolParallelism() {
    return commonParallelism;
}

// ForkJoinPool: how the common pool is built
private static ForkJoinPool makeCommonPool() {
    int parallelism = -1; // the parallelism defaults to -1
    ForkJoinWorkerThreadFactory factory = null;
    ......
    // See? The pool's worker count = number of cores - 1
    if (parallelism < 0 &&
        (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
        parallelism = 1;
    if (parallelism > MAX_CAP)
        parallelism = MAX_CAP;
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-"); // thread-name prefix
}
```
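The sharing is easy to observe in practice. The following is a small standalone sketch (not from the original project, and the class name is made up): print the thread names used by a parallel stream and by a default-executor CompletableFuture. On a machine with more than one core, both report `ForkJoinPool.commonPool-worker-*` names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CommonPoolDemo {

    // Distinct thread names a parallel stream runs on (may also include "main",
    // since the calling thread participates in the work).
    static List<String> streamThreadNames() {
        return IntStream.range(0, 1000)
                .parallel()
                .mapToObj(i -> Thread.currentThread().getName())
                .distinct()
                .collect(Collectors.toList());
    }

    // Thread name a default-executor CompletableFuture runs on.
    static String futureThreadName() throws Exception {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("parallelStream threads:   " + streamThreadNames());
        System.out.println("CompletableFuture thread: " + futureThreadName());
    }
}
```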
lettuce retrieves results through `LettuceFutures.awaitOrCancel(RedisFuture<T> cmd, long timeout, TimeUnit unit)`.
The `await` of `RedisFuture`, implemented by the `AsyncCommand` class, is built on CompletableFuture, so it ends up sharing a ForkJoinPool with the parallelStream above:
```java
/**
 * Wait until futures are complete or the supplied timeout is reached. Commands are
 * canceled if the timeout is reached but the command is not finished.
 *
 * @param cmd Command to wait for
 * @param timeout Maximum time to wait for futures to complete
 * @param unit Unit of time for the timeout
 * @param <T> Result type
 * @return Result of the command.
 */
public static <T> T awaitOrCancel(RedisFuture<T> cmd, long timeout, TimeUnit unit) {
    try {
        if (!cmd.await(timeout, unit)) {
            cmd.cancel(true);
            throw ExceptionFactory.createTimeoutException(Duration.ofNanos(unit.toNanos(timeout)));
        }
        return cmd.get();
    } catch (RuntimeException e) {
        throw e;
    } catch (ExecutionException e) {
        if (e.getCause() instanceof RedisCommandExecutionException) {
            throw ExceptionFactory.createExecutionException(e.getCause().getMessage(), e.getCause());
        }
        if (e.getCause() instanceof RedisCommandTimeoutException) {
            throw new RedisCommandTimeoutException(e.getCause());
        }
        throw new RedisException(e.getCause());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RedisCommandInterruptedException(e);
    } catch (Exception e) {
        throw ExceptionFactory.createExecutionException(null, e);
    }
}
```
Isolating the pools means the fast Redis result-await threads no longer have to compete for time slices with slow tasks queued in the same pool.
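A minimal sketch of that isolation (the class name, pool size, and the squaring task are placeholders): give CPU-bound batch work its own executor so it never queues ahead of, or behind, the common-pool workers that complete lettuce futures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class IsolatedPoolSketch {

    // Dedicated pool for CPU-heavy work; sized like the common pool here,
    // but tunable independently of anything Redis-related.
    static final ExecutorService CPU_POOL = Executors.newFixedThreadPool(
            Math.max(1, Runtime.getRuntime().availableProcessors() - 1));

    // Runs a placeholder CPU task per input on the dedicated pool.
    static List<Integer> processAll(List<Integer> inputs) throws Exception {
        List<Future<Integer>> futures = new ArrayList<>();
        for (Integer i : inputs) {
            futures.add(CPU_POOL.submit(() -> i * i));
        }
        List<Integer> results = new ArrayList<>();
        for (Future<Integer> f : futures) {
            results.add(f.get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(processAll(List.of(0, 1, 2, 3, 4))); // [0, 1, 4, 9, 16]
        CPU_POOL.shutdown();
    }
}
```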
A question to take away: what is the difference between concurrency and parallelism?
Test code:
1. Using parallelStream
```java
RedisCluster redisCluster = RedisClusterImpl.create("192.168.64.169:8056,192.168.64.169:8053", 4);

Thread thread1 = new Thread(() -> {
    int i = 0;
    while (true) {
        try {
            redisCluster.setex("k" + i, 10000, "v" + i);
            Long start = System.currentTimeMillis();
            logger.info("RedisCluster4 info key:{}, value:{}", "k" + i, redisCluster.get("k" + i));
            Long costTime = System.currentTimeMillis() - start;
            if (costTime > 10) {
                logger.info("RedisCluster4 slowlog :{}", costTime);
            }
            i++;
        } catch (Exception ex) {
            logger.error("RedisCluster4 error, {}", ex.getMessage(), ex);
        }
    }
});
thread1.start();

Thread thread2 = new Thread(() -> {
    while (true) {
        try {
            List<Integer> list = new ArrayList<>();
            for (int j = 0; j < 10000; j++) {
                list.add(j);
            }
            list.parallelStream().forEach(f -> {
                logger.info("parallelStream log :{}", f);
                for (int j = 0; j < 10000; j++) {
                }
            });
        } catch (Exception ex) {
            logger.error("RedisCluster4 error, {}", ex.getMessage(), ex);
        }
    }
});
thread2.start();
```
Monitoring log output: average P99 ≈ 100ms
2021-03-25 11:38:33.976|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.get||{"count":7509,"delta":7509,"min":0.22,"max":183.88,"mean":20.53,"stddev":27.56,"median":20.53,"p50":5.44,"p75":38.77,"p95":59.57,"p98":104.12,"p99":150.59,"p999":177.77,"mean_rate":739.0,"m1":660.86,"m5":647.65,"m15":645.36,"ratio":7.33,"rate_unit":"events/second","duration_unit":"milliseconds"}
2021-03-25 11:38:33.979|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.setex||{"count":275,"delta":275,"min":0.27,"max":215.22,"mean":19.2,"stddev":30.82,"median":19.2,"p50":3.88,"p75":38.08,"p95":56.55,"p98":107.48,"p99":176.4,"p999":215.22,"mean_rate":27.3,"m1":21.87,"m5":21.02,"m15":20.87,"ratio":9.19,"rate_unit":"events/second","duration_unit":"milliseconds"}
2. Using separate threads, with a ForkJoinPool configured the same way as the common pool
```java
RedisCluster redisCluster = RedisClusterImpl.create("192.168.64.169:8056,192.168.64.169:8053", 4);

Thread thread1 = new Thread(() -> {
    int i = 0;
    while (true) {
        try {
            redisCluster.setex("k" + i, 10000, "v" + i);
            Long start = System.currentTimeMillis();
            logger.info("RedisCluster4 info key:{}, value:{}", "k" + i, redisCluster.get("k" + i));
            Long costTime = System.currentTimeMillis() - start;
            if (costTime > 10) {
                logger.info("RedisCluster4 slowlog :{}", costTime);
            }
            i++;
        } catch (Exception ex) {
            logger.error("RedisCluster4 error, {}", ex.getMessage(), ex);
        }
    }
});
thread1.start();

ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() - 1,
        ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
forkJoinPool.submit(new Runnable() {
    @Override
    public void run() {
        while (true) {
            try {
                for (int j = 0; j < 10000; j++) {
                    logger.info("parallelStream log :{}", j);
                    redisCluster.get("k" + j);
                }
            } catch (Exception ex) {
                logger.error("RedisCluster4 error, {}", ex.getMessage(), ex);
            }
        }
    }
});
```
Monitoring log output: average P99 ≈ 36ms
2021-03-25 11:43:58.565|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.get||{"count":22924,"delta":4670,"min":0.2,"max":91.88,"mean":3.45,"stddev":9.19,"median":3.45,"p50":0.8,"p75":1.24,"p95":34.76,"p98":35.57,"p99":36.17,"p999":40.72,"mean_rate":456.99,"m1":445.99,"m5":433.02,"m15":430.13,"ratio":10.5,"rate_unit":"events/second","duration_unit":"milliseconds"}
2021-03-25 11:43:58.575|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.setex||{"count":7421,"delta":1510,"min":0.22,"max":152.41,"mean":3.36,"stddev":9.85,"median":3.36,"p50":0.88,"p75":1.3,"p95":34.92,"p98":36.06,"p99":37.13,"p999":152.41,"mean_rate":147.83,"m1":145.05,"m5":141.57,"m15":140.81,"ratio":11.06,"rate_unit":"events/second","duration_unit":"milliseconds"}
Stack of the thread that shares the pool with the ForkJoin tasks:
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1695)
java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)
java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1775)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:83)
io.lettuce.core.LettuceFutures.awaitOrCancel(LettuceFutures.java:112)
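The stack above can be reproduced without lettuce: any `CompletableFuture.get()` called from inside a ForkJoinPool worker parks that worker through `ForkJoinPool.managedBlock`. A tiny standalone illustration (the class name, pool size, and timeout are arbitrary choices for the demo):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BlockedWorkerDemo {

    // Submits a task that blocks in CompletableFuture.get() inside the pool,
    // then completes the future from outside.
    static String awaitInPool() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(2);
        CompletableFuture<String> slow = new CompletableFuture<>();
        // The worker parks here via timedGet -> managedBlock, as in the stack above.
        Future<String> waiting = pool.submit(() -> slow.get(1, TimeUnit.SECONDS));
        Thread.sleep(100); // give the worker time to reach the blocking get()
        slow.complete("done");
        String result = waiting.get();
        pool.shutdown();
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(awaitInPool()); // done
    }
}
```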
3. Plain redis4Client queries only
```java
RedisCluster redisCluster = RedisClusterImpl.create("192.168.64.169:8056,192.168.64.169:8053", 4);

Thread thread1 = new Thread(() -> {
    int i = 0;
    while (true) {
        try {
            redisCluster.setex("k" + i, 10000, "v" + i);
            Long start = System.currentTimeMillis();
            logger.info("RedisCluster4 info key:{}, value:{}", "k" + i, redisCluster.get("k" + i));
            Long costTime = System.currentTimeMillis() - start;
            if (costTime > 10) {
                logger.info("RedisCluster4 slowlog :{}", costTime);
            }
            i++;
        } catch (Exception ex) {
            logger.error("RedisCluster4 error, {}", ex.getMessage(), ex);
        }
    }
});
thread1.start();
```
Monitoring log output: average P99 ≈ 35ms
2021-03-25 13:47:05.137|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.get||{"count":12846,"delta":2362,"min":0.21,"max":85.23,"mean":2.27,"stddev":7.77,"median":2.27,"p50":0.54,"p75":0.71,"p95":2.55,"p98":34.67,"p99":35.12,"p999":85.23,"mean_rate":213.46,"m1":195.49,"m5":164.06,"m15":156.74,"ratio":15.48,"rate_unit":"events/second","duration_unit":"milliseconds"}
2021-03-25 13:47:05.146|192.168.18.128|sep|UNKNOWN|app|TIMER|REDIS4.setex||{"count":12847,"delta":2362,"min":0.22,"max":84.04,"mean":1.96,"stddev":6.91,"median":1.96,"p50":0.62,"p75":0.79,"p95":1.9,"p98":34.53,"p99":35.03,"p999":84.04,"mean_rate":213.32,"m1":195.42,"m5":163.9,"m15":156.56,"ratio":17.9,"rate_unit":"events/second","duration_unit":"milliseconds"}
Full monitoring logs and test code: MyselfRedis4Test.java
Comparing the three runs confirms the conclusion above: isolating thread-pool resources clearly helps.
Recommendations:
Do not use parallel streams in high-concurrency endpoints. Never use them for work that does I/O or sleeps. If you really need parallelism there, create a dedicated Fork-Join pool globally and partition the tasks yourself.
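If a parallel stream is genuinely needed, one commonly used trick (implementation-dependent behavior, so treat it with care) is to submit the stream from inside your own ForkJoinPool; the stream then runs on that pool's workers instead of the common pool. The class name and pool size below are illustrative:

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class CustomPoolParallelStream {

    // Runs a parallel stream inside a dedicated pool and returns the
    // distinct worker-thread names it executed on.
    static List<String> runInDedicatedPool() throws Exception {
        ForkJoinPool pool = new ForkJoinPool(4); // dedicated, not the common pool
        try {
            return pool.submit(() ->
                    IntStream.range(0, 1000)
                            .parallel()
                            .mapToObj(i -> Thread.currentThread().getName())
                            .distinct()
                            .collect(Collectors.toList())
            ).get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        // Thread names look like ForkJoinPool-1-worker-*, not commonPool-worker-*
        System.out.println(runInDedicatedPool());
    }
}
```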
Bonus:
Answer to the question left above:
Parallelism means tasks literally execute at the same instant (on different cores); concurrency means tasks may merely take turns, interleaving on the same resources.