Java网络编程(11) - NIOEventLoopGroup源码解读,Netty内存管理机制是怎么样的?BytBuf特点是什么?

本文主要是介绍Java网络编程(11) - NIOEventLoopGroup源码解读,Netty内存管理机制是怎么样的?BytBuf特点是什么?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

NIOEventLoopGroup源码解读(解决Java Nio Bug)

      NioEvnetLoopGrou继承了MultithreadEventLoopGroup。

class NioEventLoopGroup extends MultithreadEventLoopGroup

 

      NioEventLoopGroup的父类MultithreadEventLoopGroup的父类MultithreadEventExecutorGroup内部维护了一个类型为EventExecutor[] children,默认大小是处理器核心数 * 2,这样就构成了一个线程池。

public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {

 

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(MultithreadEventLoopGroup.class);

 

    private static final int DEFAULT_EVENT_LOOP_THREADS;

 

    static {

        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(

                "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

 

        if (logger.isDebugEnabled()) {

            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);

        }

    }

public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {

 

    private final EventExecutor[] children;

    private final Set<EventExecutor> readonlyChildren;

    private final AtomicInteger childIndex = new AtomicInteger();

    private final AtomicInteger terminatedChildren = new AtomicInteger();

    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);

    private final EventExecutorChooser chooser;

 

      初始化EventExecutor时,NioEventLoopGroup重载newChild方法,所以childrent元素的实际类型为NioEventLoop。

      所以线程池启动的话,执行的是NioEventLoop。

private MultithreadEventExecutorGroup(int nEventExecutors,

                                          Executor executor,

                                          boolean shutdownExecutor,

                                          Object... args) {

        if (nEventExecutors <= 0) {

            throw new IllegalArgumentException(

                    String.format("nEventExecutors: %d (expected: > 0)", nEventExecutors));

        }

 

        if (executor == null) {

            executor = newDefaultExecutorService(nEventExecutors);

            shutdownExecutor = true;

        }

 

        children = new EventExecutor[nEventExecutors];

        if (isPowerOfTwo(children.length)) {

            chooser = new PowerOfTwoEventExecutorChooser();

        } else {

            chooser = new GenericEventExecutorChooser();

        }

 

        for (int i = 0; i < nEventExecutors; i ++) {

            boolean success = false;

            try {

                children[i] = newChild(executor, args);

public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    @Override

    protected EventLoop newChild(Executor executor, Object... args) throws Exception {

        return new NioEventLoop(this, executor, (SelectorProvider) args[0]);

    }

}

 

      那么此时就可以知道,实际执行的是NioEventLoop,那么来看下NioEventLoop的源码。

NioEventLoop的构造方法会先去调用父类SingleThreadEventExecutor的构造方法

父类SingleThreadEventExecutor的构造方法并没有做什么。

public final class NioEventLoop extends SingleThreadEventLoop {

NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {

        super(parent, executor, false);

        if (selectorProvider == null) {

            throw new NullPointerException("selectorProvider");

        }

        provider = selectorProvider;

        selector = openSelector();

}

}

public abstract class SingleThreadEventLoop extends SingleThreadEventExecutor implements EventLoop {

    protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp) {

        super(parent, executor, addTaskWakesUp);

    }

}

 

      NioEventLoop构造完成后,会执行自身的run方法,run方法首先去调用hashTasks()方法判断当前taskQueue中是否有元素。

      如果taskQueue中有元素,执行selectNow()方法,最终执行的是selector.selectNow,该方法会立即返回。

      如果taskQueue没有元素,执行select(oldWakenUp)方法。

public final class NioEventLoop extends SingleThreadEventLoop {

    @Override

    protected void run() {

        boolean oldWakenUp = wakenUp.getAndSet(false);

        try {

            if (hasTasks()) {

                selectNow();

            } else {

                select(oldWakenUp);

                if (wakenUp.get()) {

                    selector.wakeup();

                }

            }

   

            cancelledKeys = 0;

            needsToSelectAgain = false;

            final int ioRatio = this.ioRatio;

            if (ioRatio == 100) {

                processSelectedKeys();

                runAllTasks();

            } else {

                final long ioStartTime = System.nanoTime();

   

                processSelectedKeys();

   

                final long ioTime = System.nanoTime() - ioStartTime;

                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

            }

   

            if (isShuttingDown()) {

                closeAll();

                if (confirmShutdown()) {

                    cleanupAndTerminate(true);

                    return;

                }

            }

        } catch (Throwable t) {

            logger.warn("Unexpected exception in the selector loop.", t);

            try {

                Thread.sleep(1000);

            } catch (InterruptedException e) {

            }

        }

        scheduleExecution();

    }

}

public final class NioEventLoop extends SingleThreadEventLoop {

    void selectNow() throws IOException {

        try {

            selector.selectNow();

        } finally {

            // restore wakup state if needed

            if (wakenUp.get()) {

                selector.wakeup();

            }

        }

    }

}

 

      select(oldWakenUp)方法解决了Nio中的Bug,selectCnt用来记录slector.sleect方法的执行次数和标识是否执行过seelctor.selectNow()。

若触发了epoll的空轮询Bug,则会反复执行selector.select(timeMisllis),变量selectCnt会逐渐变大。

当selectCnt达到阀值(默认512),则执行rebuildSelector方法,进行selector重建,解决cpu占用100%的Bug。

public final class NioEventLoop extends SingleThreadEventLoop {

    private void select(boolean oldWakenUp) throws IOException {

        Selector selector = this.selector;

        try {

            int selectCnt = 0;

            long currentTimeNanos = System.nanoTime();

            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {

                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

                if (timeoutMillis <= 0) {

                    if (selectCnt == 0) {

                        selector.selectNow();

                        selectCnt = 1;

                    }

                    break;

                }

 

                int selectedKeys = selector.select(timeoutMillis);

                selectCnt ++;

 

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {

                    break;

                }

                if (Thread.interrupted()) {

                    if (logger.isDebugEnabled()) {

                        logger.debug("Selector.select() returned prematurely because " +

                                "Thread.currentThread().interrupt() was called. Use " +

                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");

                    }

                    selectCnt = 1;

                    break;

                }

 

                long time = System.nanoTime();

                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {

                    selectCnt = 1;

                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&

                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {

                    logger.warn(

                            "Selector.select() returned prematurely {} times in a row; rebuilding selector.",

                            selectCnt);

 

                    rebuildSelector();

                    selector = this.selector;

 

                    selector.selectNow();

                    selectCnt = 1;

                    break;

                }

 

                currentTimeNanos = time;

            }

 

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {

                if (logger.isDebugEnabled()) {

                    logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);

                }

            }

        } catch (CancelledKeyException e) {

            if (logger.isDebugEnabled()) {

                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);

            }

        }

    }

}

 

      rebuildSelector方法会先通过openSelector方法创建一个新的selector,然后将old selector的selectionKey执行cancle。最后将old selector的channel重新注册到新的selector中。

      rebuild后,需要重新执行方法selectNow,检查是否有已经ready的selectionKey。

public final class NioEventLoop extends SingleThreadEventLoop {

    public void rebuildSelector() {

        if (!inEventLoop()) {

            execute(new Runnable() {

                @Override

                public void run() {

                    rebuildSelector();

                }

            });

            return;

        }

 

        final Selector oldSelector = selector;

        final Selector newSelector;

 

        if (oldSelector == null) {

            return;

        }

 

        try {

            newSelector = openSelector();

        } catch (Exception e) {

            logger.warn("Failed to create a new Selector.", e);

            return;

        }

 

        int nChannels = 0;

        for (;;) {

            try {

                for (SelectionKey key: oldSelector.keys()) {

                    Object a = key.attachment();

                    try {

                        if (!key.isValid() || key.channel().keyFor(newSelector) != null) {

                            continue;

                        }

                        int interestOps = key.interestOps();

                        key.cancel();

                        SelectionKey newKey = key.channel().register(newSelector, interestOps, a);

                        if (a instanceof AbstractNioChannel) {

                            ((AbstractNioChannel) a).selectionKey = newKey;

                        }

                        nChannels ++;

                    } catch (Exception e) {

                        logger.warn("Failed to re-register a Channel to the new Selector.", e);

                        if (a instanceof AbstractNioChannel) {

                            AbstractNioChannel ch = (AbstractNioChannel) a;

                            ch.unsafe().close(ch.unsafe().voidPromise());

                        } else {

                            @SuppressWarnings("unchecked")

                            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;

                            invokeChannelUnregistered(task, key, e);

                        }

                    }

                }

            } catch (ConcurrentModificationException e) {

                continue;

            }

 

            break;

        }

        selector = newSelector;

        try {

            oldSelector.close();

        } catch (Throwable t) {

            if (logger.isWarnEnabled()) {

                logger.warn("Failed to close the old Selector.", t);

            }

        }

        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");

    }

}

 

      回到run方法,接下来就调用processSelectedKeys方法(处理IO任务),但selectedKeys != null时,调用processSelectedKeysOptimized方法,迭代selectedKeys获取就绪的IO事件的selectKey存放在数组selectedKeys中国。

      然后为每个事件都调用processSelectedKey来处理它,processSelectedKey中分别处理OP_READ、OP_WRITE、OP_CPNNECT事件。

      最后调用runAllTasks方法(非IO任务),该方法首先会调用fetchFromScheduledTaskQueue方法,把scheduledTaskQueue中已经超过延迟执行的任务移动到taskQueue中等待被执行,然后依次从taskQueue中获取任务执行,每执行64个任务,进行耗时检查,如果已执行时间超过了预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。

public final class NioEventLoop extends SingleThreadEventLoop {

    @Override

    protected void run() {

        boolean oldWakenUp = wakenUp.getAndSet(false);

        try {

            if (hasTasks()) {

                selectNow();

            } else {

                select(oldWakenUp);

                if (wakenUp.get()) {

                    selector.wakeup();

                }

            }

   

            cancelledKeys = 0;

            needsToSelectAgain = false;

            final int ioRatio = this.ioRatio;

            if (ioRatio == 100) {

                processSelectedKeys();

                runAllTasks();

            } else {

                final long ioStartTime = System.nanoTime();

   

                processSelectedKeys();

   

                final long ioTime = System.nanoTime() - ioStartTime;

                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

            }

   

            if (isShuttingDown()) {

                closeAll();

                if (confirmShutdown()) {

                    cleanupAndTerminate(true);

                    return;

                }

            }

        } catch (Throwable t) {

            logger.warn("Unexpected exception in the selector loop.", t);

            try {

                Thread.sleep(1000);

            } catch (InterruptedException e) {

            }

        }

        scheduleExecution();

    }

}

public final class NioEventLoop extends SingleThreadEventLoop {

    private void processSelectedKeys() {

        if (selectedKeys != null) {

            processSelectedKeysOptimized(selectedKeys.flip());

        } else {

            processSelectedKeysPlain(selector.selectedKeys());

        }

    }

}

public final class NioEventLoop extends SingleThreadEventLoop {

    protected boolean runAllTasks() {

        fetchFromScheduledTaskQueue();

        Runnable task = pollTask();

        if (task == null) {

            return false;

        }

 

        for (;;) {

            try {

                task.run();

            } catch (Throwable t) {

                logger.warn("A task raised an exception.", t);

            }

 

            task = pollTask();

            if (task == null) {

                lastExecutionTime = ScheduledFutureTask.nanoTime();

                return true;

            }

        }

    }

}

 

      小结:每个NioEventLoop对应一个线程和一个Sekectir,NioServerSocketChannel会主动注册到某一个NioEventLoop的Selector上,NioEventLoop负责事件轮询。

      Outbound事件都是请求事件,发起者是Channel,处理者是unsafe(不安全),通过Outbound事件进行通知,传播方向是tail到head。

      Inbound事件发起者是unsafe,事件处理者是Channel,是通知事件,传播方向是从头到尾。

 

Netty内存管理机制

      首先会申请一大块内存Arena,Arena由许多的Chunk组成,而每个Chunk默认由2048个page组成。

      Chunk通过AVL树(平衡二叉树)形式组成Page,每一个叶子节点表示一个Page,而中间节点表示内存区域,节点主机记录它在整个Arena中的偏移地址。

      当区域给分配出去后,中间节点上的标记位会被标记,这样就标识这个中间节点以下的所有节点都已被分配了。

      对于大于8K的内存分配在poolChunkList中,而PoolSubpage用于分配小于8k的内存,它会把一个page分割成多个段,进行内存分配。

 

BytBuf特点

      支持自动化扩容(4M),通过内置的符合缓冲类型,实现零拷贝;不需要调用flip()来切换读写模式,读取和写入索引分开;引用计数基于AtomicIntegerFiledUpdateer用于内存回收;PooledByteBuf采用二叉树实现一个内存池,集中管理内存的分配和释放,不用每次使用都新建一个缓冲区对象。UnpooledHeapByteBuf每次都会新建一个缓冲区对象。

这篇关于Java网络编程(11) - NIOEventLoopGroup源码解读,Netty内存管理机制是怎么样的?BytBuf特点是什么?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/756829

相关文章

Java中对象的创建和销毁过程详析

《Java中对象的创建和销毁过程详析》:本文主要介绍Java中对象的创建和销毁过程,对象的创建过程包括类加载检查、内存分配、初始化零值内存、设置对象头和执行init方法,对象的销毁过程由垃圾回收机... 目录前言对象的创建过程1. 类加载检查2China编程. 分配内存3. 初始化零值4. 设置对象头5. 执行

SpringBoot整合easy-es的详细过程

《SpringBoot整合easy-es的详细过程》本文介绍了EasyES,一个基于Elasticsearch的ORM框架,旨在简化开发流程并提高效率,EasyES支持SpringBoot框架,并提供... 目录一、easy-es简介二、实现基于Spring Boot框架的应用程序代码1.添加相关依赖2.添

通俗易懂的Java常见限流算法具体实现

《通俗易懂的Java常见限流算法具体实现》:本文主要介绍Java常见限流算法具体实现的相关资料,包括漏桶算法、令牌桶算法、Nginx限流和Redis+Lua限流的实现原理和具体步骤,并比较了它们的... 目录一、漏桶算法1.漏桶算法的思想和原理2.具体实现二、令牌桶算法1.令牌桶算法流程:2.具体实现2.1

SpringBoot中整合RabbitMQ(测试+部署上线最新完整)的过程

《SpringBoot中整合RabbitMQ(测试+部署上线最新完整)的过程》本文详细介绍了如何在虚拟机和宝塔面板中安装RabbitMQ,并使用Java代码实现消息的发送和接收,通过异步通讯,可以优化... 目录一、RabbitMQ安装二、启动RabbitMQ三、javascript编写Java代码1、引入

spring-boot-starter-thymeleaf加载外部html文件方式

《spring-boot-starter-thymeleaf加载外部html文件方式》本文介绍了在SpringMVC中使用Thymeleaf模板引擎加载外部HTML文件的方法,以及在SpringBoo... 目录1.Thymeleaf介绍2.springboot使用thymeleaf2.1.引入spring

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在