Netty系列-1 NioEventLoopGroup和NioEventLoop介绍

2024-08-25 21:20

本文主要是介绍Netty系列-1 NioEventLoopGroup和NioEventLoop介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

从本文开始开启一个新的专题Netty系列,用于收集Netty相关的文章,内容包含Netty的使用方式、运行原理等。

基于io.netty:netty-all:4.1.49.Final版本进行介绍

1.NioEventLoopGroup

介绍NioEventLoopGroup之前,有几个相关的组件需要提前介绍。

1.1 SelectorProvider

NIO中用于创建通道channel和选择器selector接口的类,本质是对操作系统API进行的封装;属于NIO知识范畴(可参考IO系列文章),不是本文讨论的对象。

1.2 SelectStrategy

选择策略,这里结合接口的定义和实现以及使用进行说明。
接口定义如下:

public interface SelectStrategy {int SELECT = -1;int CONTINUE = -2;int BUSY_WAIT = -3;int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
// 只有一个get方法,无参地获取一个整型变量。
public interface IntSupplier {int get() throws Exception;
}

SelectStrategy中定义了三个宏定义常量, 其中有个常量SelectStrategy.SELECT值为-1;定义了一个calculateStrategy方法,接收两个参数,并计算出策略结果。

默认实现类如下:

final class DefaultSelectStrategy implements SelectStrategy {static final SelectStrategy INSTANCE = new DefaultSelectStrategy();private DefaultSelectStrategy() {}@Overridepublic int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;}
}

逻辑较为简单:如果hasTasks布尔参数为true, 理解为有任务待执行,则从selectSupplier获取值并返回;如果hasTasks布尔参数为false, 理解为无任务待执行,则直接返回SelectStrategy.SELECT。

使用场景:
为理解方便,再结合SelectStrategy再Netty中的使用场景进行介绍。
在Netty的死循环中,每次循环伊始调用selectStrategy的calculateStrategy方法,计算strategy值,根据strategy值确定进行select阻塞还是执行处理任务:

@Override
protected void run() {//...for (;;) {//...int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.SELECT:// 执行select阻塞default://...processSelectedKeys();//...runAllTasks();}//...}//...
}

其中selectNowSupplier的实现如下:

private final IntSupplier selectNowSupplier = new IntSupplier() {@Overridepublic int get() throws Exception {return selectNow();}
};int selectNow() throws IOException {//选择器的selectNow方法,无阻塞地返回已就绪的通道数;没有就绪的通道,否则返回0return selector.selectNow();
}

此时,调用IntSupplier的get()获取的是已就绪(如客户端发送了可读数据)的通道数。
hasTasks()的实现如下:

protected boolean hasTasks() {return super.hasTasks() || !tailTasks.isEmpty();
}protected boolean hasTasks() {return !taskQueue.isEmpty();
}

判断任务队列是否有任务积存。
综上,Netty的死循环中,每次循环判断一下任务队列是否有任务积存,如果有,则执行processSelectedKeys和runAllTasks方法,否则执行select阻塞。

工厂类:
Netty为DefaultSelectStrategy提供了一个工厂类DefaultSelectStrategyFactory:

public final class DefaultSelectStrategyFactory implements SelectStrategyFactory {public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory();private DefaultSelectStrategyFactory() { }@Overridepublic SelectStrategy newSelectStrategy() {return DefaultSelectStrategy.INSTANCE;}
}

因此,可通过DefaultSelectStrategyFactory.INSTANCE获取单例DefaultSelectStrategy.INSTANCE对象。

1.3 RejectedExecutionHandler

拒绝处理器,这里结合接口的定义和实现以及使用进行说明。
接口定义:

public interface RejectedExecutionHandler {void rejected(Runnable task, SingleThreadEventExecutor executor);
}

RejectedExecutionHandler仅定义了一个方法,rejected接受两个参数,Runnable任务和SingleThreadEventExecutor线程池对象。
匿名实现类如下:

// RejectedExecutionHandlers.java中// REJECT静态属性
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {@Overridepublic void rejected(Runnable task, SingleThreadEventExecutor executor) {throw new RejectedExecutionException();}
};// 静态方法,获取REJECT静态属性
public static RejectedExecutionHandler reject() {return REJECT;
}

