Netty学习——源码篇5 EventLoop

2024-03-25 12:36
文章标签 源码 学习 eventloop netty

本文主要是介绍Netty学习——源码篇5 EventLoop,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1 Reactor线程模型

        Reactor线程模型 中对Reactor的三种线程模型——单线程模型、多线程模型、主从多线程模型做了介绍,这里具体分析Reactor在Netty中的应用。

1.1单线程模型

单线程模型处理流程如下图:

        单线程模型,即Accept的处理和Handler的处理都在同一个线程中。这个模型的弊端是:当其中某个Handler阻塞时,会导致其他所有的Client的Handler都无法执行,并且更严重是,Handler的阻塞也会导致整个服务不能接收新的Client请求(因为Accept也被阻塞了)。 因为这个缺陷,所以单线程Reactor模型在Netty中的应用场景比较少。

1.2 多线程模型

        Netty中Reactor多线程模型的应用如如下图:

        1、设计一个专门的线程Accept,用于监听客户端的TCP连接请求。

        2、客户端的I/O操作都是由一个特定的NIO线程池负责。每个客户端连接都与一个特定的NIO线程绑定,因此在这个客户端连接中的所有I/O操作都是在同一个线程中完成的。

        3、客户端连接有很多,但是NIO线程数是比较少的,因此一个NIO线程可以同时绑定到多个客户端连接中。

1.3 主从多线程模型

        主从Reactor多线程模型在Netty中的应用,如下图

             一般情况下,Reactor的多线程模型已经适用于大部分业务场景。但如果服务端需要同时处理大量的客户端连接请求,或者需要再客户端连接时增加一些诸如权限的校验等操作,那么单个Accept就很有可能处理不过来,将会造成大量的客户端连接超时。主从Reactor多线程模型将服务端接收客户端的连接请求专门设计为一个独立的连接池。主从Reactor多线程模型和Reactor多线程模型很类似,只是在主动Reactor多线程模型的Accept线程池中获取数据,通过认证鉴权后进行派遣,再分配给Reactor线程池来处理客户端请求。

2 EventLoopGroup与Reactor关联

        不同的设置NioEventLoopGroup的方式对应了不同的Reactor线程模型。

        1、单线程模型在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup);

            首先实例化一个NioEventLoopGroup,接着调用bootstrap.group(bossGroup)设置服务端的EventLoopGroup。这里有个疑惑:在启动服务端的Netty程序时,需要设置bossGroup和workerGroup,为什么这里只设置了1个bossGroup?原因很简单,ServerBootstrap重写了group方法,代码如下:

@Overridepublic ServerBootstrap group(EventLoopGroup group) {return group(group, group);}

        因此,当传入一个group时,bossGroup和workerGroup就是同一个NioEventLoopGrouop,并且这个NioEventLoopGroup线程池数量只设置了1个线程,也就是说Netty中的Acceptor和后续的所有客户端连接的I/O操作都是在一个线程中处理的。那么对应到Reactor的线程模型中,这样设置NioEventLoopGroup,就相当于Reactor的单线程模式。

        2、多线程在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup(16);ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup);

        从代码中可以看出,只需要将NioEventLoopGroup的参数设置大于1,就是Reactor多线程模型。

        3、主从Reactor模型在Netty中的应用代码如下:

        NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workGroup = new NioEventLoopGroup();ServerBootstrap server = new ServerBootstrap();server.group(bossGroup,workGroup);

        bossGroup为主线程,而workerGroup中的线程数是CPU核数*2,因此对应到Reactor线程模型中,这样设置的NioevGroup就是主从Reactor多线程模型。

