本文主要是介绍Camel - DefaultAsyncProcessorAwaitManager,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
DefaultAsyncProcessorAwaitManager
- 参考
主要用于路由异步处理时,等待另一个线程处理结束后,继续后续处理。
等待过程中,会择机执行积压任务。
如果当前任务一直被阻塞,会记录阻塞信息,并记录阻塞时间、总阻塞时间、最短阻塞时间、最长阻塞时间,阻塞中位数等信息。
AsyncProcessAwaitManager utilization[blocked=%s, interrupted=%s, total=%s min=%s, max=%s, mean=%s]
org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager#await
public void await(Exchange exchange, CountDownLatch latch) {ReactiveExecutor reactiveExecutor = exchange.getContext().adapt(ExtendedCamelContext.class).getReactiveExecutor();// Early exit for pending reactive queued workdo {if (latch.getCount() <= 0) {return;}} while (reactiveExecutor.executeFromQueue());if (LOG.isTraceEnabled()) {LOG.trace("Waiting for asynchronous callback before continuing for exchangeId: {} -> {}",exchange.getExchangeId(), exchange);}try {if (statistics.isStatisticsEnabled()) {blockedCounter.incrementAndGet();}inflight.put(exchange, new AwaitThreadEntry(Thread.currentThread(), exchange, latch));latch.await();if (LOG.isTraceEnabled()) {LOG.trace("Asynchronous callback received, will continue routing exchangeId: {} -> {}",exchange.getExchangeId(), exchange);}} catch (InterruptedException e) {if (LOG.isTraceEnabled()) {LOG.trace("Interrupted while waiting for callback, will continue routing exchangeId: {} -> {}",exchange.getExchangeId(), exchange);}exchange.setException(e);} finally {AwaitThread thread = inflight.remove(exchange);if (statistics.isStatisticsEnabled() && thread != null) {long time = thread.getWaitDuration();long total = totalDuration.get() + time;totalDuration.set(total);if (time < minDuration.get()) {minDuration.set(time);} else if (time > maxDuration.get()) {maxDuration.set(time);}// update meanlong count = blockedCounter.get();long mean = count > 0 ? total / count : 0;meanDuration.set(mean);}}
}
参考
AsyncProcessorAwaitManager
这篇关于Camel - DefaultAsyncProcessorAwaitManager的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!