直接抛出RejectedExecutionException异常。
可通过RejectedExecutionHandlers.reject()获取该RejectedExecutionHandler单例对象。

使用场景:

protected void addTask(Runnable task) {ObjectUtil.checkNotNull(task, "task");if (!offerTask(task)) {reject(task);}
}protected final void reject(Runnable task) {// 抛出RejectedExecutionException异常rejectedExecutionHandler.rejected(task, this);
}

向任务队列(1.2章节中提到的taskQueue和tailTasks队列)中添加任务失败时,调用rejectedExecutionHandler的rejected方法抛出RejectedExecutionException异常。

1.4 EventExecutorChooser

线程选择器,这里结合接口的定义和实现以及使用进行说明。
接口定义:

interface EventExecutorChooser {EventExecutor next();
}

从接口定义可以看出,线程选择器的next()将会从线程池中返回一个线程。
两个实现类:

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}
}private static final class GenericEventExecutorChooser implements EventExecutorChooser {private final AtomicInteger idx = new AtomicInteger();private final EventExecutor[] executors;GenericEventExecutorChooser(EventExecutor[] executors) {this.executors = executors;}@Overridepublic EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}
}

PowerOfTwoEventExecutorChooser和GenericEventExecutorChooser作为EventExecutorChooser实现类,内部都维持了一个 EventExecutor[]线程数组executors对象和AtomicInteger原子计数器idx.
next()方法将根据计数器idx计算出一个下标,根据下标从executors数组中选择一个线程返回。二者的区别是计算下表的方式不同:GenericEventExecutorChooser通过取模获取索引下标;而PowerOfTwoEventExecutorChooser通过位运算(按位与&)来快速计算索引,这种方式在EventLoop数量是2的幂次方时,能够显著提高性能。
工厂方法:

public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();private DefaultEventExecutorChooserFactory() { }@SuppressWarnings("unchecked")@Overridepublic EventExecutorChooser newChooser(EventExecutor[] executors) {// 数组长度是否为2的幂次方if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}private static boolean isPowerOfTwo(int val) {return (val & -val) == val;}// ...
}

使用场景:
NioEventLoopGroup可以理解为一个线程池,内部维持了一个NioEventLoop[]线程数组。当NioEventLoopGroup接受任务时,调用next()方法从NioEventLoop[]数组中获取一个NioEventLoop线程对象,并将任务委托给NioEventLoop对象执行,如下所示:

public ChannelFuture register(Channel channel) {return next().register(channel);
}public ChannelFuture register(ChannelPromise promise) {return next().register(promise);
}public ChannelFuture register(Channel channel, ChannelPromise promise) {return next().register(channel, promise);
}public Future<?> submit(Runnable task) {return next().submit(task);
}public <T> Future<T> submit(Runnable task, T result) {return next().submit(task, result);
}public <T> Future<T> submit(Callable<T> task) {return next().submit(task);
}public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {return next().schedule(command, delay, unit);
}public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {return next().schedule(callable, delay, unit);
}public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {return next().scheduleAtFixedRate(command, initialDelay, period, unit);
}public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
}public void execute(Runnable command) {next().execute(command);
}

1.5 ThreadPerTaskExecutor

ThreadPerTaskExecutor定义如下:

public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {this.threadFactory = ObjectUtil.checkNotNull(threadFactory, "threadFactory");}@Overridepublic void execute(Runnable command) {threadFactory.newThread(command).start();}
}

其中,Executor是JUC中的线程池接口:

public interface Executor {void execute(Runnable command);
}

当有任务提交给ThreadPerTaskExecutor线程池时,会通过threadFactory线程工厂创建一个线程,并将该任务提交给该线程执行。

使用场景:
构造ThreadPerTaskExecutor对象时,需要传入一个线程工厂用于确定是否守护线程,名称、线程属组、优先级等,这里newDefaultThreadFactory构造线程工厂的内容不重要,重点在与包装ThreadPerTaskExecutor对象,并最终赋值给NioEventLoop的this.executor属性。这部分将在NioEventLoop章节中介绍。

1.6 NioEventLoopGroup介绍

本章节从构造函数以及属性的角度介绍NioEventLoopGroup:
可以通过参数指定NioEventLoopGroup使用的线程数量,不指定时使用CPU属性*2作为线程数。

