聊聊AsyncHttpClient的ListenableFuture

2023-12-18 05:45

本文主要是介绍聊聊AsyncHttpClient的ListenableFuture,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文主要研究一下AsyncHttpClient的ListenableFuture

ListenableFuture

org/asynchttpclient/ListenableFuture.java

public interface ListenableFuture<V> extends Future<V> {/*** Terminate and if there is no exception, mark this Future as done and release the internal lock.*/void done();/*** Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}** @param t the exception*/void abort(Throwable t);/*** Touch the current instance to prevent external service to times out.*/void touch();/*** Adds a listener and executor to the ListenableFuture.* The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed* to the executor} for execution when the {@code Future}'s computation is* {@linkplain Future#isDone() complete}.* <br>* Executor can be <code>null</code>, in that case executor will be executed* in the thread where completion happens.* <br>* There is no guaranteed ordering of execution of listeners, they may get* called in the order they were added and they may get called out of order,* but any listener added through this method is guaranteed to be called once* the computation is complete.** @param listener the listener to run when the computation is complete.* @param exec     the executor to run the listener in.* @return this Future*/ListenableFuture<V> addListener(Runnable listener, Executor exec);CompletableFuture<V> toCompletableFuture();//......
}  

ListenableFuture继承了java.util.concurrent.Future,它定义了done、abort、touch、addListener、toCompletableFuture方法

CompletedFailure

org/asynchttpclient/ListenableFuture.java

  class CompletedFailure<T> implements ListenableFuture<T> {private final ExecutionException e;public CompletedFailure(Throwable t) {e = new ExecutionException(t);}public CompletedFailure(String message, Throwable t) {e = new ExecutionException(message, t);}@Overridepublic boolean cancel(boolean mayInterruptIfRunning) {return true;}@Overridepublic boolean isCancelled() {return false;}@Overridepublic boolean isDone() {return true;}@Overridepublic T get() throws ExecutionException {throw e;}@Overridepublic T get(long timeout, TimeUnit unit) throws ExecutionException {throw e;}@Overridepublic void done() {}@Overridepublic void abort(Throwable t) {}@Overridepublic void touch() {}@Overridepublic ListenableFuture<T> addListener(Runnable listener, Executor exec) {if (exec != null) {exec.execute(listener);} else {listener.run();}return this;}@Overridepublic CompletableFuture<T> toCompletableFuture() {CompletableFuture<T> future = new CompletableFuture<>();future.completeExceptionally(e);return future;}}

CompletedFailure实现了ListenableFuture接口,其cancel方法返回true、isDone返回true

NettyResponseFuture

org/asynchttpclient/netty/NettyResponseFuture.java

public final class NettyResponseFuture<V> implements ListenableFuture<V> {private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "redirectCount");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "currentRetry");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isCancelled");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "inAuth");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "inProxyAuth");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "contentProcessed");@SuppressWarnings("rawtypes")private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD = AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "onThrowableCalled");@SuppressWarnings("rawtypes")private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");@SuppressWarnings("rawtypes")private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD = AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");private final long start = unpreciseMillisTime();private final ChannelPoolPartitioning connectionPoolPartitioning;private final ConnectionSemaphore connectionSemaphore;private final ProxyServer proxyServer;private final int maxRetry;private final CompletableFuture<V> future = new CompletableFuture<>();          //......@Overridepublic V get() throws InterruptedException, ExecutionException {return future.get();}@Overridepublic V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {return future.get(l, tu);}          
}          

NettyResponseFuture实现了ListenableFuture接口