3 EventLoopGroup的实例化

        首先,来看一下EventLoopGroup的类结构图,如下图:

        然后,在通过时序图来了解一下EventLoopGroup初始化的基本过程,如下图:

 

          基本步骤如下:

        1、EventLoopGroup内部维护一个属性为EventExecutor的children的数组,其大小是nThreads,这样就初始化一个线程池。

        2、在实例化NioEventLoopGroup时,如果指定县城池大小,则nThreads就是指定的值,否则是CPU核数 * 2。

        3、在MultithreadEventExecutorGroup中调用newChild()抽象方法来初始化children数组。

        4、newChild()抽象方法实际上是在NioEventLoopGroup中实现的,由它返回一个NioEventLoop实例。

        5、初始化NioEventLoop对象并给属性赋值,具体赋值属性如下:

                (1)provider:就是在NioEventLoopGroup构造器中,调用SelectorProvider的provider()方法获取的SelectorProvider对象。

                (2)selector:就是在NioEventLoop构造器中,调用selector=provider.openSelector()方法获取的Selector对象。

4 执行任务者EventLoop

        NioEventLoop继承自SingleThreadEventLoop,而SingleThreadEventLoop又继承自SingleThreadEventExecutor。SingleThreadEventExecutor是Netty对本地线程的抽象,它内部有一个Thread属性,实际上就是存储了一个本地Java 线程。因此可以简单的认为,一个NioEventLoop对象其实就是一个和特定的线程进行绑定,并且在NioEventLoop声明周期内,其绑定的线程都不会再改变,NioEventLoop的类层次结构图如下:

        NioEventLoop的类层次结构比较复杂,只需要关注重点即可。首先看NioEventLoop的继承关系:NioEventLoop继承SingleThreadEventLoop, SingleThreadEventLoop继承SingleThreadEventExecutor,SingleThreadEventExecutor继承AbstractScheduledEventExecutor。

        在AbstractScheduledEventExecutor,Netty实现了NioEventLoop的Schedule功能,即通过调用一个NioEventLoop实例的schedule方法来运行一些定时任务。而在SingleThreadEventLoop中,又实现了任务队列的功能。通过它,可以调用一个NioEventLoop实例的execute()方法向任务队列中添加一个Task,并由NioEventLoop进行调度执行。

        通常来说,NioEventLoop负责执行两个任务:第一个任务是作为I/O线程,执行与Channel相关的I/O操作,包括调用Selector等待就绪的I/O事件、读写数据与数据处理等;第二个任务是作为任务队列,执行taskQueue中的人物,例如用户调用eventLoop.schedule提交的定时任务也是由这个线程执行的。

4.1 NioEventLoop的实例化过程

        先了解一下EventLoop实例化的运行时序图,如下:

        从上图可以看出,SingleThreadEventExecutor有一个名为thread的Thread类型属性,这个属性就是与SingleThreadEventExecutor关联的本地线程。来看thread是在哪里被赋值的,代码如下:

private void doStartThread() {assert thread == null;executor.execute(new Runnable() {@Overridepublic void run() {thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {SingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {for (;;) {int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {break;}}// Check if confirmShutdown() was called at the end of the loop.if (success && gracefulShutdownStartTime == 0) {logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +"before run() implementation terminates.");}try {// Run all remaining tasks and shutdown hooks.for (;;) {if (confirmShutdown()) {break;}}} finally {try {cleanup();} finally {STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);threadLock.release();if (!taskQueue.isEmpty()) {logger.warn("An event executor terminated with " +"non-empty task queue (" + taskQueue.size() + ')');}terminationFuture.setSuccess(null);}}}}});}

         前面分析过,SingleThreadEventExecutor启动时会调用doStartThread方法,然后调用executor.execute方法,将当前线程赋值给thread。在这个线程中所做的事情主要就是调用SingleThreadEventExecutor.this.run()方法,因为NioEventLoop实现了这个方法,所以根据多态性,其实调用的是NioEventLoop.run方法。

4.2 EventLoop与Channel关联

        在Netty中,每个Channel都有且仅有一个EventLoop与之关联,它们的关联过程如下图:

               

        从上图可以看到,当调用AbstractChannel.register()方法后,就完成了Channel和EventLoop的关联,register方法的具体实现如下:

        @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {if (eventLoop == null) {throw new NullPointerException("eventLoop");}if (isRegistered()) {promise.setFailure(new IllegalStateException("registered to an event loop already"));return;}if (!isCompatible(eventLoop)) {promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));return;}AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",AbstractChannel.this, t);closeForcibly();closeFuture.setClosed();safeSetFailure(promise, t);}}}

        在register方法中,会将一个EventLoop赋值给AbstractChannel内部的eventLoop属性,这句代码就完成了EventLoop与Channel的关联过程。

4.3 EventLoop 启动

        前面已经介绍NioEventLoop本身就是一个SingleThreadEventExecutor,因此NioEventLoop的启动,其实就是NioEventLoop所绑定的本地Java线程的启动。

        按照这个思路,只需要找到在哪里调用了SingleThreadExecutor中thread属性的start方法就可以知道在哪里启动这个线程了。前面分析过,其实thread.start()被封装在SingleThreadExecutor.startThread()方法中,代码如下:

    private void startThread() {if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {doStartThread();}}}

        STATE_UPDATER是SingleThreadExecutor内部维护的一个属性,它的作用是标识当前的Thread的状态。在初始化的时候,STATE_UPDATER == ST_NOT_STARTED,因此第一次调用startThread方法时,就会进入if语句内,进而调用thread.start方法。而这个关键的startThread方法又是在哪调用的呢?用方法调用关系反向查找功能,就恢复阿贤,startTahread方法是在SingleThreadEventExecutor的execute方法中调用的,代码如下:

    @Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

        既然如此,现在只需要找到第一次调用SingleThreadEventExecutor的execute方法的位置即可。前面在提到注册Channel的过程中,会在AbstractChannel的register方法中调用eventLoop.execute方法,在EventLoop中进行Channel注册代码的执行,AbstractChannel的register方法的关键代码如下:

        @Overridepublic final void register(EventLoop eventLoop, final ChannelPromise promise) {//删除判断代码AbstractChannel.this.eventLoop = eventLoop;if (eventLoop.inEventLoop()) {register0(promise);} else {try {eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);}});} catch (Throwable t) {//删除异常处理代码}}}

        很明显,从Boostrap的bind方法一路跟踪到AbstractChannel的register方法,整个代码都是在主线程中运行的,因此上面的eventLoop.inEventLoop()返回值为false,于是进入else分支,在这个分支中调用eventLoop.execute方法,而NioEventLoop没有实现execute方法,因此调用的是SingleThreadEventExecutor的execute方法,关键代码如下:

    @Overridepublic void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}boolean inEventLoop = inEventLoop();if (inEventLoop) {addTask(task);} else {startThread();addTask(task);if (isShutdown() && removeTask(task)) {reject();}}if (!addTaskWakesUp && wakesUpForTask(task)) {wakeup(inEventLoop);}}

       由于 inEventLoop ==false,因此直行道else分支就调用startThread方法来启动SingleThreadEventExecutor内部关联的Java本地线程。用一句话总结:当EventLoop的execute方法第一次被调用时,会触发startThread方法的调用,进而启动EventLoop所对应的Java本地线程。

        完整的EventLoop启动时序图如下:

这篇关于Netty学习——源码篇5 EventLoop的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/845030

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

【学习笔记】 陈强-机器学习-Python-Ch15 人工神经网络(1)sklearn

系列文章目录 监督学习:参数方法 【学习笔记】 陈强-机器学习-Python-Ch4 线性回归 【学习笔记】 陈强-机器学习-Python-Ch5 逻辑回归 【课后题练习】 陈强-机器学习-Python-Ch5 逻辑回归(SAheart.csv) 【学习笔记】 陈强-机器学习-Python-Ch6 多项逻辑回归 【学习笔记 及 课后题练习】 陈强-机器学习-Python-Ch7 判别分析 【学

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识