// 使用1个线程
EventLoopGroup bossGroup = new NioEventLoopGroup(1);// cpu*2
EventLoopGroup workerGroup = new NioEventLoopGroup();

无参和带参的构造函数如下:

public NioEventLoopGroup() {this(0);
}public NioEventLoopGroup(int nThreads) {this(nThreads, (Executor) null);
}

进入MultithreadEventLoopGroup构造函数中:

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}

而DEFAULT_EVENT_LOOP_THREADS在类加载阶段已确定:

private static final int DEFAULT_EVENT_LOOP_THREADS;static {DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}

如果设置了"io.netty.eventLoopThreads"环境变量,则使用这个环境变量指定的值,否则使用CPU数*2.
构造函数通过super逐层调用父类的构造函数进入MultithreadEventExecutorGroup中,删除异常分支后的主线逻辑如下:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) {// 步骤1.构造ThreadPerTaskExecutor对象executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());// 步骤2.创建children数组children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {        children[i] = newChild(executor, args);           }// 步骤3.创建线程选择器chooser = chooserFactory.newChooser(children);// 添加terminationListener监听器...// 步骤4.再维护一份只读的childrenSet<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

步骤1: 创建ThreadPerTaskExecutor线程池对象,将作为参数构造NioEventLoop对象。
步骤2:根据线程数创建children数组,调用newChild依次创建线程,得到的children称为子线程组;
步骤3:将子线程组传递给线程选择器构造函数,创建线程选择器;
步骤4:在NioEventLoopGroup内部维护一份只读的children子线程组;
此后,NioEventLoopGroup拥有了一个NioEventLoop数组对象,以及一个chooser线程选择器,选择器的next()方法会依次从NioEventLoop数组对象中返回一个NioEventLoop对象,用于执行提交给NioEventLoopGroup的任务。

继续看一下newChild方法:

protected EventLoop newChild(Executor executor, Object... args) throws Exception {EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
}

这里传递给NioEventLoop构造参数的对象,如executor(ThreadPerTaskExecutor),SelectorProvider,SelectStrategy,RejectedExecutionHandler就是本章节介绍的组件。
另外,从NioEventLoopGroup的继承体系看,继承了ExecutorService具备了线程池的能力,继承了ScheduledExecutorService具备了定时任务的能力。
可以通过案例测试一下:

public static void main(String[] args) {EventLoopGroup group = new NioEventLoopGroup(4);for (int i = 0;i<10;i++) {group.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("[test]"+Thread.currentThread().getName()+" executed.");});}
}

运行结果如下所示:

[test]nioEventLoopGroup-2-3 executed.
[test]nioEventLoopGroup-2-4 executed.
[test]nioEventLoopGroup-2-1 executed.
[test]nioEventLoopGroup-2-2 executed.
[test]nioEventLoopGroup-2-3 executed.
[test]nioEventLoopGroup-2-4 executed.
[test]nioEventLoopGroup-2-2 executed.
[test]nioEventLoopGroup-2-1 executed.
[test]nioEventLoopGroup-2-1 executed.
[test]nioEventLoopGroup-2-2 executed.

可以看出,group内部维护了4个线程,依次将任务提交给这四个线程处理。

2.NioEventLoop

如果将NioEventLoopGroup理解为线程池,NioEventLoop就是实际干活的线程, 且一旦启动后就不断地循环干活,如下所示:
在这里插入图片描述
NioEventLoop执行选择器的select方法阻塞后陷入阻塞,直到有任务或者IO事件才会唤醒NioEventLoop,依次处理IO事件、线程任务队列的任务,然后再次执行选择器的select方法阻塞后陷入阻塞,循环往复。
先从整体上对NioEventLoop的功能了解后,以下结合Netty源码对NioEventLoop进行详细介绍。

2.1 构造NioEventLoop

NioEventLoop构造函数:
继续NioEventLoopGroup中通过newChild方法构造NioEventLoop对象,进入NioEventLoop的构造函数:

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,EventLoopTaskQueueFactory queueFactory) {super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),rejectedExecutionHandler);this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");final SelectorTuple selectorTuple = openSelector();this.selector = selectorTuple.selector;this.unwrappedSelector = selectorTuple.unwrappedSelector;
}

