本文主要是介绍Camel - DefaultReactiveExecutor,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Camel - DefaultReactiveExecutor
- 参考
DefaultReactiveExecutor
作用:
用于创建Worker,并对监控Worker的运行情况;监控内容包括:
- 已创建Worker
- 正在执行的Worker
- 当前挂起的任务数
一个Worker 对应一个Thread。
每个Worker创建时会记录自己的编号,并且会创建一个双端队列(Deque),如果是异步消息往队列头部添加任务,如果是同步消息往尾部添加任务。
org.apache.camel.impl.engine.DefaultReactiveExecutor.Worker#schedule
void schedule(Runnable runnable, boolean first, boolean main, boolean sync) {if (LOG.isTraceEnabled()) {LOG.trace("Schedule [first={}, main={}, sync={}]: {}", first, main, sync, runnable);}if (main) {if (!queue.isEmpty()) {if (back == null) {back = new ArrayDeque<>();}back.push(queue);queue = new ArrayDeque<>();}}if (first) {queue.addFirst(runnable);executor.pendingTasks.incrementAndGet();} else {queue.addLast(runnable);executor.pendingTasks.incrementAndGet();}if (!running || sync) {running = true;executor.runningWorkers.incrementAndGet();try {for (;;) {final Runnable polled = queue.pollFirst();if (polled == null) {if (back != null && !back.isEmpty()) {queue = back.pollFirst();continue;} else {break;}}try {executor.pendingTasks.decrementAndGet();if (LOG.isTraceEnabled()) {LOG.trace("Worker #{} running: {}", number, runnable);}polled.run();} catch (Throwable t) {LOG.warn("Error executing reactive work due to " + t.getMessage() + ". This exception is ignored.",t);}}} finally {running = false;executor.runningWorkers.decrementAndGet();}} else {if (LOG.isTraceEnabled()) {LOG.trace("Queuing reactive work: {}", runnable);}}
}
参考
async
这篇关于Camel - DefaultReactiveExecutor的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!