本文主要是介绍Netty系列-1 NioEventLoopGroup和NioEventLoop介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
背景
从本文开始开启一个新的专题Netty系列,用于收集Netty相关的文章,内容包含Netty的使用方式、运行原理等。
基于io.netty:netty-all:4.1.49.Final版本进行介绍
1.NioEventLoopGroup
介绍NioEventLoopGroup之前,有几个相关的组件需要提前介绍。
1.1 SelectorProvider
NIO中用于创建通道channel和选择器selector接口的类,本质是对操作系统API进行的封装;属于NIO知识范畴(可参考IO系列文章),不是本文讨论的对象。
1.2 SelectStrategy
选择策略,这里结合接口的定义和实现以及使用进行说明。
接口定义如下:
public interface SelectStrategy {int SELECT = -1;int CONTINUE = -2;int BUSY_WAIT = -3;int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
// 只有一个get方法,无参地获取一个整型变量。
public interface IntSupplier {int get() throws Exception;
}
SelectStrategy中定义了三个宏定义常量, 其中有个常量SelectStrategy.SELECT值为-1;定义了一个calculateStrategy方法,接收两个参数,并计算出策略结果。
默认实现类如下:
final class DefaultSelectStrategy implements SelectStrategy {static final SelectStrategy INSTANCE = new DefaultSelectStrategy();private DefaultSelectStrategy() {}@Overridepublic int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;}
}
逻辑较为简单:如果hasTasks布尔参数为true, 理解为有任务待执行,则从selectSupplier获取值并返回;如果hasTasks布尔参数为false, 理解为无任务待执行,则直接返回SelectStrategy.SELECT。
使用场景:
为理解方便,再结合SelectStrategy再Netty中的使用场景进行介绍。
在Netty的死循环中,每次循环伊始调用selectStrategy的calculateStrategy方法,计算strategy值,根据strategy值确定进行select阻塞还是执行处理任务:
@Override
protected void run() {//...for (;;) {//...int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.SELECT:// 执行select阻塞default://...processSelectedKeys();//...runAllTasks();}//...}//...
}
其中selectNowSupplier的实现如下:
private final IntSupplier selectNowSupplier = new IntSupplier() {@Overridepublic int get() throws Exception {return selectNow();}
};int selectNow() throws IOException {//选择器的selectNow方法,无阻塞地返回已就绪的通道数;没有就绪的通道,否则返回0return selector.selectNow();
}
此时,调用IntSupplier的get()获取的是已就绪(如客户端发送了可读数据)的通道数。
hasTasks()的实现如下:
protected boolean hasTasks() {return super.hasTasks() || !tailTasks.isEmpty();
}protected boolean hasTasks() {return !taskQueue.isEmpty();
}
判断任务队列是否有任务积存。
综上,Netty的死循环中,每次循环判断一下任务队列是否有任务积存,如果有,则执行processSelectedKeys和runAllTasks方法,否则执行select阻塞。
工厂类:
Netty为DefaultSelectStrategy提供了一个工厂类DefaultSelectStrategyFactory:
public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();private DefaultSelectStrategyFactory() { }@Overridepublic SelectStrategy newSelectStrategy() {return DefaultSelectStrategy.INSTANCE;}
}
因此,可通过DefaultSelectStrategyFactory.INSTANCE获取单例DefaultSelectStrategy.INSTANCE对象。
1.3 RejectedExecutionHandler
拒绝处理器,这里结合接口的定义和实现以及使用进行说明。
接口定义:
public interface RejectedExecutionHandler {void rejected(Runnable task, SingleThreadEventExecutor executor);
}
RejectedExecutionHandler仅定义了一个方法,rejected接受两个参数,Runnable任务和SingleThreadEventExecutor线程池对象。
匿名实现类如下:
// RejectedExecutionHandlers.java中// REJECT静态属性
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {@Overridepublic void rejected(Runnable task, SingleThreadEventExecutor executor) {throw new RejectedExecutionException();}
};// 静态方法,获取REJECT静态属性
public static RejectedExecutionHandler reject() {return REJECT;
}
直接抛出RejectedExecutionException异常。
可通过RejectedExecutionHandlers.reject()获取该RejectedExecutionHandler单例对象。
使用场景:
protected void addTask(Runnable task) {ObjectUtil.checkNotNull(task, "task");if (!offerTask(task)) {reject(task);}
}protected final void reject(Runnable task) {// 抛出RejectedExecutionException异常rejectedExecutionHandler.rejected(task, this);
}
向任务队列(1.2章节中提到的taskQueue和tailTasks队列)中添加任务失败时,调用rejectedExecutionHandler的rejected方法抛出RejectedExecutionException异常。
1.4 EventExecutorChooser
线程选择器,这里结合接口的定义和实现以及使用进行说明。
接口定义:
interface EventExecutorChooser {EventExecutor next();
}
从接口定义可以看出,线程选择器的next()将会从线程池中返回一个线程。
两个实现类:
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}
}private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}
}
PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser作为EventExecutorChooser实现类,内部都维持了一个 EventExecutor[]线程数组executors对象和AtomicInteger原子计数器idx.
next()方法将根据计数器idx计算出一个下标,根据下标从executors数组中选择一个线程返回。二者的区别是计算下表的方式不同:GenericEventExecutorChooser通过取模获取索引下标;而PowerOfTwoEventExecutorChooser通过位运算(按位与&)来快速计算索引,这种方式在EventLoop数量是2的幂次方时,能够显著提高性能。
工厂方法:
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();private DefaultEventExecutorChooserFactory() { }@SuppressWarnings("unchecked")@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {// 数组长度是否为2的幂次方if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}private static boolean isPowerOfTwo(int val) {return (val & -val) == val;}// ...
}
使用场景:
NioEventLoopGroup可以理解为一个线程池,内部维持了一个NioEventLoop[]线程数组。当NioEventLoopGroup接受任务时,调用next()方法从NioEventLoop[]数组中获取一个NioEventLoop线程对象,并将任务委托给NioEventLoop对象执行,如下所示:
public ChannelFuture register(Channel channel) {return next().register(channel);
}public ChannelFuture register(ChannelPromise promise) {return next().register(promise);
}public ChannelFuture register(Channel channel, ChannelPromise promise) {return next().register(channel, promise);
}public Future<?> submit(Runnable task) {return next().submit(task);
}public <T> Future<T> submit(Runnable task, T result) {return next().submit(task, result);
}public <T> Future<T> submit(Callable<T> task) {return next().submit(task);
}public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {return next().schedule(command, delay, unit);
}public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {return next().schedule(callable, delay, unit);
}public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {return next().scheduleAtFixedRate(command, initialDelay, period, unit);
}public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}public void execute(Runnable command) {next().execute(command);
}
1.5 ThreadPerTaskExecutor
ThreadPerTaskExecutor定义如下:
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");}@Overridepublic void execute(Runnable command) {threadFactory.newThread(command).start();}
}
其中,Executor是JUC中的线程池接口:
public interface Executor {void execute(Runnable command);
}
当有任务提交给ThreadPerTaskExecutor线程池时,会通过threadFactory线程工厂创建一个线程,并将该任务提交给该线程执行。
使用场景:
构造ThreadPerTaskExecutor对象时,需要传入一个线程工厂用于确定是否守护线程,名称、线程属组、优先级等,这里newDefaultThreadFactory构造线程工厂的内容不重要,重点在与包装ThreadPerTaskExecutor对象,并最终赋值给NioEventLoop的this.executor属性。这部分将在NioEventLoop章节中介绍。
1.6 NioEventLoopGroup介绍
本章节从构造函数以及属性的角度介绍NioEventLoopGroup:
可以通过参数指定NioEventLoopGroup使用的线程数量,不指定时使用CPU属性*2作为线程数。
// 使用1个线程
EventLoopGroup bossGroup = new NioEventLoopGroup(1);// cpu*2
EventLoopGroup workerGroup = new NioEventLoopGroup();
无参和带参的构造函数如下:
public NioEventLoopGroup() {this(0);
}public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null);
}
进入MultithreadEventLoopGroup构造函数中:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
而DEFAULT_EVENT_LOOP_THREADS在类加载阶段已确定:
private static final int DEFAULT_EVENT_LOOP_THREADS;static {DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
如果设置了"io.netty.eventLoopThreads"环境变量,则使用这个环境变量指定的值,否则使用CPU数*2.
构造函数通过super逐层调用父类的构造函数进入MultithreadEventExecutorGroup中,删除异常分支后的主线逻辑如下:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {// 步骤1.构造ThreadPerTaskExecutor对象executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());// 步骤2.创建children数组children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) { children[i] = newChild(executor, args); }// 步骤3.创建线程选择器chooser = chooserFactory.newChooser(children);// 添加terminationListener监听器...// 步骤4.再维护一份只读的childrenSet<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
步骤1: 创建ThreadPerTaskExecutor线程池对象,将作为参数构造NioEventLoop对象。
步骤2:根据线程数创建children数组,调用newChild依次创建线程,得到的children称为子线程组;
步骤3:将子线程组传递给线程选择器构造函数,创建线程选择器;
步骤4:在NioEventLoopGroup内部维护一份只读的children子线程组;
此后,NioEventLoopGroup拥有了一个NioEventLoop数组对象,以及一个chooser线程选择器,选择器的next()方法会依次从NioEventLoop数组对象中返回一个NioEventLoop对象,用于执行提交给NioEventLoopGroup的任务。
继续看一下newChild方法:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}
这里传递给NioEventLoop构造参数的对象,如executor(ThreadPerTaskExecutor),SelectorProvider,SelectStrategy,RejectedExecutionHandler就是本章节介绍的组件。
另外,从NioEventLoopGroup的继承体系看,继承了ExecutorService具备了线程池的能力,继承了ScheduledExecutorService具备了定时任务的能力。
可以通过案例测试一下:
public static void main(String[] args) {EventLoopGroup group = new NioEventLoopGroup(4);for (int i = 0;i<10;i++) {group.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[test]"+Thread.currentThread().getName()+" executed.");});}
}
运行结果如下所示:
[test]nioEventLoopGroup-2-3 executed.
[test]nioEventLoopGroup-2-4 executed.
[test]nioEventLoopGroup-2-1 executed.
[test]nioEventLoopGroup-2-2 executed.
[test]nioEventLoopGroup-2-3 executed.
[test]nioEventLoopGroup-2-4 executed.
[test]nioEventLoopGroup-2-2 executed.
[test]nioEventLoopGroup-2-1 executed.
[test]nioEventLoopGroup-2-1 executed.
[test]nioEventLoopGroup-2-2 executed.
可以看出,group内部维护了4个线程,依次将任务提交给这四个线程处理。
2.NioEventLoop
如果将NioEventLoopGroup理解为线程池,NioEventLoop就是实际干活的线程, 且一旦启动后就不断地循环干活,如下所示:
NioEventLoop执行选择器的select方法阻塞后陷入阻塞,直到有任务或者IO事件才会唤醒NioEventLoop,依次处理IO事件、线程任务队列的任务,然后再次执行选择器的select方法阻塞后陷入阻塞,循环往复。
先从整体上对NioEventLoop的功能了解后,以下结合Netty源码对NioEventLoop进行详细介绍。
2.1 构造NioEventLoop
NioEventLoop构造函数:
继续NioEventLoopGroup中通过newChild方法构造NioEventLoop对象,进入NioEventLoop的构造函数:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory queueFactory) {super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),rejectedExecutionHandler);this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");final SelectorTuple selectorTuple = openSelector();this.selector = selectorTuple.selector;this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
参数parent为NioEventLoopGroup, executor/selectorProvider/strategy/rejectedExecutionHandler在NioEventLoopGroup中已介绍过,queueFactory此时为空,将使用Netty默认的队列工厂生产队列,队列用于存放提交给NioEventLoop的任务。
NioEventLoop属性:
随着父类构造器的不断调用,NioEventLoop将拥有以下重要属性:
// 所属的NioEventLoopGroup对象
private final EventExecutorGroup parent;// 用于创建NIO通道和选择器的SelectorProvider对象
private final SelectorProvider provider;// 多路复用选择器
private Selector selector;// 线程池
private final Executor executor;// 两个任务队列
private final Queue<Runnable> taskQueue;
private final Queue<Runnable> tailTasks;// 拒绝处理器
private final RejectedExecutionHandler rejectedExecutionHandler;// 选择策略
private final SelectStrategy selectStrategy;
parent属性记录所属的NioEventLoopGroup对象;
provider用于后续创建NIO的通道和选择器;
selector多路复用选择器,Netty底层是依赖于NIO实现的,需要依赖selector;
taskQueue和tailTasks任务队列,用于存放待执行的任务(提交给NioEventLoop的任务);
rejectedExecutionHandler拒绝处理器,当向队列添加任务失败时,调用拒绝处理器抛出异常;
selectStrategy选择处理器,根据是否有任务确定是执行select阻塞还是执行任务。
executor属性需要详细介绍一下:
this.executor = ThreadExecutorMap.apply(executor, this);
创建给ThreadExecutorMap.apply方法2个参数:ThreadPerTaskExecutor类型的线程池executor对象,对于提交的每个任务都会创建一个线程,并将任务委托给该线程处理;另外一个参数是当前NioEventLoop对象。
public static Executor apply(final Executor executor,final EventExecutor eventExecutor) {return new Executor() {@Overridepublic void execute(final Runnable command) {executor.execute(apply(command, eventExecutor));}};
}
该方法将返回一个线程池对象,当向该线程池对象提交任务时(调用execute并传递Runnable时),将调用ThreadPerTaskExecutor执行apply(command, eventExecutor)
返回的任务,apply对该任务进行了增强:
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {return new Runnable() {@Overridepublic void run() {setCurrentEventExecutor(eventExecutor);try {command.run();} finally {setCurrentEventExecutor(null);}}};
}
这是常见的ThreadLocal写法:
private static final FastThreadLocal<EventExecutor> mappings = new FastThreadLocal<EventExecutor>();private static void setCurrentEventExecutor(EventExecutor executor) {mappings.set(executor);
}
此时,在任务的前后进行了增强,将NioEventLoop保存到mappings中,并在任务执行完后删除。
总之,this.executor是一个Executor类,当该类的execute(Runnable command)
方法被调用时,ThreadPerTaskExecutor会创建一个线程来执行这个任务,且这个任务前后将NioEventLoop对象添加到ThreadLocal中。
ThreadPerTaskExecutor创建的这个线程将会保存在NioEventLoop的thread属性中,每个NioEventLoop都会绑定一个线程,具体逻辑在下一节进行介绍。
2.2 启动NioEventLoop
NioEventLoopGroup group = new NioEventLoopGroup(4);
在NioEventLoopGroup对象以及NioEventLoop[]数组(以及数组元素)创建完成后,所有的NioEventLoop处于未启动状态,向其提交任务时会启动NioEventLoop循环。
可通过如下方式, 启动一个NioEventLoop:
Runnable task = () -> System.out.println(Thread.currentThread().getName());
group.submit(task);// 或者
NioEventLoop next = (NioEventLoop)group.next();
next.submit(task);
分析:向NioEventLoopGroup提交任务时,NioEventLoopGroup会通过next()方法获取一个子NioEventLoop并将任务提交给改子NioEventLoop,因此上述两种方式完全一致。
状态变量和状态值:
在NioEventLoop(以及其父类)内部定义了两个状态变量,如下所示:
// NioEventLoop绑定的线程
private volatile Thread thread;//NioEventLoop状态, 初始状态为未启动
private volatile int state = ST_NOT_STARTED;//状态值定义如下:
// 未开始
private static final int ST_NOT_STARTED = 1;
// 已开始
private static final int ST_STARTED = 2;
// 正在关闭
private static final int ST_SHUTTING_DOWN = 3;
// 已停止
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;
其中thread是NioEventLoop绑定的线程,NioEventLoop的所有工作都由该线程来执行,包括选择器的select、作为线程执行task和定时task;每个NioEventLoop在启动后都会绑定一个thread,且整个生命周期不会发生变化。NioEventLoop可以理解为一个线程的原因就在于此属性,后面使用NioEventLoop线程表示该thread。
启动NioEventLoop
可以向NioEventLoop中提交两种任务,懒加载任务和正常任务。懒加载任务表示不着急执行,可以等NioEventLoop可以执行任务的时候再执行(从select阻塞被唤醒后),正常任务提交后,会强制唤醒select阻塞,并执行任务。
懒加载任务不是重点内容,且懒加载任务与正常任务的区别仅在于是否强制唤醒select, 以下为突出主线逻辑,省略这一部分的介绍。
向NioEventLoop提交任务后,进入如下流程:
public void execute(Runnable task) {execute(task, true);
}private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();// 将任务提交到`Queue<Runnable> taskQueue`队列中,等待执行addTask(task);if (!inEventLoop) {startThread();// NioEventLoop是否已停止,停止则拒绝任务,并抛出异常if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {}if (reject) {reject();}}}// addTaskWakesUp由构造函数传入-固定为falseif (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}
}
这段代码的和核心逻辑在于 addTask(task)和startThread(),前者将任务保存至任务队列,后者启动线程。
boolean inEventLoop = inEventLoop()
判断当前线程是否是NioEventLoop线程,只有NioEventLoop线程自己向NioEventLoop提交任务时,才返回true, 其他线程均返回false。
if (!inEventLoop) {startThread();//...
}
inEventLoop为false时才会执行startThread(),因为inEventLoop为true表示当前任务由NioEventLoop线程提交,即NioEventLoop线程已启动,因此不需要再次调用startThread方法(后续inEventLoop判断逻辑也是这个原因,不再介绍)。
wakeup(inEventLoop)
方法如下所示:
protected void wakeup(boolean inEventLoop) {// nextWakeupNanos稍后介绍if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {selector.wakeup();}
}
调用选择器的wakeup()方法强制唤醒阻塞在selector上的NioEventLoop线程。
进入startThread方法:
private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}
}
通过状态变量和原子性的操作保证了doStartThread()方法只会执行一次,doStartThread方法的核心逻辑如下:
private void doStartThread() {executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();// ...SingleThreadEventExecutor.this.run();// ...}});
}
前面已介绍过,executor.execute执行一个Runnable任务的时,直接创建一个新的线程,并将Runnable任务提交给这个新创建的线程。通过thread = Thread.currentThread()
将新创建的线程赋值给thread属性,用于NioEventLoop与thread绑定;这个线程继续执行SingleThreadEventExecutor.this.run()从而启动NioEventLoop。
run方法:
该run方法的实现位于NioEventLoop类中,省去框架代码和简化异常逻辑后如下所示:
protected void run() {int selectCnt = 0;for (;;) {try {int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.SELECT:long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE;}nextWakeupNanos.set(curDeadlineNanos);try {if (!hasTasks()) {strategy = select(curDeadlineNanos);}} finally {nextWakeupNanos.lazySet(AWAKE);}// fall throughdefault:;}} catch (IOException e) {rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) {processSelectedKeys();}} finally {ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {final long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {ranTasks = runAllTasks(0);}if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}selectCnt = 0;} else if (unexpectedSelectorWakeup(selectCnt)) {selectCnt = 0;}}
}
先从整体上看,这段代码的结构如下:
protected void run() {int selectCnt = 0;for (;;) {int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.SELECT:// select.select()阻塞default:;}//...// 执行就绪的SelectedKeysprocessSelectedKeys();//...// 处理任务队列的任务ranTasks = runAllTasks();//...}
}
通过hasTasks()
判断NioEventLoop的任务队列中是否有任务:如果没有,strategy返回-1,执行select.select()陷入阻塞
(或者返回已就绪的IO事件);如果任务队列中有任务,则执行strategy返回selector.selectNow()
的值(已就绪的IO事件)且不阻塞。
processSelectedKeys()
会遍历已就绪的IO事件,对应SelectionKey(包含通道、选择器、就绪事件信息),依次处理。
runAllTasks()
依次从任务队列取出任务并执行。
说明:hasTasks()
有任务时,执行selector.selectNow()
不阻塞,从而保证了一定会执行runAllTasks()
方法; 无任务时,执行selector.select()
或者selector.select(timeout)
可能陷入阻塞。
processSelectedKeys()牵连内容较多,后续介绍消息处理流程时再进行介绍。
整体上理解run方法逻辑后,再看一下3处细节。
[1] select逻辑
// 获取下一次定时任务执行的时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {// 没有定时任务创建curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {if (!hasTasks()) {// 根据是否有定时任务,执行select或者select(timeout)strategy = select(curDeadlineNanos);}
} finally {nextWakeupNanos.lazySet(AWAKE);
}
因为NioEventLoop继承了ScheduledExecutorService, 自然具备定时线程池的能力,因此存在定时任务队列。
执行select前,判断定时任务队列是否有任务:如果没有,则执行selecor.select()陷入持续阻塞状态;如果有,获取下一次定时任务的执行时间,执行selecor.select(timeout), 在定时任务执行时从阻塞中醒来, 保证定时任务按时执行。
[2] 异常逻辑
protected void run() {int selectCnt = 0;for (;;) {try {//...} catch (IOException e) {rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}//...}
}
当有IO异常发送时,需要重塑选择器,否则这个NioEventLoop对象将处于异常状态:https://github.com/netty/netty/issues/8566.
重塑过程包括:重新创建selector选择器对象,将注册到就选择器对象的通道全部取消,并重新注册到新的选择器上。
[3] ioRatio比率
ioRatio比率用于控制处理IO事件与执行任务队列任务占用CPU的比率。
这篇关于Netty系列-1 NioEventLoopGroup和NioEventLoop介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!