参数parent为NioEventLoopGroup, executor/selectorProvider/strategy/rejectedExecutionHandler在NioEventLoopGroup中已介绍过,queueFactory此时为空,将使用Netty默认的队列工厂生产队列,队列用于存放提交给NioEventLoop的任务。
NioEventLoop属性:

随着父类构造器的不断调用,NioEventLoop将拥有以下重要属性:

// 所属的NioEventLoopGroup对象
private final EventExecutorGroup parent;// 用于创建NIO通道和选择器的SelectorProvider对象
private final SelectorProvider provider;// 多路复用选择器
private Selector selector;// 线程池
private final Executor executor;// 两个任务队列
private final Queue<Runnable> taskQueue;
private final Queue<Runnable> tailTasks;// 拒绝处理器
private final RejectedExecutionHandler rejectedExecutionHandler;// 选择策略
private final SelectStrategy selectStrategy;

parent属性记录所属的NioEventLoopGroup对象;
provider用于后续创建NIO的通道和选择器;
selector多路复用选择器,Netty底层是依赖于NIO实现的,需要依赖selector;
taskQueue和tailTasks任务队列,用于存放待执行的任务(提交给NioEventLoop的任务);
rejectedExecutionHandler拒绝处理器,当向队列添加任务失败时,调用拒绝处理器抛出异常;
selectStrategy选择处理器,根据是否有任务确定是执行select阻塞还是执行任务。
executor属性需要详细介绍一下:

this.executor = ThreadExecutorMap.apply(executor, this);

创建给ThreadExecutorMap.apply方法2个参数:ThreadPerTaskExecutor类型的线程池executor对象,对于提交的每个任务都会创建一个线程,并将任务委托给该线程处理;另外一个参数是当前NioEventLoop对象。

public static Executor apply(final Executor executor,final EventExecutor eventExecutor) {return new Executor() {@Overridepublic void execute(final Runnable command) {executor.execute(apply(command, eventExecutor));}};
}

该方法将返回一个线程池对象,当向该线程池对象提交任务时(调用execute并传递Runnable时),将调用ThreadPerTaskExecutor执行apply(command, eventExecutor)返回的任务,apply对该任务进行了增强:

public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {return new Runnable() {@Overridepublic void run() {setCurrentEventExecutor(eventExecutor);try {command.run();} finally {setCurrentEventExecutor(null);}}};
}

这是常见的ThreadLocal写法:

private static final FastThreadLocal<EventExecutor> mappings = new      FastThreadLocal<EventExecutor>();private static void setCurrentEventExecutor(EventExecutor executor) {mappings.set(executor);
}

此时,在任务的前后进行了增强,将NioEventLoop保存到mappings中,并在任务执行完后删除。

总之,this.executor是一个Executor类,当该类的execute(Runnable command)方法被调用时,ThreadPerTaskExecutor会创建一个线程来执行这个任务,且这个任务前后将NioEventLoop对象添加到ThreadLocal中。
ThreadPerTaskExecutor创建的这个线程将会保存在NioEventLoop的thread属性中,每个NioEventLoop都会绑定一个线程,具体逻辑在下一节进行介绍。

2.2 启动NioEventLoop

NioEventLoopGroup group = new NioEventLoopGroup(4);

在NioEventLoopGroup对象以及NioEventLoop[]数组(以及数组元素)创建完成后,所有的NioEventLoop处于未启动状态,向其提交任务时会启动NioEventLoop循环。
可通过如下方式, 启动一个NioEventLoop:

Runnable task = () -> System.out.println(Thread.currentThread().getName());
group.submit(task);// 或者
NioEventLoop next = (NioEventLoop)group.next();
next.submit(task);

分析:向NioEventLoopGroup提交任务时,NioEventLoopGroup会通过next()方法获取一个子NioEventLoop并将任务提交给改子NioEventLoop,因此上述两种方式完全一致。

状态变量和状态值:

在NioEventLoop(以及其父类)内部定义了两个状态变量,如下所示:

// NioEventLoop绑定的线程
private volatile Thread thread;//NioEventLoop状态, 初始状态为未启动
private volatile int state = ST_NOT_STARTED;//状态值定义如下:
// 未开始
private static final int ST_NOT_STARTED = 1;
// 已开始
private static final int ST_STARTED = 2;
// 正在关闭
private static final int ST_SHUTTING_DOWN = 3;
// 已停止
private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5;

