本文主要是介绍netty 旷视科技_Netty中NioEventLoop源码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
准备赶趟上春招实习的车,刚开始学netty,感觉项目很简单,所以就想看看源码,非科班看这些是真吃力。。放点小笔记,都是站在巨人的肩膀上总结的(小抄),加油!
前置芝士:
需要知道Executor执行器的一些操作。
Demo
public class NettyServer {
int port;
public NettyServer(int port) {
this.port = port;
}
public void start() {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup();
try {
bootstrap.group(boss, work)
.handler(new LoggingHandler(LogLevel.DEBUG))
.channel(NioServerSocketChannel.class)
.childHandler(new ChatRoomServerInitializer());
ChannelFuture f = bootstrap.bind(new InetSocketAddress(port)).sync();
System.out.println("http server started. port : " + port);
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
public static void main(String[] args) {
NettyServer server = new NettyServer(8080);// 8080为启动端口
server.start();
}
}
上面是一段比较简单的Netty服务端的代码,我们主要关注:
EventLoopGroup boss = new NioEventLoopGroup(1); // 用于新连接接入的Group,初始化为1
EventLoopGroup work = new NioEventLoopGroup(); // 用于处理channel中的io事件以及任务的group
NioEventLoopGroup初始化过程
跟进到上述构造函数中,最后会来到MultithreadEventLoopGroup 类中的构造函数:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
传入的参数就是指定Group的大小,默认大小 DEFAULT_EVENT_LOOP_THREADS 是 Runtime.getRuntime().availableProcessors() * 2 也就是两倍的CPU数。
继续跟会来到:
# MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// 这个executor是group所包含的executor,其将来会为其所包含的每个eventLoop创建一个线程
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 创建eventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
// 有创建失败的eventLoop就关闭所有之前创建的
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
// 创建选择器
chooser = chooserFactory.newChooser(children);
final FutureListener terminationListener = new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set childrenSet = new LinkedHashSet(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
从类名就可以知道这是多线程的线程组,这里主要完成几件事情:
new ThreadPerTaskExecutor() [线程创建器]
for(){ new Child() } [构造NioEventLoop]
chooserFactory.newChooser() [线程选择器]
线程创建器
先看看名字,是给每个任务创建一个线程的线程创建器,其保存在NioEventGroup中的executor中。主要是为每一个NioEventLoop创建一个对应的线程,1:1。
# ThreadPerTaskExecutor.java
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
// 传入一个线程工厂
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
// 在执行exector的execute()方法时,调用线程工厂创建线程,并start()
threadFactory.newThread(command).start();
}
}
上述代码其实就是初始化NioEventGroup中的executor为一个线程工厂,通过之后调用execute()方法为将来的NioEventLoop创建线程来一一对应。
打住,先来看看NioEventLoop的继承关系:
这图挂了。。
可知NioEventLoop本身就是一个单线程的EventExecutor,因此有下面创建线程组数组
children = new EventExecutor[nThreads];
而实例化创建EventLoop在函数newChild()中。
构造NioEventLoop
我们跟进到构造NioEventLoop的函数newChild():
# NioEventLoopGroup.java
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
继续跟来到NioEventLoop的构造函数:
# NioEventLoop.java
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// 父类单线程的构造方法,传入的参数executor是group中的executor
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector(); // 创建selector事件轮询器到NioEventLoop上
selectStrategy = strategy;
}
先跟进父类的构造方法:
# SingleThreadEventLoop.java
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
tailTasks = newTaskQueue(maxPendingTasks); // 创建一个Mpsc任务队列,多生产者,单个消费者的任务队列
}
上述代码父类构造除了继续调用父类构造外,创建了一个Mpsc任务队列,外部线程的任务会被加入到这个任务中,保证只有一个线程去处理这些任务,保证线程安全。
# SingleThreadEventExecutor.java
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// 这是当前NioEventLoop所包含的executor
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks); // 创建一个普通任务队列
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
看博客说关于wakeup的代码比较旧,就不看了。😁
最终父类会创建一个单线程的线程创建器SingleThreadEventExecutor,除此之外,还保存了executor到了当前NioEventLoop中,也就是前面一路下来的group中的executor,帮你回忆一下:
children[i] = newChild(executor, args);
保存这个executor主要是为了之后调用NioEventLoop的execute()方法时其实就是调用传入的这个执行器,也就是executor.execute()。
然后是创建事件轮询器:
selector = openSelector();
# NioEventLoop.java
// 只截取了关心的一些代码
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}
... ...
}
其中这个provider是一个java.nio中的SelectorProvider,也就是调用jdk创建了一个selector绑定到NioEventLoop上。
线程选择器
# MultithreadEventExecutorGroup.java
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
在构造函数中,选择器参数传入的是一个默认选择器工厂的实例(单例模式)。
chooser = chooserFactory.newChooser(children);
将线程组交给线程选择器,跟进到chooserFactory.newChooser()
# DefaultEventExecutorChooserFactory.java
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
// 单例
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();
private DefaultEventExecutorChooserFactory() { }
@SuppressWarnings("unchecked")
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
// 判断长度是否是2的幂
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
// 如果是就用这个分配
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1]; // 用与运算
}
}
// 否则就是普通的分配
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
}
选择器策略是一个一个往后分配,循环遍历整个线程组给新连接绑定对应的NioEventLoop,实际的调用在MultithreadEventExecutorGroup.next()。
还做了以下的优化:
先判断线程组数组的长度是否是2的幂
如果是,则调用PowerOfTowEventExecutorChooser(),使用的是位运算替代%,效率比较高
否则就是GenericEventExecutorChooser()
NioEventLoop启动过程
NioEventLoop启动触发器:
服务端启动绑定端口
新连接接入通过chooser绑定一个NioEventLoop
以绑定端口为例跟一下启动过程,先说一下总的逻辑:
bind() -> execute(task) [入口]
startThread() -> doStartThread() [创建线程]
ThreadPerTaskExecutor.execute()
thread = Thread.currentThread()
NioEventLoop.run() [启动]
先跟进到入口,bind方法:
# AbstractBootstrap.java
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
将端口绑定作为一个Runnable任务去调用NioEventLoop.execute()方法,具体实现在父类中。
# SingleThreadEventExecutor.java
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
先留意下inEventLoop()这个方法,能够解决netty的异步串行无锁化。
直接跟到startThread()中,还是在这个类中:
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { // 如果线程状态为未启动,就cas设置成启动状态,然后执行下面方法
doStartThread();
}
}
}
// 只贴关心的代码
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread(); // 保留当前线程信息,绑定端口任务一开始的线程就是main线程
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run(); // for循环执行任务队列的任务
success = true;
} catch (Throwable t) {
... ...
}
... ...
}
... ...
}
}
上述代码中的executor是NioEventLoop所包含的那个,根据之前创建过程可以知道,这个execute最后会调用NioEventLoopGroup中的execute方法。
NioEventLoop执行流程
先说一下总的执行逻辑:
run() -> for(;;)
select() [检查是否有io事件]
processSelectedKeys() [处理io事件]
runAllTasks() [处理异步任务队列]
run()方法中有一个for循环,一共做三件事,select注册到轮询器上的channel中的io事件,然后调用processSelectedKeys()处理轮询出来的io事件,runAllTasks()处理外部线程扔到taskqueue中的任务。
跟进到SingleThreadEventExecutor.this.run():
# NioEventLoop.java
@Override
protected void run() {
for (;;) {
try {
// selector选择一个已经就绪的channel
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT: // SelectStrategy.SELECT == -1,说明此时任务队列中没有任务
// 阻塞式选择
select(wakenUp.getAndSet(false));
... ...
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio; // 这是一个处理io和处理任务队列任务的比值
if (ioRatio == 100) {
try {
processSelectedKeys(); // 处理就绪channel的io
} finally {
// Ensure we always run tasks.
runAllTasks(); // 执行任务队列中的任务
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
ioRatio默认是50,说明处理io和任务处理时间是1:1的。
接下来主要讲一下之前提到的三个过程。
select()方法执行逻辑
这一段逻辑就是在以下代码:
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
跟进到主要方法select()中,还是在这个类中:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 判断是否超时
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 这一段的逻辑就是如果当前超时了,说明有定时任务要执行,那么如果一次都没有选择过,就执行一次非阻塞的选择,并且将选择计数器加1,break
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
// 异步任务队列中是否有任务,如果有任务的话,就直接执行,并+1返回
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 如果没有定时任务,没有超时,且任务队列中没有任务,就执行阻塞式select,超时时间1s
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 如果轮询到了时间 | select被外部线程唤醒 | 有任务 | 有定时任务,当前select操作就终止
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
// 执行到这里说明以及进行了一次阻塞式的select操作
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
// 如果阻塞时间到了,就说明执行了一次阻塞式select,那么计数器就是1
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 否则没有到阻塞时间,那么说明没有进行阻塞就返回了,执行了空轮询,空轮询到一定阈值就会rebuildSelector()
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
注意:
delayNanos(currentTimeNanos):用来计算当前定时任务队列第一个定时任务还有多久执行。
selector.selectNow():非阻塞选择
看了博客补充一下:Selector 的阻塞选择和非阻塞选择的区别就是,非阻塞选择在当前 select方法执行时判断循环判断所有的 channel 是否就绪并返回所有的就绪数量,而阻塞式选择则是阻塞指定时间直至阻塞时间内获取到就绪 channel 或者阻塞时间超时时立刻返回。
这个select()方法主要干了几件事情:
第一件事情,deadline以及任务穿插逻辑处理:
首先计算deadline,也就是定时任务的执行时间,计算出一个超时时间timeoutMillis,也就是距离最近一次定时任务开始的时间,如果小于0,说明要执行定时任务,则执行一次非阻塞的选择:
// 这一段的逻辑就是如果当前超时了,说明有定时任务要执行,那么如果一次都没有选择过,就执行一次非阻塞的选择,并且将选择计数器加1,break
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
第二件事,select阻塞式选择:
// 如果没有定时任务,没有超时,且任务队列中没有任务,就执行阻塞式select,超时时间1s
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 如果轮询到了时间 | select被外部线程唤醒 | 有任务 | 有定时任务,当前select操作就终止
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
第三件事,解决jdk空轮询bug:
// 执行到这里说明以及进行了一次阻塞式的select操作
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
// 如果阻塞时间到了,就说明执行了一次阻塞式select,那么计数器就是1
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// 否则没有到阻塞时间,那么说明没有进行阻塞就返回了,执行了空轮询,空轮询到一定阈值就会rebuildSelector()
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
其中阈值SELECTOR_AUTO_REBUILD_THRESHOLD默认512,因此这个selectCnt主要就是用来记录空轮询的次数。
这个解决空轮询的方法其实是有点乐观的,他并没有从根源上解决, 而是rebuildSelector(),期盼着下一次能够不出现空轮询。
processSelectedKey()执行逻辑
NioEventLoop的第二个过程就是处理检测到的io事件。
先来看看netty对于selectKey做了什么小动作。
select操作每次会把就绪状态的io事件添加到底层的hashset当中,而netty会通过反射把这个hashset修改成数组,这样添加的操作就是O(1)的时间复杂度,具体过程如下:
# NioEventLoop.java
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 如果不需要优化,就直接返回原生的selector,默认为false
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
... ...
}
在之前创建事件轮询器的代码中,做了这方面的优化,他用一个SelectedSelectionKeySet来替换底层的keyset的数据结构:
# SelectedSelectionKeySet.java
final class SelectedSelectionKeySet extends AbstractSet {
private SelectionKey[] keysA;
private int keysASize;
private SelectionKey[] keysB;
private int keysBSize;
private boolean isA = true;
SelectedSelectionKeySet() {
keysA = new SelectionKey[1024];
keysB = keysA.clone();
}
@Override
public boolean add(SelectionKey o) {
if (o == null) {
return false;
}
if (isA) {
int size = keysASize;
keysA[size ++] = o;
keysASize = size;
if (size == keysA.length) {
doubleCapacityA();
}
} else {
int size = keysBSize;
keysB[size ++] = o;
keysBSize = size;
if (size == keysB.length) {
doubleCapacityB();
}
}
return true;
}
...
}
这个数据结构其实就一个方法有用就是add()方法,是由一个数组和size的方法实现添加的,原来的HashSet的添加在最坏情况下为O(n)。
final Class> selectorImplClass = (Class>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction() {
@Override
public Object run() {
try {
// 获取到成员变量selectedKeys和publicSelectedKeys
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true); // 允许修改
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet); // 进行替换
publicSelectedKeysField.set(selector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
} catch (RuntimeException e) {
// JDK 9 can throw an inaccessible object exception here; since Netty compiles
// against JDK 7 and this exception was only added in JDK 9, we have to weakly
// check the type
if ("java.lang.reflect.InaccessibleObjectException".equals(e.getClass().getName())) {
return e;
} else {
throw e;
}
}
}
});
if (maybeException instanceof Exception) {
selectedKeys = null;
Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", selector, e);
} else {
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", selector);
}
return selector;
还是在这个方法中,当我们构造好这个数组后,通过反射,修改原来selector中的属性selectedKeys和publicSelectedKeys为上面构造好的数组selectedKeySet(一个披着set皮的array)。
在openSelector()方法的最后也将selectedKeySet保存成一个NioEventLoop的成员变量。
对selected keySet优化完后,开始对这些就绪事件进行处理,调用processSelectedKeysOptimized(),跟到run()方法中的processSelectedKey():
# NioEventLoop.java
private void processSelectedKeys() {
if (selectedKeys != null) { // 这个key是优化过的key
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
selectedKeys.flip():返回底层的数组,也就是selectedKeys背后真正的keysA数组,继续跟进 :
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys[i] = null; // 便于GC
final Object a = k.attachment(); // 拿到key的attachment,也就是一个经过netty封装的channel
if (a instanceof AbstractNioChannel) { // 如果是netty的channel
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask task = (NioTask) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
for (;;) {
i++;
if (selectedKeys[i] == null) {
break;
}
selectedKeys[i] = null;
}
selectAgain();
// Need to flip the optimized selectedKeys to get the right reference to the array
// and reset the index to -1 which will then set to 0 on the for loop
// to start over again.
//
// See https://github.com/netty/netty/issues/1523
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
显然需要继续跟进到方法processSelectedKey(k, (AbstractNioChannel) a)中:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // unsafe也是和channel唯一绑定的
if (!k.isValid()) { // 如果key不合法,需要关闭channel
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registerd to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop != this || eventLoop == null) {
return;
}
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// 判断就绪事件类型
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { // 主要关心read和accept事件
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
看了别人博客的总结,这段逻辑就是处理就绪 channel 的 IO 事件的逻辑:
判断当前SelectionKey是否有效。失效结束处理并关闭资源。
判断当前 channel的关注事件,针对处理:获取SelectionKey的 readyOps。这里的判断逻辑都是使用高效的位运算。readyOps 为当前 SelectionKey 的就绪的事件类型。
(readyOps & SelectionKey.OP_CONNECT) != 0:连接就绪事件
这个事件在 server 端不会关注,只有 client 用来连接 server 时才会关注连接就绪事件。
连接就绪后,获取当前SelectionKey的interestOps值,将当前interestOps值修改后,调用底层 unsafe连接server
(readyOps & SelectionKey.OP_WRITE) != 0:写就绪事件
当前 channel 关注的是写就绪事件,此时写操作已经就绪,所以直接调用unsafe将数据写入网卡缓存。
(readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0 :当前channel关注的是读就绪事件,或者前面因为有新增任务而触发的就绪channel处理逻辑,只有因为任务触发的情况下 readyOps才可能会是 0 ,readyOps = 0 意味着没有就绪channel。
直接调用 unsafe 继续读操作,将网卡缓存的数据读取到用户空间。如果是 readyOps = 0 的情况相当于网卡缓存并没有就绪数据,则时进行的读操作不会读取到数据。
unsafe是个啥玩意?现在只知道Unsafe类使Java拥有了像C语言的指针一样操作内存空间的能力,等我有点B树了再说。
runAllTasks()执行逻辑
三件事:对task进行分类和添加、对任务进行聚合、执行任务
第一件事,对task进行分类和添加:
在之前说execute方法时,代码如下:
# SingleThreadEventExecutor.java
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop(); // 是否是外部线程
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
先判断是否是外部线程调用的execute方法,如果是就先startThread();,再将其加入到创建NioEventLoop时创建的任务队列MpscQueue中。
除了普通任务队列还有一个定时任务队列:
# AbstractScheduledEventExecutor.java
ScheduledFuture schedule(final ScheduledFutureTask task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
对于定时任务,这里同样进行了判断,是否是外部线程,如果是外部线程在,则调用execute()方法进行线程安全的操作,即现startThread(),再添加任务,保证只有一个线程进行处理,即都在NioEventLoop中处理。
这是为什么呢?这是因为scheduledTaskQueue的实现是非线程安全的,普通优先队列:
Queue> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new PriorityQueue>();
}
return scheduledTaskQueue;
}
第二件事,任务的聚合:
# SingleThreadEventExecutor.java
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue(); // 任务的聚合
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
... ...
return true;
}
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime); // 取定时任务队列中当前需要允许的任务
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
上述while代码中的逻辑,定时任务不为空,即有定时任务:
先讲定时任务添加到普通任务队列中;
如果添加失败,则添加回定时任务队列中,因为之前取的时候poll了,并返回false;
成功添加后,继续从定时任务中取定时任务;
while循环结束,所有定时任务都被添加到了普通任务队列,完成任务的聚合。
第三件事,任务的执行:
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask(); // 从普通任务队列中拿任务
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; // 根据超时时间计算截止时间
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task); // 执行任务,Runnable.run()
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask(); // 继续拿任务
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
上述代码前半请看注释,关键在于对任务次数的判断:
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
这里也是与操作,即(&63)== (%64),每执行64次就判断一下是否超时(因为需要和io处理时间分配时间,所以有一个超时时间),如果超时就退出,可能去进行io处理了。
为什么不每次都判断一下呢?上面的英文注释说了,nanoTime()老费时间了。
面试相关
问:Netty如何保证异步串行无锁化?
在NioEventLoop中封装了一个线程, 这个IO线程就是用来处理客户端的连接事件, 读写事件, 处理队列中的任务. 没错, 每个NioEventLoop都有一个队列, 这个队列是在创建NioEventLoop时被初始化的,netty比较重任务。这个任务队列是一个多生产者单消费者的队列,因此可以保证线程安全。根据inEventLoop()判断,如果是外部线程,也就不是我们自己的io线程,那么就把他的runnable任务放到我们的Mpsc队列中来保证线程安全,同理与定时任务队列中的任务,我们的io线程只处理普通任务中的任务,因此保证了线程之间不需要同步。
问:默认情况下,Netty服务端起多少线程?何时启动?
默认两倍CPU数,调用execute()方法判断是否在本线程内,如果是,那么就已经启动了,如果是在外部线程中,那么就需要执行startThread()方法判断线程是否启动,未启动就启动此线程。
问:Netty如何解决jdk空轮询bug?
jdk空轮询是在阻塞式select中,没有阻塞timeoutMillis时间就结束了阻塞select操作,我们称之为一次空轮询,因此判断这种空轮询操作是否超过设定的阈值(512),如果超过,就调用rebuildSelector()方法重建Selector把之前的key都移到新的轮询器上,避免bug。
问:简单说说NioEventLoop?
用户创建Boss/Worker EventLoopGroup时创建,默认创建NioEventLoop个数为2*CPU核数;每个NioEventLoop都由线程选择器chooser分配,并且用与运算优化了选择方式;每个NioEventLoop构造过程中都创建了Selector、任务队列,创建Selector时,通过反射使用数组替换集合方式保存selectedKeys ;NioEventLoop执行时调用execute()方法启动/创建FastThreadLocalThread线程,保存到该NioEventLoop的成员变量中进行一对一绑定;NioEventLoop执行逻辑在run方法中,包括检测io事件、处理io事件、执行任务队列。
PS:看了两天了这个。。非科班实在是水平不够,希望大佬们别喷,只是留个纪念,提提学习netty的建议,虚心接受-;-
这篇关于netty 旷视科技_Netty中NioEventLoop源码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!