done

  public final void done() {if (terminateAndExit())return;try {loadContent();} catch (ExecutionException ignored) {} catch (RuntimeException t) {future.completeExceptionally(t);} catch (Throwable t) {future.completeExceptionally(t);throw t;}}private boolean terminateAndExit() {releasePartitionKeyLock();cancelTimeouts();this.channel = null;this.reuseChannel = false;return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;}  private void loadContent() throws ExecutionException {if (future.isDone()) {try {future.get();} catch (InterruptedException e) {throw new RuntimeException("unreachable", e);}}// No more retryCURRENT_RETRY_UPDATER.set(this, maxRetry);if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {try {future.complete(asyncHandler.onCompleted());} catch (Throwable ex) {if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {try {try {asyncHandler.onThrowable(ex);} catch (Throwable t) {LOGGER.debug("asyncHandler.onThrowable", t);}} finally {cancelTimeouts();}}future.completeExceptionally(ex);}}future.getNow(null);}  

done方法对于terminateAndExit返回true的直接返回,否则执行loadContent,它对于future.isDone()的执行future.get(),然后执行future.complete(asyncHandler.onCompleted())回调

abort

  public final void abort(final Throwable t) {if (terminateAndExit())return;future.completeExceptionally(t);if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {try {asyncHandler.onThrowable(t);} catch (Throwable te) {LOGGER.debug("asyncHandler.onThrowable", te);}}}

abort方法也是对于terminateAndExit返回true的直接返回,否则执行future.completeExceptionally(t),然后触发asyncHandler.onThrowable(t)回调

touch

  public void touch() {touch = unpreciseMillisTime();}

touch方法用当前时间戳更新touch属性

addListener

  public ListenableFuture<V> addListener(Runnable listener, Executor exec) {if (exec == null) {exec = Runnable::run;}future.whenCompleteAsync((r, v) -> listener.run(), exec);return this;}

addListener方法会执行future.whenCompleteAsync((r, v) -> listener.run(), exec)

toCompletableFuture

  public CompletableFuture<V> toCompletableFuture() {return future;}

toCompletableFuture方法直接返回future

newNettyResponseFuture

org/asynchttpclient/netty/request/NettyRequestSender.java

  private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request,AsyncHandler<T> asyncHandler,NettyRequest nettyRequest,ProxyServer proxyServer) {NettyResponseFuture<T> future = new NettyResponseFuture<>(request,asyncHandler,nettyRequest,config.getMaxRequestRetry(),request.getChannelPoolPartitioning(),connectionSemaphore,proxyServer);String expectHeader = request.getHeaders().get(EXPECT);if (HttpHeaderValues.CONTINUE.contentEqualsIgnoreCase(expectHeader))future.setDontWriteBodyBecauseExpectContinue(true);return future;}private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,AsyncHandler<T> asyncHandler,NettyResponseFuture<T> future,ProxyServer proxyServer,boolean performConnectRequest) {NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,performConnectRequest);Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);return Channels.isChannelActive(channel)? sendRequestWithOpenChannel(newFuture, asyncHandler, channel): sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);}  

NettyRequestSender的newNettyResponseFuture创建的是NettyResponseFuture;sendRequestWithCertainForceConnect则将NettyResponseFuture传递给sendRequestWithOpenChannel或者sendRequestWithNewChannel来发送请求

小结

AsyncHttpClient的ListenableFuture继承了java.util.concurrent.Future,它定义了done、abort、touch、addListener、toCompletableFuture方法;它有两个实现类,分别是CompletedFailure及NettyResponseFuture;NettyRequestSender的sendRequest方法将NettyResponseFuture传递给sendRequestWithOpenChannel或者sendRequestWithNewChannel来发送请求。

这篇关于聊聊AsyncHttpClient的ListenableFuture的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/507237

相关文章

聊聊说话的习惯

1 在日常生活中,每个人都有固定的说话习惯。心理学研究表明,通过一个人的说话习惯,也可以分析出他的性格特点。对于每一个人来讲,说话习惯已经融为他们生活中的一部分。在社交活动中,一些不良的说话习惯很可能会给他们带来麻烦。因此,了解说话习惯对心理活动的影响是十分有必要的。 2 具有顺畅的说话习惯的人,大多思路清晰、语速适中、用词准确并且声声人耳,是典型的顺畅型说话方式这种类型的人要么不说话,要么

聊聊分布式,再讨论分布式解决方案

前言 最近很久没有写博客了,一方面是因为公司事情最近比较忙,另外一方面是因为在进行 CAP 的下一阶段的开发工作,不过目前已经告一段落了。 接下来还是开始我们今天的话题,说说分布式事务,或者说是我眼中的分布式事务,因为每个人可能对其的理解都不一样。 分布式事务是企业集成中的一个技术难点,也是每一个分布式系统架构中都会涉及到的一个东西,特别是在微服务架构中,几乎可以说是无法避免,本文就分布式事

