Netty源码阅读之NioEventLoop简析

2023-11-02 09:40

本文主要是介绍Netty源码阅读之NioEventLoop简析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

     在Netty中NioEventLoop以及NioEventLoopGroup是很重要的两个类,而NioEventLoopGroup主要是对NioEventLoop进行管理;首先来看一下这两个类的关系图(错综复杂):

                                                                                                 图 1

1. NioEventLoopGroup初始化流程

通过分析NioEventLoopGroup的构造方法的调用栈我们能够看到在io.netty.channel.MultithreadEventLoopGroup的构造方法中进行了创建:

当未指定具体的线程数目的时候,Netty会提出一个默认的线程数:DEFAULT_EVENT_LOOP_THREADS

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);}

而该数值在同一类下的静态代码块中进行了设置:

static {DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));if (logger.isDebugEnabled()) {logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);}}

显而易见,默认的线程数量为2*cpu数目。

继续深入,打开io.netty.util.concurrent.MultithreadEventExecutorGroup这个类,查看其构造方法:

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}if (executor == null) {executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());//创建线程执行器}//构造NioEventLoop的过程children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {boolean success = false;try {children[i] = newChild(executor, args);success = true;} catch (Exception e) {// TODO: Think about if this is a good exception typethrow new IllegalStateException("failed to create a child event loop", e);} finally {if (!success) {for (int j = 0; j < i; j ++) {children[j].shutdownGracefully();}for (int j = 0; j < i; j ++) {EventExecutor e = children[j];try {while (!e.isTerminated()) {e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);}} catch (InterruptedException interrupted) {// Let the caller handle the interruption.Thread.currentThread().interrupt();break;}}}}}chooser = chooserFactory.newChooser(children);//生成线程选择器final FutureListener<Object> terminationListener = new FutureListener<Object>() {@Overridepublic void operationComplete(Future<Object> future) throws Exception {if (terminatedChildren.incrementAndGet() == children.length) {terminationFuture.setSuccess(null);}}};for (EventExecutor e: children) {e.terminationFuture().addListener(terminationListener);}Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);Collections.addAll(childrenSet, children);readonlyChildren = Collections.unmodifiableSet(childrenSet);}

通过上面的代码流程可知,首先是创建线程执行器,

线程执行器中传入一个默认的线程工厂:newDefaultThreadFactory,在线程工厂中进行nio线程的创建并进行线程的命名:

public static String toPoolName(Class<?> poolType) {if (poolType == null) {throw new NullPointerException("poolType");}String poolName = StringUtil.simpleClassName(poolType);switch (poolName.length()) {case 0:return "unknown";case 1:return poolName.toLowerCase(Locale.US);default:if (Character.isUpperCase(poolName.charAt(0)) && Character.isLowerCase(poolName.charAt(1))) {return Character.toLowerCase(poolName.charAt(0)) + poolName.substring(1);} else {return poolName;}}}

线程名称类似于:nioEventLoop-x-x这种形式;同时,将根据线程数目创建一个同等容量的EventExecutor数组,数组中通过newChild()方法塞入一个EventExecutor对象,当然这只是一个抽象方法,具体的实现根据不同的类来决定,若在这个过程中有一个线程发生了异常,则会从当前的这个线程开始,将前面从第一个线程开始,关闭对应的线程执行器;之后再初始化线程选择器工厂,并通过轮询算法来处理本次的所有EventLoop事件,加入线程工厂的时候,采用了策略模式,会有一个2次幂的判断,如果上述的数组长度为2的幂次方,那么选用PowerOfTowEventExecutorChooser(executors)

,否则将选用GenericEventExecutorChooser(executors)

public EventExecutorChooser newChooser(EventExecutor[] executors) {if (isPowerOfTwo(executors.length)) {return new PowerOfTowEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);}}

而这两种方式主要在遍历数组的时候存在区别,当为2的次幂的时候,采用如下方式进行遍历:

public EventExecutor next() {return executors[idx.getAndIncrement() & executors.length - 1];}

反之则采用如下的方式进行数组的遍历:

public EventExecutor next() {return executors[Math.abs(idx.getAndIncrement() % executors.length)];}

由上面的分析,我们可以得出Netty中的EventLoop处理关系图:

                                                                                            图 2

2. NioEventLoop启动逻辑

启动的入口为:io.netty.bootstrap.AbstractBootstrap#doBind0()方法:

private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});}

主要是进行端口的绑定。

接着往下查看execute()方法:

public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

在execute()方法中,首先通过inEventLoop()方法判断当前的线程是否是在eventLoop中,值得注意的是,每一个NioEventLoop都维护着一个taskQueue,读写任务都将被丢进这个队列中进行维护:

