Preface
This article takes a look at AsyncHttpClient's ListenableFuture.
ListenableFuture
org/asynchttpclient/ListenableFuture.java
public interface ListenableFuture<V> extends Future<V> {

    /**
     * Terminate and if there is no exception, mark this Future as done and release the internal lock.
     */
    void done();

    /**
     * Abort the current processing, and propagate the {@link Throwable} to the {@link AsyncHandler} or {@link Future}
     *
     * @param t the exception
     */
    void abort(Throwable t);

    /**
     * Touch the current instance to prevent external service to times out.
     */
    void touch();

    /**
     * Adds a listener and executor to the ListenableFuture.
     * The listener will be {@linkplain java.util.concurrent.Executor#execute(Runnable) passed
     * to the executor} for execution when the {@code Future}'s computation is
     * {@linkplain Future#isDone() complete}.
     * <br>
     * Executor can be <code>null</code>, in that case executor will be executed
     * in the thread where completion happens.
     * <br>
     * There is no guaranteed ordering of execution of listeners, they may get
     * called in the order they were added and they may get called out of order,
     * but any listener added through this method is guaranteed to be called once
     * the computation is complete.
     *
     * @param listener the listener to run when the computation is complete.
     * @param exec     the executor to run the listener in.
     * @return this Future
     */
    ListenableFuture<V> addListener(Runnable listener, Executor exec);

    CompletableFuture<V> toCompletableFuture();

    //......
}
ListenableFuture extends java.util.concurrent.Future and adds five methods: done, abort, touch, addListener, and toCompletableFuture.
CompletedFailure
org/asynchttpclient/ListenableFuture.java
class CompletedFailure<T> implements ListenableFuture<T> {

    private final ExecutionException e;

    public CompletedFailure(Throwable t) {
        e = new ExecutionException(t);
    }

    public CompletedFailure(String message, Throwable t) {
        e = new ExecutionException(message, t);
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return true;
    }

    @Override
    public boolean isCancelled() {
        return false;
    }

    @Override
    public boolean isDone() {
        return true;
    }

    @Override
    public T get() throws ExecutionException {
        throw e;
    }

    @Override
    public T get(long timeout, TimeUnit unit) throws ExecutionException {
        throw e;
    }

    @Override
    public void done() {
    }

    @Override
    public void abort(Throwable t) {
    }

    @Override
    public void touch() {
    }

    @Override
    public ListenableFuture<T> addListener(Runnable listener, Executor exec) {
        if (exec != null) {
            exec.execute(listener);
        } else {
            listener.run();
        }
        return this;
    }

    @Override
    public CompletableFuture<T> toCompletableFuture() {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(e);
        return future;
    }
}
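CompletedFailure implements ListenableFuture as a future that has already failed: cancel returns true, isCancelled returns false, isDone returns true, and both get methods throw the ExecutionException that wraps the original Throwable. done, abort, and touch are no-ops, and addListener runs the listener right away, either through the given executor or directly on the calling thread.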
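One detail of toCompletableFuture is easy to miss: it completes the new CompletableFuture with the stored ExecutionException itself, and CompletableFuture.get() wraps whatever it was completed with in another ExecutionException. The following is a minimal sketch using only java.util.concurrent; the class CompletedFailureSketch and its helper methods are hypothetical names introduced for illustration and are not part of AsyncHttpClient.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class CompletedFailureSketch {

    // Mirrors the body of CompletedFailure.toCompletableFuture(): the stored
    // ExecutionException is passed to completeExceptionally as-is.
    static CompletableFuture<String> failedFuture(Throwable cause) {
        ExecutionException e = new ExecutionException(cause);
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(e);
        return future;
    }

    // Counts the ExecutionException layers above the original cause when the
    // failure is observed through CompletableFuture.get().
    static int wrappingDepth(CompletableFuture<?> future) {
        try {
            future.get();
            return 0;
        } catch (ExecutionException outer) {
            int depth = 0;
            Throwable t = outer;
            while (t instanceof ExecutionException) {
                depth++;
                t = t.getCause();
            }
            return depth;
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> f = failedFuture(new IllegalStateException("boom"));
        System.out.println("isDone=" + f.isDone() + ", layers=" + wrappingDepth(f));
    }
}
```

In this sketch the original cause sits two levels down, so code that inspects getCause() on a future obtained this way may need to unwrap twice, whereas CompletedFailure.get() itself throws the single-layer exception.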
NettyResponseFuture
org/asynchttpclient/netty/NettyResponseFuture.java
public final class NettyResponseFuture<V> implements ListenableFuture<V> {

    private static final Logger LOGGER = LoggerFactory.getLogger(NettyResponseFuture.class);

    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> REDIRECT_COUNT_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "redirectCount");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CURRENT_RETRY_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "currentRetry");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_DONE_FIELD =
            AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isDone");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IS_CANCELLED_FIELD =
            AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "isCancelled");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_AUTH_FIELD =
            AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "inAuth");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> IN_PROXY_AUTH_FIELD =
            AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "inProxyAuth");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> CONTENT_PROCESSED_FIELD =
            AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "contentProcessed");
    @SuppressWarnings("rawtypes")
    private static final AtomicIntegerFieldUpdater<NettyResponseFuture> ON_THROWABLE_CALLED_FIELD =
            AtomicIntegerFieldUpdater.newUpdater(NettyResponseFuture.class, "onThrowableCalled");
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<NettyResponseFuture, TimeoutsHolder> TIMEOUTS_HOLDER_FIELD =
            AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, TimeoutsHolder.class, "timeoutsHolder");
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<NettyResponseFuture, Object> PARTITION_KEY_LOCK_FIELD =
            AtomicReferenceFieldUpdater.newUpdater(NettyResponseFuture.class, Object.class, "partitionKeyLock");

    private final long start = unpreciseMillisTime();
    private final ChannelPoolPartitioning connectionPoolPartitioning;
    private final ConnectionSemaphore connectionSemaphore;
    private final ProxyServer proxyServer;
    private final int maxRetry;
    private final CompletableFuture<V> future = new CompletableFuture<>();

    //......

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return future.get();
    }

    @Override
    public V get(long l, TimeUnit tu) throws InterruptedException, TimeoutException, ExecutionException {
        return future.get(l, tu);
    }
}
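NettyResponseFuture implements the ListenableFuture interface. It keeps its state flags in fields updated through AtomicIntegerFieldUpdater and AtomicReferenceFieldUpdater instances, and it holds a CompletableFuture<V> named future to which both get methods delegate.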
done
public final void done() {
    if (terminateAndExit())
        return;

    try {
        loadContent();
    } catch (ExecutionException ignored) {

    } catch (RuntimeException t) {
        future.completeExceptionally(t);
    } catch (Throwable t) {
        future.completeExceptionally(t);
        throw t;
    }
}

private boolean terminateAndExit() {
    releasePartitionKeyLock();
    cancelTimeouts();
    this.channel = null;
    this.reuseChannel = false;
    return IS_DONE_FIELD.getAndSet(this, 1) != 0 || isCancelled != 0;
}

private void loadContent() throws ExecutionException {
    if (future.isDone()) {
        try {
            future.get();
        } catch (InterruptedException e) {
            throw new RuntimeException("unreachable", e);
        }
    }

    // No more retry
    CURRENT_RETRY_UPDATER.set(this, maxRetry);
    if (CONTENT_PROCESSED_FIELD.getAndSet(this, 1) == 0) {
        try {
            future.complete(asyncHandler.onCompleted());
        } catch (Throwable ex) {
            if (ON_THROWABLE_CALLED_FIELD.getAndSet(this, 1) == 0) {
                try {
                    try {
                        asyncHandler.onThrowable(ex);
                    } catch (Throwable t) {
                        LOGGER.debug("asyncHandler.onThrowable", t);
                    }
                } finally {
                    cancelTimeouts();
                }
            }

            future.completeExceptionally(ex);
        }
    }
    future.getNow(null);
}
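The done method returns immediately when terminateAndExit returns true, that is, when the future was already marked done or has been cancelled. Otherwise it calls loadContent. If future.isDone() is already true, loadContent calls future.get(), which rethrows a stored failure as an ExecutionException that done then ignores. It then sets the retry counter to maxRetry so no further retry happens and, if the content has not been processed yet, completes the future with the result of asyncHandler.onCompleted(). If onCompleted throws, asyncHandler.onThrowable is invoked at most once and the future is completed exceptionally.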
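The once-only behaviour of done comes from IS_DONE_FIELD.getAndSet(this, 1): only the first caller sees the previous value 0. A minimal sketch of that guard follows, under the assumption that a plain counter can stand in for loadContent; OnceGuardSketch and its members are hypothetical names, and the isCancelled check and resource cleanup of the real terminateAndExit are left out.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

class OnceGuardSketch {

    private static final AtomicIntegerFieldUpdater<OnceGuardSketch> IS_DONE_FIELD =
            AtomicIntegerFieldUpdater.newUpdater(OnceGuardSketch.class, "isDone");

    // AtomicIntegerFieldUpdater only accepts a volatile int field.
    private volatile int isDone;

    // Stand-in for the work done in loadContent(); only touched by the winner.
    private int completions;

    // Same shape as terminateAndExit(): true means someone already finished.
    private boolean terminateAndExit() {
        return IS_DONE_FIELD.getAndSet(this, 1) != 0;
    }

    void done() {
        if (terminateAndExit()) {
            return;
        }
        completions++;
    }

    // Calls done() the given number of times and reports how often the body ran.
    static int completionsAfter(int calls) {
        OnceGuardSketch sketch = new OnceGuardSketch();
        for (int i = 0; i < calls; i++) {
            sketch.done();
        }
        return sketch.completions;
    }

    public static void main(String[] args) {
        System.out.println(completionsAfter(3));
    }
}
```

A field updater gives the same atomicity as an AtomicInteger field without allocating an extra object per future, which is a plausible reason for this style in a class instantiated once per request.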
abort
public final void abort(final Throwable t) {
    if (terminateAndExit())
        return;

    future.completeExceptionally(t);

    if (ON_THROWABLE_CALLED_FIELD.compareAndSet(this, 0, 1)) {
        try {
            asyncHandler.onThrowable(t);
        } catch (Throwable te) {
            LOGGER.debug("asyncHandler.onThrowable", te);
        }
    }
}
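The abort method also returns immediately when terminateAndExit returns true. Otherwise it calls future.completeExceptionally(t) and then, if onThrowable has not been called before, triggers the asyncHandler.onThrowable(t) callback; an exception thrown by the callback is only logged.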
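Two properties of the abort path can be checked in isolation: CompletableFuture keeps the first completion, and the compareAndSet flag lets the handler callback run at most once. The sketch below is a simplified model, not the library code: AbortSketch is a hypothetical class, an AtomicBoolean replaces the int flag behind ON_THROWABLE_CALLED_FIELD, a counter replaces asyncHandler.onThrowable, and the terminateAndExit check is deliberately omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class AbortSketch {

    private final CompletableFuture<String> future = new CompletableFuture<>();
    // Simplification: the original uses a volatile int with a field updater.
    private final AtomicBoolean onThrowableCalled = new AtomicBoolean();
    private final AtomicInteger handlerCalls = new AtomicInteger();

    // Hypothetical stand-in for asyncHandler.onThrowable(t).
    private void onThrowable(Throwable t) {
        handlerCalls.incrementAndGet();
    }

    // abort() without the terminateAndExit() check, to isolate the other steps.
    void abort(Throwable t) {
        // Has no effect if the future is already completed.
        future.completeExceptionally(t);
        if (onThrowableCalled.compareAndSet(false, true)) {
            onThrowable(t);
        }
    }

    static int handlerCallsAfterAborts(int aborts) {
        AbortSketch sketch = new AbortSketch();
        for (int i = 0; i < aborts; i++) {
            sketch.abort(new RuntimeException("abort " + i));
        }
        return sketch.handlerCalls.get();
    }

    static boolean firstFailureWins() {
        AbortSketch sketch = new AbortSketch();
        RuntimeException first = new RuntimeException("first");
        sketch.abort(first);
        sketch.abort(new RuntimeException("second"));
        // handle() on the future itself sees the exception it was completed with.
        return sketch.future.handle((result, error) -> error).join() == first;
    }

    public static void main(String[] args) {
        System.out.println(handlerCallsAfterAborts(3) + " " + firstFailureWins());
    }
}
```

Because done's failure path and abort share the same onThrowableCalled flag in NettyResponseFuture, the handler is notified of a failure once regardless of which path gets there first.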
touch
public void touch() {
    touch = unpreciseMillisTime();
}
The touch method updates the touch field with the current timestamp returned by unpreciseMillisTime().
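The excerpt does not show where the touch field is read. Going by the Javadoc on ListenableFuture.touch ("prevent external service to times out"), one plausible use is an idle-timeout check that compares the last touch with the current time. The sketch below is purely an assumption along those lines: TouchSketch, isIdleExpired, and the explicit clock parameters are hypothetical and do not come from AsyncHttpClient.

```java
class TouchSketch {

    // Last time any activity was recorded, in milliseconds.
    private volatile long touch;

    TouchSketch(long nowMillis) {
        this.touch = nowMillis;
    }

    // Counterpart of touch(); the clock value is passed in to keep the sketch deterministic.
    void touch(long nowMillis) {
        this.touch = nowMillis;
    }

    // Hypothetical check: has the future been idle for at least idleTimeoutMillis?
    boolean isIdleExpired(long nowMillis, long idleTimeoutMillis) {
        return nowMillis - touch >= idleTimeoutMillis;
    }

    public static void main(String[] args) {
        TouchSketch sketch = new TouchSketch(0L);
        sketch.touch(900L);
        System.out.println(sketch.isIdleExpired(1000L, 500L));
        System.out.println(sketch.isIdleExpired(1500L, 500L));
    }
}
```

Under this assumption, every touch pushes the expiry point forward, so a slow but active response would not be treated as idle.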
addListener
public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
    if (exec == null) {
        exec = Runnable::run;
    }
    future.whenCompleteAsync((r, v) -> listener.run(), exec);
    return this;
}
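The addListener method registers the listener with future.whenCompleteAsync((r, v) -> listener.run(), exec). When exec is null it substitutes Runnable::run, an executor that runs the task directly on the calling thread, so the listener runs on whichever thread completes the future.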
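The effect of the Runnable::run fallback can be observed with a plain CompletableFuture. The sketch below copies the body of addListener into a static helper and records which thread ran the listener; AddListenerSketch and the helper names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

class AddListenerSketch {

    // Same logic as NettyResponseFuture.addListener, applied to a plain CompletableFuture.
    static <V> void addListener(CompletableFuture<V> future, Runnable listener, Executor exec) {
        if (exec == null) {
            exec = Runnable::run; // direct executor: run on the calling thread
        }
        future.whenCompleteAsync((r, v) -> listener.run(), exec);
    }

    // Registers a listener with a null executor, completes the future from a
    // thread with the given name, and returns the name of the listener's thread.
    static String listenerThreadFor(String completerName) {
        CompletableFuture<String> future = new CompletableFuture<>();
        String[] ranOn = new String[1];
        addListener(future, () -> ranOn[0] = Thread.currentThread().getName(), null);

        Thread completer = new Thread(() -> future.complete("ok"), completerName);
        completer.start();
        try {
            completer.join(); // join() makes the write to ranOn visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn[0];
    }

    // With an already completed future, the listener runs during registration.
    static boolean runsImmediatelyWhenAlreadyDone() {
        CompletableFuture<String> future = CompletableFuture.completedFuture("ok");
        boolean[] ran = new boolean[1];
        addListener(future, () -> ran[0] = true, null);
        return ran[0];
    }

    public static void main(String[] args) {
        System.out.println(listenerThreadFor("completer"));
        System.out.println(runsImmediatelyWhenAlreadyDone());
    }
}
```

A listener registered without an executor therefore shares the completing thread, so a slow or blocking listener is better given an executor of its own.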
toCompletableFuture
public CompletableFuture<V> toCompletableFuture() {
    return future;
}
The toCompletableFuture method returns the internal future field directly rather than a copy.
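Since the result is an ordinary CompletableFuture, the usual composition methods apply to it. Here is a minimal sketch of such chaining, assuming a CompletableFuture<String> stands in for the value returned by toCompletableFuture(); ChainingSketch, bodyLength, and the -1 fallback are hypothetical choices made for the example.

```java
import java.util.concurrent.CompletableFuture;

class ChainingSketch {

    // 'responseFuture' stands in for listenableFuture.toCompletableFuture().
    static CompletableFuture<Integer> bodyLength(CompletableFuture<String> responseFuture) {
        return responseFuture
                .thenApply(String::length)   // transform a successful result
                .exceptionally(t -> -1);     // map any failure to a fallback value
    }

    // Completes a source future with either a body or a failure and returns the outcome.
    static int lengthOrFallback(String body, Throwable failure) {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<Integer> length = bodyLength(source);
        if (failure != null) {
            source.completeExceptionally(failure);
        } else {
            source.complete(body);
        }
        return length.join();
    }

    public static void main(String[] args) {
        System.out.println(lengthOrFallback("hello", null));
        System.out.println(lengthOrFallback(null, new RuntimeException("connection reset")));
    }
}
```

Because the returned object is the same instance the NettyResponseFuture uses internally, completing it from outside would also change what get() returns, so it is probably safest to treat it as read-only and only attach dependent stages.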
newNettyResponseFuture
org/asynchttpclient/netty/request/NettyRequestSender.java
private <T> NettyResponseFuture<T> newNettyResponseFuture(Request request,
                                                          AsyncHandler<T> asyncHandler,
                                                          NettyRequest nettyRequest,
                                                          ProxyServer proxyServer) {
    NettyResponseFuture<T> future = new NettyResponseFuture<>(
            request,
            asyncHandler,
            nettyRequest,
            config.getMaxRequestRetry(),
            request.getChannelPoolPartitioning(),
            connectionSemaphore,
            proxyServer);

    String expectHeader = request.getHeaders().get(EXPECT);
    if (HttpHeaderValues.CONTINUE.contentEqualsIgnoreCase(expectHeader))
        future.setDontWriteBodyBecauseExpectContinue(true);
    return future;
}

private <T> ListenableFuture<T> sendRequestWithCertainForceConnect(Request request,
                                                                   AsyncHandler<T> asyncHandler,
                                                                   NettyResponseFuture<T> future,
                                                                   ProxyServer proxyServer,
                                                                   boolean performConnectRequest) {
    NettyResponseFuture<T> newFuture = newNettyRequestAndResponseFuture(request, asyncHandler, future, proxyServer,
            performConnectRequest);

    Channel channel = getOpenChannel(future, request, proxyServer, asyncHandler);

    return Channels.isChannelActive(channel)
            ? sendRequestWithOpenChannel(newFuture, asyncHandler, channel)
            : sendRequestWithNewChannel(request, proxyServer, newFuture, asyncHandler);
}
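NettyRequestSender's newNettyResponseFuture creates the NettyResponseFuture, passing in config.getMaxRequestRetry() as the retry limit. sendRequestWithCertainForceConnect then hands the NettyResponseFuture to sendRequestWithOpenChannel when an active channel is available, or to sendRequestWithNewChannel otherwise, to send the request.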
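Summary
AsyncHttpClient's ListenableFuture extends java.util.concurrent.Future and defines the done, abort, touch, addListener, and toCompletableFuture methods. It has two implementations: CompletedFailure, an already-failed future, and NettyResponseFuture, which delegates to an internal CompletableFuture. NettyRequestSender creates the NettyResponseFuture and, in sendRequestWithCertainForceConnect, passes it to sendRequestWithOpenChannel or sendRequestWithNewChannel to send the request.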