聊聊资源调度

资源调度 般分为两个阶段: 是实现物理资源的虚拟化(即资源的抽象)于当前机器的性能越来越好,硬件配置越来越高,直接用物理机跑业务比较浪费,所以将物理机分割成更小单位的虚拟机,这样可以显著提升机器的利用效率,在公司内部一般采用容器技术来隔离资源 是将资源虚拟化后进 步在时间和空间上实现更细粒度的编排 ,优化资源的使用。 1 .一些数据 如果公司的几万台机器都是物理机,那么资源的使用率稍低: CP

聊聊Spark中的宽依赖和窄依赖

开门见山,本文就针对一个点,谈谈Spark中的宽依赖和窄依赖,这是Spark计算引擎划分Stage的根源所在,遇到宽依赖,则划分为多个stage,针对每个Stage,提交一个TaskSet: 上图:一张网上的图: 基于此图,分析下这里为什么前面的流程都是窄依赖,而后面的却是宽依赖: 我们仔细看看,map和filter算子中,对于父RDD来说,一个分区内的数据,有且仅有一个子RDD的分区来

聊聊灰度发布

有没有在北京面试java的小伙伴,每家公司面试问的问题都不一样,昨天面试官问到了灰度发布,一脸懵,好像在哪儿听说过,毕竟我都没发布过,之前都是项目组长在干这些事儿,所以聊聊,了解一下 什么是灰度发布 全量发布:把旧服务kill掉,把新服务启动,这个过程就可以理解为全量发布 回滚周期长 如果我们更新完应用之后,我们做线上回归测试的时候发现有BUG,这个时候就要做回滚,过程就是把新服

聊聊随机测试和猴子测试

目录 随机测试的特点 1.不可预测性 2.缺乏针对性 3.自动化 4.资源密集型 猴子测试 随机测试 (Random Testing) 猴子测试 (Monkey Testing) 特点: 区别 1.控制程度 2.目标差异 3.实现方式 在我们测试的过程中,通常会使用到随机测试和猴子测试,其中随机测试侧重于人工测试,猴子测试侧重于借助工具执行命令进行测试。 随机测试

【聊聊经济社会】论阶级跨越

为什么要在市场中寻求自由,在市场中寻求洒脱,原因不胜其数,其中便有一条,现实生活中多是xx,可能社会属性本身就具备党同伐异,像是一股意志,平庸一切不平庸,中和一切特立独行,最终以达到一种变态的稳定. 消其意志,断其未来,耗其钱财 ,而我称之为阶级壁垒 阶级之所以难以跨越,主要也在于这三点 一:没有这样的志向,像那种羡慕有钱,或者羡慕有权,权当做梦。这样的志向,正常人只停留于羡慕的层次,而一旦受到丁

聊聊PC端页面适配

聊聊PC端页面适配  目也pc端有适配的需求:目前我们pc项目的设计稿尺寸是宽度1920,高度最小是1080。 适配目标: 1.在不同分辨率的电脑上,网页可以正常显示 2.放大或者缩小屏幕,网页可以正常显示 对于宽度的适配   对于宽度适配: 首先设置html,body{width:100%;overflow-x:hidden;} 然后我们可以把页面分解为背景层(

来聊聊我用go手写redis这件事

写在文章开头 网上有看过一些实现redis的项目,要么完全脱离go语言的理念,要么又完全去迎合c的实现理念,也不是说这些项目写的不好,只能说不符合笔者所认为的那种"平衡",于是整理了一段时间的设计稿,自己尝试着用go语言写了一版"有redis味道"的mini-redis。 截至目前,笔者已经完成了redis服务端和客户端交互的基本通信架构和实现基调,如下所示,可以看到笔者已经实现了ping

供应链劫持?聊聊什么是RepoJacking

介绍        近几个月来,对开源存储库的主要威胁就包括存储仓库劫持,通常称为RepoJacking。RepoJacking 是指恶意攻击者通过一定手段接管托管存储库的所有权或维护者的账户。通过获取对账户的访问权限,攻击者可以将恶意代码注入到使用对应仓库作为依赖项的项目中。 RepoJacking 如何攻击?        存储库攻击,也称为供应链攻击,通常利用 GitH