Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理

2024-05-14 03:48

本文主要是介绍Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理

在上篇博文分析服务端启动的过程中,我们遇到了如下的代码片段,

            if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new OneTimeTask() {@Overridepublic void run() {register0(promise);//分析}});}

以及

    private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {// This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up// the pipeline in its channelRegistered() implementation.channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);} else {promise.setFailure(regFuture.cause());}}});} 

这些代码片段中,都调用了NioEventLoop类的execute这个方法来提交任务。 本篇博文将以此函数作为切入点,来分析NioEventLoop 启动以及 EventLoop所负责的IO操作和task任务的处理思路。

NioEventLoop类的execute方法

由于NioEventLoop并没有实现此方法,因此,准确来说,应该是NioEventLoop类的超类SingleThreadEventExecutor中的方法。具体代码如下:

    @Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();//判断当前线程是否为该NioEventLoop所关联的线程,如果是,则添加任务到任务队列中,如果不是,则先启动线程,然后添加任务到任务队列中去if (inEventLoop) {addTask(task);} else {startThread();addTask(task);//如果发现该线程已经关闭则移除任务并拒绝if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

该方法的主要逻辑如下:

1、判断入参数是否为null,如果为null,则抛空指针异常,如果不为null,则进行第2步。

2、通过调用inEventLoop()方法判断当前工作线程是否为该NioEventLoop所关联的线程,如果是,则调用addTask(task)方法将任务添加到任务队列中,如果不是,则进行第3步。

3、启动该NioEventLoop所关联的线程,然后添加任务到任务队列中去 。

4、如果发现该线程已经关闭则移除任务并拒绝

在看startThread()这个方法的具体代码之前,我觉得有必要介绍下EventLoop.execute第一次被调用的地方,因此第一次被调用的地方就是触发startThread()的调用,进而导致了EventLoop所对应的Java线程的启动。

基于这样一个问题,我们开始在服务端启动时开始跟踪代码来寻找。最快的方式是,在EventLoop.execute方法的 startThread();这一行代码打上一个断点,然后看下其调用链即可。

调用链截图如下:

看到了没有,是在服务器端启动的过程中,调用AbstractChannel#AbstractUnsafe.register方法中调用了execute方法,这是因为整个代码都是在主线程中运行的因此在下面的register方法中 eventLoop.inEventLoop() 就为 false, 于是进入到 else 分支, 在这个分支中调用了 eventLoop.execute。eventLoop 是一个 NioEventLoop 的实例, 而 NioEventLoop 没有实现 execute 方法, 因此调用的是 SingleThreadEventExecutor.execute,继而调用startThread方法完成NioEventLoop所对应的线程的启动。

        @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {//省略了部分检查代码AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new OneTimeTask() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {//省略了部分异常处理代码}}}

结论:当 EventLoop.execute 第一次被调用时, 就会触发 startThread() 的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动。

EventLoop的启动方法:startThread

上面找到了startThread调用的调用链哈,这里看下startThread()方法

 SingleThreadEventExecutor.javaprivate void startThread() {if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {delayedTaskQueue.add(new ScheduledFutureTask<Void>(this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));thread.start();}}}

上面startThread方法中的 STATE_UPDATER 是 SingleThreadEventExecutor 内部维护的一个属性, 它的作用是标识当前的 thread 的状态. 在初始的时候, STATE_UPDATER == ST_NOT_STARTED, 因此第一次调用 startThread() 方法时, 就会进入到 if 语句内, 进而调用到 thread.start()完成NioEventLoop所对应的 Java线程的启动.

以上就完成了NioEventLoop所对应的Java线程的启动。那么启动之后,该线程主要用于干什么呢?

这里先说结论:在 Netty 中, 一个 EventLoop 需要负责两个工作, 第一个是作为 IO 线程, 负责相应的 IO 操作; 第二个是作为任务线程, 执行 taskQueue 中的任务.

上面的分析中调用startThread()方法继而调用thread.start()来启动EventLoop 所对应的Java 线程。下面来线程启动具体做了什么哈,也就是看下如下的run方法主要干了什么哈,

        thread = threadFactory.newThread(new Runnable() {@Overridepublic void run() {//...SingleThreadEventExecutor.this.run();//调用了NioEventLoop类中的run方法,下面将主要分析//...});      

上面的源码本来很长很长,上面是精简版本,简单来说,这里我们只关注一行代码: SingleThreadEventExecutor.this.run(),这里就是调用了NioEventLoop的run方法,下面继续跟。

NioEventLoop.run()方法

NioEventLoop类中的run方法的代码如下:

    @Overrideprotected void run() {for (;;) {boolean oldWakenUp = wakenUp.getAndSet(false);try {// 当有任务时为了保证任务及时执行采用不阻塞的selectNow获取准备好I/O的连接 if (hasTasks()) {selectNow();} else {// 当无任务时采用阻塞等待的方式获取连接  select(oldWakenUp);if (wakenUp.get()) {selector.wakeup();}}cancelledKeys = 0;needsToSelectAgain = false;final int ioRatio = this.ioRatio;if (ioRatio == 100) {processSelectedKeys();//分析runAllTasks();//分析} else {final long ioStartTime = System.nanoTime();processSelectedKeys();final long ioTime = System.nanoTime() - ioStartTime;runAllTasks(ioTime * (100 - ioRatio) / ioRatio);}if (isShuttingDown()) {closeAll();if (confirmShutdown()) {break

这篇关于Netty源码分析:NioEventLoop启动以及其IO操作和Task任务的处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/987663

相关文章

Python调用Orator ORM进行数据库操作

《Python调用OratorORM进行数据库操作》OratorORM是一个功能丰富且灵活的PythonORM库,旨在简化数据库操作,它支持多种数据库并提供了简洁且直观的API,下面我们就... 目录Orator ORM 主要特点安装使用示例总结Orator ORM 是一个功能丰富且灵活的 python O

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型的操作流程

《0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeekR1模型的操作流程》DeepSeekR1模型凭借其强大的自然语言处理能力,在未来具有广阔的应用前景,有望在多个领域发... 目录0基础租个硬件玩deepseek,蓝耘元生代智算云|本地部署DeepSeek R1模型,3步搞定一个应

Android里面的Service种类以及启动方式

《Android里面的Service种类以及启动方式》Android中的Service分为前台服务和后台服务,前台服务需要亮身份牌并显示通知,后台服务则有启动方式选择,包括startService和b... 目录一句话总结:一、Service 的两种类型:1. 前台服务(必须亮身份牌)2. 后台服务(偷偷干

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

使用C++将处理后的信号保存为PNG和TIFF格式

《使用C++将处理后的信号保存为PNG和TIFF格式》在信号处理领域,我们常常需要将处理结果以图像的形式保存下来,方便后续分析和展示,C++提供了多种库来处理图像数据,本文将介绍如何使用stb_ima... 目录1. PNG格式保存使用stb_imagephp_write库1.1 安装和包含库1.2 代码解

Windows设置nginx启动端口的方法

《Windows设置nginx启动端口的方法》在服务器配置与开发过程中,nginx作为一款高效的HTTP和反向代理服务器,被广泛应用,而在Windows系统中,合理设置nginx的启动端口,是确保其正... 目录一、为什么要设置 nginx 启动端口二、设置步骤三、常见问题及解决一、为什么要设置 nginx

springboot启动流程过程

《springboot启动流程过程》SpringBoot简化了Spring框架的使用,通过创建`SpringApplication`对象,判断应用类型并设置初始化器和监听器,在`run`方法中,读取配... 目录springboot启动流程springboot程序启动入口1.创建SpringApplicat