@Overrideprotected Queue<Runnable> newTaskQueue(int maxPendingTasks) {// This event loop never calls takeTask()return PlatformDependent.newMpscQueue(maxPendingTasks);}

这是Netty实现异步串行无锁化的关键;回归正题,如果已经在evetLoop中了,那么直接将当前的任务添加到任务队列中,否则将执行doStartThread()方法:

private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}

通过SingleThreadEventExecutor.this.run()方法,Netty中的channel将不断轮询处理channel事件:

 protected void run() {for (;;) {try {switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {case SelectStrategy.CONTINUE:continue;case SelectStrategy.SELECT:select(wakenUp.getAndSet(false));if (wakenUp.get()) {selector.wakeup();}default:// fallthrough}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {try {processSelectedKeys();} finally {// Ensure we always run tasks.runAllTasks();}} else {final long ioStartTime = System.nanoTime();try {processSelectedKeys();} finally {// Ensure we always run tasks.final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}}} catch (Throwable t) {handleLoopException(t);}// Always handle shutdown even if the loop processing threw an exception.try {if (isShuttingDown()) {closeAll();if (confirmShutdown()) {return;}}} catch (Throwable t) {handleLoopException(t);}}}

在事件循环中不仅需要处理IO事件也需要处理非IO事件,IO事件处理通过processSelectedKeys方法来进行,而非IO事件通过runAllTasks()方法进行处理,IO事件以及非IO事件的默认占比各为50%,值得注意的是:SelectStrategy.SELECT这种情况:

private void select(boolean oldWakenUp) throws IOException {Selector selector = this.selector;try {int selectCnt = 0;long currentTimeNanos = System.nanoTime();long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);for (;;) {long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;if (timeoutMillis <= 0) {if (selectCnt == 0) {selector.selectNow();selectCnt = 1;}break;}// If a task was submitted when wakenUp value was true, the task didn't get a chance to call// Selector#wakeup. So we need to check task queue again before executing select operation.// If we don't, the task might be pended until select operation was timed out.// It might be pended until idle timeout if IdleStateHandler existed in pipeline.if (hasTasks() && wakenUp.compareAndSet(false, true)) {selector.selectNow();selectCnt = 1;break;}int selectedKeys = selector.select(timeoutMillis);selectCnt ++;if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {// - Selected something,// - waken up by user, or// - the task queue has a pending task.// - a scheduled task is ready for processingbreak;}if (Thread.interrupted()) {// Thread was interrupted so reset selected keys and break so we not run into a busy loop.// As this is most likely a bug in the handler of the user or it's client library we will// also log it.//// See https://github.com/netty/netty/issues/2426if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely because " +"Thread.currentThread().interrupt() was called. Use " +"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");}selectCnt = 1;break;}long time = System.nanoTime();if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {// timeoutMillis elapsed without anything selected.selectCnt = 1;} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {// The selector returned prematurely many times in a row.// Rebuild the selector to work around the problem.logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",selectCnt, selector);rebuildSelector();selector = this.selector;// Select again to populate selectedKeys.selector.selectNow();selectCnt = 1;break;}currentTimeNanos = time;}if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {if (logger.isDebugEnabled()) {logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",selectCnt - 1, selector);}}} catch (CancelledKeyException e) {if (logger.isDebugEnabled()) {logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",selector, e);}// Harmless exception - log anyway}}

在这种情况下,每次对selectCnt这个标志位进行自增的操作,后续通过计算:

time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos

若满足,则seletCnt重新置为1,最后若一旦超过SELECTOR_AUTO_REBUILD_THRESHOLD(512),那么需要重建selector,Netty正是通过这种方式规避了空轮询的bug。

 

 

这篇关于Netty源码阅读之NioEventLoop简析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/329722

相关文章

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操

Java汇编源码如何查看环境搭建

《Java汇编源码如何查看环境搭建》:本文主要介绍如何在IntelliJIDEA开发环境中搭建字节码和汇编环境,以便更好地进行代码调优和JVM学习,首先,介绍了如何配置IntelliJIDEA以方... 目录一、简介二、在IDEA开发环境中搭建汇编环境2.1 在IDEA中搭建字节码查看环境2.1.1 搭建步

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

论文阅读笔记: Segment Anything

文章目录 Segment Anything摘要引言任务模型数据引擎数据集负责任的人工智能 Segment Anything Model图像编码器提示编码器mask解码器解决歧义损失和训练 Segment Anything 论文地址: https://arxiv.org/abs/2304.02643 代码地址:https://github.com/facebookresear

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除