其中thread是NioEventLoop绑定的线程,NioEventLoop的所有工作都由该线程来执行,包括选择器的select、作为线程执行task和定时task;每个NioEventLoop在启动后都会绑定一个thread,且整个生命周期不会发生变化。NioEventLoop可以理解为一个线程的原因就在于此属性,后面使用NioEventLoop线程表示该thread。

启动NioEventLoop

可以向NioEventLoop中提交两种任务,懒加载任务和正常任务。懒加载任务表示不着急执行,可以等NioEventLoop可以执行任务的时候再执行(从select阻塞被唤醒后),正常任务提交后,会强制唤醒select阻塞,并执行任务。

懒加载任务不是重点内容,且懒加载任务与正常任务的区别仅在于是否强制唤醒select, 以下为突出主线逻辑,省略这一部分的介绍。

向NioEventLoop提交任务后,进入如下流程:

public void execute(Runnable task) {execute(task, true);
}private void execute(Runnable task, boolean immediate) {boolean inEventLoop = inEventLoop();// 将任务提交到`Queue<Runnable> taskQueue`队列中,等待执行addTask(task);if (!inEventLoop) {startThread();// NioEventLoop是否已停止,停止则拒绝任务,并抛出异常if (isShutdown()) {boolean reject = false;try {if (removeTask(task)) {reject = true;}} catch (UnsupportedOperationException e) {}if (reject) {reject();}}}// addTaskWakesUp由构造函数传入-固定为falseif (!addTaskWakesUp && immediate) {wakeup(inEventLoop);}
}

这段代码的和核心逻辑在于 addTask(task)和startThread(),前者将任务保存至任务队列,后者启动线程。

boolean inEventLoop = inEventLoop() 判断当前线程是否是NioEventLoop线程,只有NioEventLoop线程自己向NioEventLoop提交任务时,才返回true, 其他线程均返回false。

if (!inEventLoop) {startThread();//...
}

inEventLoop为false时才会执行startThread(),因为inEventLoop为true表示当前任务由NioEventLoop线程提交,即NioEventLoop线程已启动,因此不需要再次调用startThread方法(后续inEventLoop判断逻辑也是这个原因,不再介绍)。

wakeup(inEventLoop)方法如下所示:

protected void wakeup(boolean inEventLoop) {// nextWakeupNanos稍后介绍if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {selector.wakeup();}
}

调用选择器的wakeup()方法强制唤醒阻塞在selector上的NioEventLoop线程。
进入startThread方法:

private void startThread() {if (state == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {doStartThread();success = true;} finally {if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}}
}

通过状态变量和原子性的操作保证了doStartThread()方法只会执行一次,doStartThread方法的核心逻辑如下:

private void doStartThread() {executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();// ...SingleThreadEventExecutor.this.run();// ...}});
}

前面已介绍过,executor.execute执行一个Runnable任务的时,直接创建一个新的线程,并将Runnable任务提交给这个新创建的线程。通过thread = Thread.currentThread()将新创建的线程赋值给thread属性,用于NioEventLoop与thread绑定;这个线程继续执行SingleThreadEventExecutor.this.run()从而启动NioEventLoop。

run方法:
该run方法的实现位于NioEventLoop类中,省去框架代码和简化异常逻辑后如下所示:

protected void run() {int selectCnt = 0;for (;;) {try {int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.SELECT:long curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {curDeadlineNanos = NONE;}nextWakeupNanos.set(curDeadlineNanos);try {if (!hasTasks()) {strategy = select(curDeadlineNanos);}} finally {nextWakeupNanos.lazySet(AWAKE);}// fall throughdefault:;}} catch (IOException e) {rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}selectCnt++;cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;boolean ranTasks;if (ioRatio == 100) {try {if (strategy > 0) {processSelectedKeys();}} finally {ranTasks = runAllTasks();}} else if (strategy > 0) {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {final long ioTime = System.nanoTime() - ioStartTime;ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}} else {ranTasks = runAllTasks(0);}if (ranTasks || strategy > 0) {if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}selectCnt = 0;} else if (unexpectedSelectorWakeup(selectCnt)) {selectCnt = 0;}}
}

先从整体上看,这段代码的结构如下:

protected void run() {int selectCnt = 0;for (;;) {int strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());switch (strategy) {case SelectStrategy.SELECT:// select.select()阻塞default:;}//...// 执行就绪的SelectedKeysprocessSelectedKeys();//...// 处理任务队列的任务ranTasks = runAllTasks();//...}
}

通过hasTasks()判断NioEventLoop的任务队列中是否有任务:如果没有,strategy返回-1,执行select.select()陷入阻塞(或者返回已就绪的IO事件);如果任务队列中有任务,则执行strategy返回selector.selectNow()的值(已就绪的IO事件)且不阻塞。
processSelectedKeys()会遍历已就绪的IO事件,对应SelectionKey(包含通道、选择器、就绪事件信息),依次处理。
runAllTasks()依次从任务队列取出任务并执行。
说明hasTasks()有任务时,执行selector.selectNow()不阻塞,从而保证了一定会执行runAllTasks()方法; 无任务时,执行selector.select()或者selector.select(timeout)可能陷入阻塞。

processSelectedKeys()牵连内容较多,后续介绍消息处理流程时再进行介绍。

整体上理解run方法逻辑后,再看一下3处细节。
[1] select逻辑

// 获取下一次定时任务执行的时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {// 没有定时任务创建curDeadlineNanos = NONE;
}
nextWakeupNanos.set(curDeadlineNanos);
try {if (!hasTasks()) {// 根据是否有定时任务,执行select或者select(timeout)strategy = select(curDeadlineNanos);}
} finally {nextWakeupNanos.lazySet(AWAKE);
}

因为NioEventLoop继承了ScheduledExecutorService, 自然具备定时线程池的能力,因此存在定时任务队列。
执行select前,判断定时任务队列是否有任务:如果没有,则执行selecor.select()陷入持续阻塞状态;如果有,获取下一次定时任务的执行时间,执行selecor.select(timeout), 在定时任务执行时从阻塞中醒来, 保证定时任务按时执行。

[2] 异常逻辑

protected void run() {int selectCnt = 0;for (;;) {try {//...} catch (IOException e) {rebuildSelector0();selectCnt = 0;handleLoopException(e);continue;}//...}
}

当有IO异常发送时,需要重塑选择器,否则这个NioEventLoop对象将处于异常状态:https://github.com/netty/netty/issues/8566.
重塑过程包括:重新创建selector选择器对象,将注册到就选择器对象的通道全部取消,并重新注册到新的选择器上。
[3] ioRatio比率
ioRatio比率用于控制处理IO事件与执行任务队列任务占用CPU的比率。

这篇关于Netty系列-1 NioEventLoopGroup和NioEventLoop介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1106689

相关文章

四种Flutter子页面向父组件传递数据的方法介绍

《四种Flutter子页面向父组件传递数据的方法介绍》在Flutter中,如果父组件需要调用子组件的方法,可以通过常用的四种方式实现,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录方法 1:使用 GlobalKey 和 State 调用子组件方法方法 2:通过回调函数(Callb

Python进阶之Excel基本操作介绍

《Python进阶之Excel基本操作介绍》在现实中,很多工作都需要与数据打交道,Excel作为常用的数据处理工具,一直备受人们的青睐,本文主要为大家介绍了一些Python中Excel的基本操作,希望... 目录概述写入使用 xlwt使用 XlsxWriter读取修改概述在现实中,很多工作都需要与数据打交

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.

Python实现NLP的完整流程介绍

《Python实现NLP的完整流程介绍》这篇文章主要为大家详细介绍了Python实现NLP的完整流程,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 编程安装和导入必要的库2. 文本数据准备3. 文本预处理3.1 小写化3.2 分词(Tokenizatio

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

科研绘图系列:R语言扩展物种堆积图(Extended Stacked Barplot)

介绍 R语言的扩展物种堆积图是一种数据可视化工具,它不仅展示了物种的堆积结果,还整合了不同样本分组之间的差异性分析结果。这种图形表示方法能够直观地比较不同物种在各个分组中的显著性差异,为研究者提供了一种有效的数据解读方式。 加载R包 knitr::opts_chunk$set(warning = F, message = F)library(tidyverse)library(phyl

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}