Netty源码ByteBuf详解

2023-11-23 16:32
文章标签 源码 详解 netty bytebuf

本文主要是介绍Netty源码ByteBuf详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

ByteBuf简介

作为一个高性能网络编程框架,Netty的字节容器ByteBuf相较于JDK标准库自带的ByteBuffer,有着更加灵活、更高效的方式处理字节数据,它有着如下几个特性:

  1. 内存分配:ByteBuf按照分类可以分为堆内内存和直接内存,前者分配在JVM堆内存中,由JVM进行管理和GC,后者直接分配在本地内存上,需要我们进行手动释放,但是性能要好于前者。
  2. 读写指针:ByteBuf为读写操作分别提供读写指针,通过读写指针我们可以快速定位、设置指定偏移量上的数据。
  3. 容量调整:ByteBuf会根据使用情况进行动态扩容和缩容,使得它更加灵活,可以适应不同的数据大小。
  4. 高级操作:ByteBuf提供切片、合并、复制等,使得操作更加方便和高效。

ByteBuf常见操作示例

ByteBuf包含许多灵活的操作,这里读者给出一套详尽的使用演示,读者可以根据注释自行了解:

public static void main(String[] args) {// initial:9 max: 100ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);print("allocate ByteBuf(9, 100)", buffer);// 写入字节数组 写指针移动 未到capacity 仍然可写buffer.writeBytes(new byte[]{1, 2, 3, 4});print("writeBytes(1, 2, 3, 4)", buffer);// 写入整型 占用四个字节buffer.writeInt(12);print("writeInt(12)", buffer);// 写入到达初始化的字节大小 9 isWritable未falsebuffer.writeBytes(new byte[]{5});print("writeBytes(5)", buffer);// 扩容,扩容之后可以继续写buffer.writeBytes(new byte[]{6});print("writeBytes(6)", buffer);// get 方法不改变读写指针System.out.println("getByte(3) return: " + buffer.getByte(3));System.out.println("getShort(3) return: " + buffer.getShort(3));System.out.println("getInt(3) return: " + buffer.getInt(3));print("getByte()", buffer);// set 方法不改变读写指针buffer.setByte(buffer.readableBytes() + 1, 0);print("setByte()", buffer);// read 方法改变读指针byte[] bytes = new byte[buffer.readableBytes()];buffer.readBytes(bytes);
//        buffer.readByte();
//        buffer.readInt();// ... 其他各种不同的读方法print("readBytes(" + bytes.length + ")", buffer);// 写入到最大容量for (byte i = 10; i < 100; i++) {buffer.writeByte(i);}print("write to max", buffer);// 超出maxbuffer.writeByte(1);}private static void print(String action, ByteBuf byteBuf) {//打印本次操作行为System.out.println("after ===========" + action + "============");//容量System.out.println("capacity(): " + byteBuf.capacity());//最大容量System.out.println("maxCapacity(): " + byteBuf.maxCapacity());//读指针位置System.out.println("readerIndex(): " + byteBuf.readerIndex());//可读字节数System.out.println("readableBytes(): " + byteBuf.readableBytes());// 读写指针是否重复,即判断是否可读System.out.println("isReadable(): " + byteBuf.isReadable());//写指针位置System.out.println("writerIndex(): " + byteBuf.writerIndex());//可写字节System.out.println("writableBytes(): " + byteBuf.writableBytes());// capacity 和 写指针重合不可写 但是扩容之后还可写System.out.println("isWritable(): " + byteBuf.isWritable());//最大可写字节数System.out.println("maxWritableBytes(): " + byteBuf.maxWritableBytes());System.out.println();}

输出结果:

after ===========allocate ByteBuf(9, 100)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 0
isReadable(): false
writerIndex(): 0
writableBytes(): 9
isWritable(): true
maxWritableBytes(): 100after ===========writeBytes(1, 2, 3, 4)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 4
isReadable(): true
writerIndex(): 4
writableBytes(): 5
isWritable(): true
maxWritableBytes(): 96after ===========writeInt(12)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 8
isReadable(): true
writerIndex(): 8
writableBytes(): 1
isWritable(): true
maxWritableBytes(): 92after ===========writeBytes(5)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 9
isReadable(): true
writerIndex(): 9
writableBytes(): 0
isWritable(): false
maxWritableBytes(): 91after ===========writeBytes(6)============
capacity(): 64
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 10
isReadable(): true
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90getByte(3) return: 4
getShort(3) return: 1024
getInt(3) return: 67108864
after ===========getByte()============
capacity(): 64
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 10
isReadable(): true
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90after ===========setByte()============
capacity(): 64
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 10
isReadable(): true
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90after ===========readBytes(10)============
capacity(): 64
maxCapacity(): 100
readerIndex(): 10
readableBytes(): 0
isReadable(): false
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90after ===========write to max============
capacity(): 100
maxCapacity(): 100
readerIndex(): 10
readableBytes(): 90
isReadable(): true
writerIndex(): 100
writableBytes(): 0
isWritable(): false
maxWritableBytes(): 0Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(100) + minWritableBytes(1) exceeds maxCapacity(100): PooledUnsafeDirectByteBuf(ridx: 10, widx: 100, cap: 100/100)at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:275)at io.netty.buffer.AbstractByteBuf.writeByte(AbstractByteBuf.java:938)at netty.book.bytebuf.ByteBufTest.main(ByteBufTest.java:60)

ByteBuf源码解析

UML图了解ByteBuf体系结构

我们可以从类图了解ByteBuf的体系结构,由下图的关键字可知,ByteBuf按照分类可以分为三种分类:

  1. 按照获取字节容是否是从容器中获取,可将其分为PooledUnpooled
  2. 按照对于ByteBuf的基本操作的安全性,我们可以分为Unsafe和非Unsafe
  3. 按照ByteBuf在内存所处位置可分为DirectHeap

在这里插入图片描述

Pooled和Unpooled

由名字可知,前者用到了池化思想,即从预先分配好的一块内存中获取一块连续的内存作为ByteBuf,而后者是非池化的,每次需要调用底层的API向操作系统申请一块内存。

Unsafe和非Unsafe

Unsafe和非Unsafe的区别即对于ByteBuf的操作是安全还是非安全的,这一点我们可以通过源码来讲解。

先来看看UnpooledUnsafeHeapByteBuf这个类,由名字可知这个类是Unsafe的,我们可以从getByte方法为入口了解一下Unsafe的操作。

首先getByte会检查索引位置,然后调用_getByte获取指定索引位置的数据,我们步入一探究竟。

@Overridepublic byte getByte(int index) {//检查索引checkIndex(index);//通过_getByte获取bytereturn _getByte(index);}

可以看到_getByte会通过一个工具类UnsafeByteBufUtil获取字节数据,我们不妨步进一下查看其实现细节:

@Overrideprotected byte _getByte(int index) {return UnsafeByteBufUtil.getByte(array, index);}

经过重重步进,我们终于知道了这个操作为什么是UNSAFE的原因,原来这个类获取字节数据的操作并非直接通过索引读取数据,而至通过实际内存地址偏移量配合索引完成字节数据的读取。

static byte getByte(byte[] data, int index) {//通过访问JVM直接内存偏移量BYTE_ARRAY_BASE_OFFSET 加上索引获取当前字节实际位置return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);}

对比一下,我们再看看非Unsafe的操作,我们仍然以getByte作为入口。可以看到_getByte仍然是通过一个工具类获取字节数据。

@Overrideprotected byte _getByte(int index) {return HeapByteBufUtil.getByte(array, index);}

查阅最终的代码可以了解到,当前字节读取的逻辑是直接通过JVM基本类型byte数组的索引位置获取的,所以操作是安全的。

static byte getByte(byte[] memory, int index) {return memory[index];}
Direct和Heap

Direct的字节容器直接分配在操作系统的本地内存中,不受JVMGC控制,需要用户手动释放。而后者则是分配在JVM的堆内存上,受JVMGC进行管理。我们可以通过UnpooledDirectByteBufUnpooledHeapByteBuf的源码进一步了解两者的区别。

先来看看UnpooledHeapByteBuf,它的取值是通过HeapByteBufUtil获取HeapByteBuf,我们不妨步入查看实现细节:

@Overrideprotected byte _getByte(int index) {return HeapByteBufUtil.getByte(array, index);}

查看getByte方法,可以看到其操作的内存就是分配JVM堆内存上的byte数组。

 static byte getByte(byte[] memory, int index) {return memory[index];}

再查看UnpooledDirectByteBuf_getByte,可以看到它是通过ByteBufferget方法进行读取操作的。

@Overrideprotected byte _getByte(int index) {return buffer.get(index);}

查看其内部逻辑,可以看到它是通过unsafe方法操作由ix方法得到的内存地址从而获取字节数据。

public byte get(int i) {return ((unsafe.getByte(ix(checkIndex(i)))));}

AbstractByteBufAllocator详解

简介

AbstractByteBufAllocatorByteBuf字节容器分配一个抽象类,它继承自ByteBufAllocator,已实现了字节分配容器的大部分行为,可以说是ByteBufAllocator的基本骨架。

buffer方法

查看buffer方法,它是ByteBufAllocator定义的一个方法,它的作用就是分配一个ByteBuf,具体分配直接内存的ByteBuf还是JVM堆内存的ByteBuf是由directByDefault这个变量决定。

 @Overridepublic ByteBuf buffer() {//如果directByDefault返回true则返回本地堆内存的ByteBufif (directByDefault) {return directBuffer();}//返回JVM堆内存的ByteBufreturn heapBuffer();}

关于directByDefault的计算,查阅源码的定义看到,如果用户使用构造方法将preferDirect 设置为true时,且类路径上可以找到Unsafe这个类,那么这个值就为true,用户即可在分配非堆内存。

protected AbstractByteBufAllocator(boolean preferDirect) {//如果preferDirect设置为true且当前这个类路径上存在unsafe类,则directByDefault 为true,用户即可在分配非堆内存directByDefault = preferDirect && PlatformDependent.hasUnsafe();emptyBuf = new EmptyByteBuf(this);}
directBuffer

我们再来看看本次内存获取ByteBuf的方法directBufferdirectBuffer传入initialCapacityInteger.MAX_VALUE,我们不妨步入看看这几个参数和含义和方法内部的逻辑:

@Overridepublic ByteBuf directBuffer(int initialCapacity) {return directBuffer(initialCapacity, Integer.MAX_VALUE);}

可以看到,该方法会基于外部传入的初始化容量和最大容量调用newDirectBuffer创建本地内存的ByteBuf

@Overridepublic ByteBuf directBuffer(int initialCapacity, int maxCapacity) {if (initialCapacity == 0 && maxCapacity == 0) {return emptyBuf;}//校验参数有效性validate(initialCapacity, maxCapacity);//基于初始化容量和最大容量创建直接内存容器return newDirectBuffer(initialCapacity, maxCapacity);}

因为内存分配可能涉及池化分配和非池化分配,所以当前的抽象方法AbstractByteBufAllocatornewDirectBuffer设置为抽象方法交由子类实现,具体实现细节笔者会在后文展开分析:

 protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
heapBuffer

我们再来看看JVM堆中分配ByteBuf 的逻辑,可以看到它也是直接调用heapBuffer完成创建:

@Overridepublic ByteBuf heapBuffer(int initialCapacity) {return heapBuffer(initialCapacity, Integer.MAX_VALUE);}

查看heapBuffer内部实现,可以看到它完成会容量的校验之后,直接调用newHeapBuffer创建JVM堆上的ByteBuf

@Overridepublic ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {if (initialCapacity == 0 && maxCapacity == 0) {return emptyBuf;}//参数校验validate(initialCapacity, maxCapacity);//基于初始化容量和最大容量完成堆上字节容器的创建return newHeapBuffer(initialCapacity, maxCapacity);}

同样的newHeapBuffer也是交由具体的子类实现:

 protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);

UnpooledByteBufAllocator详解

简介

UnpooledByteBufAllocator对应的字节容器都是未池化的,这意味我们每次通过它获取的byteBuf都是全新创建的。最常见的操作即newHeapBuffernewDirectBuffer,这些方法都是继承自抽象类AbstractByteBufAllocator,我们不妨步入内部查看一下它的实现逻辑。

newHeapBuffer

newHeapBuffer即在JVM堆内存上创建ByteBuf ,它的执行步骤如下:

  1. 基于抽象类AbstractByteBufAllocator的调用initialCapacity256maxCapacity为整型最大值。
  2. 它在执行前会判断当前的类路径上是否存在unSafe类,如果存在则创建UnpooledUnsafeHeapByteBuf,反之创建UnpooledHeapByteBuf
@Overrideprotected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity): new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);}

我们先来看看UnpooledUnsafeHeapByteBuf的实现,由关键字UnpooledUnsafeHeap,可知其非池化、操作不安全、JVM堆内存创建,我们不妨步入构造方法查看其实现细节:

UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {super(alloc, initialCapacity, maxCapacity);}

可以看到这个构造方法也仅仅是一个自调用,我们不妨步入查看实现:

 protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);}

再次步入步入这个构造方法,可以看到其内部实现逻辑分为5大步骤:

  1. 设置最UnpooledHeapByteBuf大容量。
  2. 参数校验。
  3. 设置ByteBufAllocator
  4. 初始化bytebuf底层实质存储数据的byte数组
  5. 初始化读写索引。
 private UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {//设置最大容量super(maxCapacity);//参数校验if (alloc == null) {throw new NullPointerException("alloc");}if (initialArray == null) {throw new NullPointerException("initialArray");}if (initialArray.length > maxCapacity) {throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));}this.alloc = alloc;//初始化bytebuf底层实质存储数据的byte数组setArray(initialArray);//初始化读写索引setIndex(readerIndex, writerIndex);}

这里我们查看setArray,逻辑很简单,array设置为JVM堆内存的byte数组。

 private void setArray(byte[] initialArray) {array = initialArray;tmpNioBuf = null;}

再来看看setIndex方法,它在完成参数校验确保读索引位于写索引后面,然后再调用setIndex0,将上文传入的0设置为读写索引的位置:

 @Overridepublic ByteBuf setIndex(int readerIndex, int writerIndex) {//参数校验if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {throw new IndexOutOfBoundsException(String.format("readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",readerIndex, writerIndex, capacity()));}//设置读写索引为0setIndex0(readerIndex, writerIndex);return this;}

步入setIndex0即可看到它会调整当前byteBuf的读写索引为0。

final void setIndex0(int readerIndex, int writerIndex) {this.readerIndex = readerIndex;this.writerIndex = writerIndex;}

了解了UnpooledUnsafeHeapByteBuf,我们再来看看UnpooledHeapByteBuf,从构造方法的调用来看,和前者创建逻辑基本是一致的,既然构造方法都是通过调用UnpooledHeapByteBuf的构造方法实现,那么它们有什么区别呢?

 protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);}

我们都知道unSafe和非unsafe的区别则是两者对于内存的操作,前者是直接基于地址去操作byteBuf而后者是基于索引去操作,所以我们不妨看看两者的getByte方法。

先来看看UnpooledUnsafeHeapByteBuf,从源码中我们就可以知晓他读取byte数据时是基于UnsafeByteBufUtil的。

//UnpooledUnsafeHeapByteBuf的getByte
@Overridepublic byte getByte(int index) {checkIndex(index);//直接调用下方_getBytereturn _getByte(index);}@Overrideprotected byte _getByte(int index) {return UnsafeByteBufUtil.getByte(array, index);}

不断步进UnsafeByteBufUtil.getByte实际实现可知,它的读取操作通过UNSAFE基于内存地址偏移量加上byte长度加上索引位置得到实际字节,存取操作相对高效。

static byte getByte(byte[] data, int index) {return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);}

再来看看UnpooledHeapByteBufgetByte,同样的它也是通过一个HeapByteBufUtil进行字节读取。

@Overridepublic byte getByte(int index) {ensureAccessible();return _getByte(index);}@Overrideprotected byte _getByte(int index) {return HeapByteBufUtil.getByte(array, index);}

步入查看其操作是直接基于索引操作数据,并没有直接通过内存地址进行操作,相较于前者是安全单不高效。

 static byte getByte(byte[] memory, int index) {return memory[index];}
newDirectBuffer

newDirectBuffer的创建的步骤和前者差不多:

  1. 判断类路径上是否有Unsafe,如果有创建newUnsafeDirectByteBuf,反之创建UnpooledDirectByteBuf
  2. 判断是否禁用内存泄漏探测,如有禁止则直接返回ByteBuf ,反之调用toLeakAwareBuffer将其包装为内存泄漏可感知的ByteBuf
 @Overrideprotected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {ByteBuf buf = PlatformDependent.hasUnsafe() ?UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);return disableLeakDetector ? buf : toLeakAwareBuffer(buf);}

步入UnpooledUnsafeDirectByteBuf的构造方法可以看到它的步骤和创建JVM堆区内存的差不多,唯一区别分配内存时会通过allocateDirect得到ByteBuffer,再通过setByteBuffer完成UnpooledUnsafeDirectByteBuf的创建:

protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {//设置最大容量super(maxCapacity);//参数校验if (alloc == null) {throw new NullPointerException("alloc");}if (initialCapacity < 0) {throw new IllegalArgumentException("initialCapacity: " + initialCapacity);}if (maxCapacity < 0) {throw new IllegalArgumentException("maxCapacity: " + maxCapacity);}if (initialCapacity > maxCapacity) {throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));}this.alloc = alloc;//设置ByteBuffersetByteBuffer(allocateDirect(initialCapacity), false);}

我们先来看看allocateDirect做了什么,查看源码我们发现它会调用ByteBuffer.allocateDirect创建一个全新的直接内存。

 protected ByteBuffer allocateDirect(int initialCapacity) {return ByteBuffer.allocateDirect(initialCapacity);}

查看其内部实现,我们会得到这样一个构造方法,它的整体步骤为:

  1. 基于我们传入的容量即cap,创建一个MappedByteBuffer直接内存对象。
  2. 为创建好的内存对象分配内存大小。
  3. 设置内存地址起始位置。
  4. 清空本段内存的数据。
DirectByteBuffer(int cap) {                   // package-private//基于我们传入的容量即`cap`,创建一个`MappedByteBuffer`直接内存对象super(-1, 0, cap, cap);boolean pa = VM.isDirectMemoryPageAligned();int ps = Bits.pageSize();long size = Math.max(1L, (long)cap + (pa ? ps : 0));Bits.reserveMemory(size, cap);//分配内存long base = 0;try {base = unsafe.allocateMemory(size);} catch (OutOfMemoryError x) {Bits.unreserveMemory(size, cap);throw x;}unsafe.setMemory(base, size, (byte) 0);//设置内存地址if (pa && (base % ps != 0)) {// Round up to page boundaryaddress = base + ps - (base & (ps - 1));} else {address = base;}//清空本段内存的数据cleaner = Cleaner.create(this, new Deallocator(base, size, cap));att = null;}

allocateDirect完成后则调用setByteBuffer进行参数设置,setByteBuffer逻辑很简单,基于上一步创建好的buffer设置memoryAddress 即堆外内存地址,并将当前堆内存的大小设置为capacity

final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {if (tryFree) {ByteBuffer oldBuffer = this.buffer;if (oldBuffer != null) {if (doNotFree) {doNotFree = false;} else {freeDirect(oldBuffer);}}}this.buffer = buffer;memoryAddress = PlatformDependent.directBufferAddress(buffer);tmpNioBuf = null;capacity = buffer.remaining();}

同理查看UnpooledDirectByteBuf构造方法如下,不多赘述,逻辑和前者差不多。

protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {super(maxCapacity);if (alloc == null) {throw new NullPointerException("alloc");}if (initialCapacity < 0) {throw new IllegalArgumentException("initialCapacity: " + initialCapacity);}if (maxCapacity < 0) {throw new IllegalArgumentException("maxCapacity: " + maxCapacity);}if (initialCapacity > maxCapacity) {throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));}this.alloc = alloc;setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));}

我们还是以getByte看看两者操作区别:

UnpooledUnsafeDirectByteBuf_getByte如下,可以看到在获取字节时调用了一个名为addr的方法:

 protected byte _getByte(int index) {return UnsafeByteBufUtil.getByte(this.addr(index));}

查看addr方法可知,其数据数据是基于内存地址获取的:

long addr(int index) {return this.memoryAddress + (long)index;}

我们不断步入getByte内部即可可以看到基于这个内存地址获取对应的字节数据:

 static byte getByte(long address) {return UNSAFE.getByte(address);}

反之UnpooledDirectByteBuf则是直接通过索引获取,具体地址的计算由底层实现。

@Overrideprotected byte _getByte(int index) {return buffer.get(index);}
小结

自此我们将非池化的UnpooledByteBufAllocatorJVM堆内存byteBuf分配和本地内存的byteBuf内存分配都讲解完成。整体来说只要把握好Heap是在JVM堆区创建字节容器、Direct是在本次内存创建字节容器、Unsafe操作都是基于内存偏移量相较于非Unsafe的操作更加高效且不安全,通过了解这几个关键字对应实现细节即可快速把握源码实现重点。

PooledByteBufAllocator详解

newHeapBuffer

相比于非池化的UnpooledByteBufAllocatorPooledByteBufAllocator的内存分配相比前者会复杂许多,但heapdirect整体分配流程确实大差不差,所以我们不妨以newHeapBuffer为例查看其整体分配流程:

可以看到PooledByteBufAllocatornewHeapBuffer执行步骤为:

  1. 获取当前线程的缓存PoolThreadCache
  2. 基于这个缓存拿到PoolArena
  3. 如果PoolArena不为空则调用allocate获取ByteBuf ,若为空吗则进入步骤4。
  4. 创建一个UnpooledHeapByteBuf
  5. 将其设置为内存泄漏可感知的ByteBuf
 @Overrideprotected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {//获取当前线程的缓存PoolThreadCachePoolThreadCache cache = threadCache.get();//基于这个缓存拿到PoolArenaPoolArena<byte[]> heapArena = cache.heapArena;ByteBuf buf;//如果PoolArena不为空则调用allocate完成byteBuf创建和内存分配if (heapArena != null) {buf = heapArena.allocate(cache, initialCapacity, maxCapacity);} else {//若为空创建一个UnpooledHeapByteBufbuf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);}//将其设置为内存泄漏可感知的ByteBuf`return toLeakAwareBuffer(buf);}

了解整体概念后,我们不妨对这其中的每一个概念进行拆解分析,首先是PoolThreadCache,们查看其源码,可以看到其继承了FastThreadLocal这个类,从名字可以猜出FastThreadLocal是一个特殊的ThreadLocal

我们上文提到一个get的操作,实际上若get方法初次尝试从threadCache取值时是为空的,此时它就会调用PoolThreadLocalCacheinitialValue完成初始化,就如下代码所示,会基于当前线程完成一个PoolThreadCache初始化工作。

final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {//略//PoolThreadLocalCache 初始化方法@Overrideprotected synchronized PoolThreadCache initialValue() {//初始化heapArena final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);//初始化directArena final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);//完成PoolThreadCache创建,其内部会完成cache、arena等成员变量初始化和创建工作return new PoolThreadCache(heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);}}

再来看看allocate方法的整体步骤,的我们步入其内部可知其bytebuf创建的逻辑:

  1. 调用newByteBuf完成PooledByteBuf创建。
  2. 调用allocate完成内存分配。
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {PooledByteBuf<T> buf = newByteBuf(maxCapacity);allocate(cache, buf, reqCapacity);return buf;}

查看newByteBuf的创建逻辑可知,PooledByteBuf的创建是基于PooledUnsafeHeapByteBufPooledHeapByteBuf实现的:

@Overrideprotected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity): PooledHeapByteBuf.newInstance(maxCapacity);}

我们以PooledUnsafeHeapByteBufnewUnsafeInstance为例查看PooledUnsafeHeapByteBuf 的插件逻辑,从下文代码可知,PooledUnsafeHeapByteBuf 的创建是从RECYCLER获取的,得到buf 后会调用reuse完成读写指针的重置。

static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) {//从RECYCLER中获取循环利用或者全新创建的PooledUnsafeHeapByteBuf PooledUnsafeHeapByteBuf buf = RECYCLER.get();//重置读写指针,确保后续读写工作可以正确执行buf.reuse(maxCapacity);return buf;}

我们不妨查看一下Recycler的实现,通过定义可知,当get为空时,Recycler会直接通过newObject创建一个PooledUnsafeHeapByteBuf ,并以handle作为内存地址的指针(关于handle的概念笔者会在后文详尽介绍,这里读者只需了解简单的概念即可)。

private static final Recycler<PooledUnsafeHeapByteBuf> RECYCLER = new Recycler<PooledUnsafeHeapByteBuf>() {@Overrideprotected PooledUnsafeHeapByteBuf newObject(Handle<PooledUnsafeHeapByteBuf> handle) {//以传入的handle作为内存地址的指针return new PooledUnsafeHeapByteBuf(handle, 0);}};

这里我们不妨小结一下池化的步骤:

  1. 无论是heap的创建还是direct的创建,都会先从threadCache中获取arena
  2. 若arena为空则新建一个UnpooledHeapByteBuf返回出去,若不为空则进入步骤3。
  3. 基于RECYCLER获取之前回收的的ByteBuf,若RECYCLER中不存在回收的ByteBuf则创建一个全新的ByteBuf
  4. 将上一步的ByteBuf读写指针重置,确保ByteBuf可用。
  5. 调用allocate完成内存分配。

内存规格介绍

后续我们会对内存分配这一步进行详尽讲解,在此之前,我们必须了解一下Netty中内存规格,设计者为了内存分配时尽可能节省且充分利用空间所设置的不同规格:

  1. huge大小为16M以上,不做缓存。
  2. chunk作为基本单位,其值设置为16M
  3. pagechunk的下一级内存规格,其大小为8k。
  4. subPage大小为0-512B之间。
  5. 设计者定义3种内存范围区间,即tiny(0-512b)small(512B-8k)normal(8k-16M)

在这里插入图片描述

page 级别内存分配

大抵了解了内存分配的大体步骤和内存规格之后,这里我不妨以我们以这段代码为入口了解一下page 级别的内存分配,如下所示,可以看到笔者用page记录了一个page的大小8*1024,然后基于PooledByteBufAllocator尝试得到一个内存为2个pageByteBuf

public class Main {public static void main(String[] args) {int page = 1024*8;PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;//尝试获取两个page内存大小的byteBuf ByteBuf byteBuf = allocator.directBuffer(2 * page);}
}

首先代码的调用会来到AbstractByteBufAllocatordirectBuffer,它以我们传入的所需要大小作为initialCapacity自调用directBuffer

 @Overridepublic ByteBuf directBuffer(int initialCapacity) {return directBuffer(initialCapacity, Integer.MAX_VALUE);}

步进之后可以看到对参数进行必要的校验之后,直接调用newDirectBuffer

@Overridepublic ByteBuf directBuffer(int initialCapacity, int maxCapacity) {//若需要为0则直接返回空ByteBuf if (initialCapacity == 0 && maxCapacity == 0) {return emptyBuf;}//校验initialCapacity和maxCapacity有效性validate(initialCapacity, maxCapacity);//调用newDirectBuffer完成内存分配return newDirectBuffer(initialCapacity, maxCapacity);}

于是代码又走了PooledByteBufAllocatornewDirectBuffer,其核心步骤即从当前线程缓存中得到directArena 进行bytebuf创建和内存分配,话不多说,我们直接步入:

@Overrideprotected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {//从当前线程的缓存中拿到directArena PoolThreadCache cache = threadCache.get();PoolArena<ByteBuffer> directArena = cache.directArena;ByteBuf buf;//基于directArena 完成bytebuf创建和内存分配if (directArena != null) {buf = directArena.allocate(cache, initialCapacity, maxCapacity);} else {if (PlatformDependent.hasUnsafe()) {buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);} else {buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);}}return toLeakAwareBuffer(buf);}

我们直接查看PoolArenaallocate方法,在通过newByteBuf得到一个PooledByteBuf之后,直接调用allocate进入内存分配:

PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {PooledByteBuf<T> buf = newByteBuf(maxCapacity);//完成bytebuf的内存分配allocate(cache, buf, reqCapacity);return buf;}

来到PoolArenaallocate,因为我们要求的内存大小为16k,属于page,于是先调用cache.allocateNormal尝试基于缓存完成内存分配,因为我们是第一次调用所以缓存没有数据,于是直接调用allocateNormal方法尝试内存分配。

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int normCapacity = normalizeCapacity(reqCapacity);if (isTinyOrSmall(normCapacity)) { // capacity < pageSize//略} else {//略}final PoolSubpage<T> head = table[tableIdx];//略if (normCapacity <= chunkSize) {//尝试缓存分配if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {return;}//缓存分配失败,调用allocateNormal完成buf的内存分配allocateNormal(buf, reqCapacity, normCapacity);} else {// Huge allocations are never served via the cache so just call allocateHugeallocateHuge(buf, reqCapacity);}}

我们直接步入allocateNormal,可以看到它会先尝试中q050q025等这些ChunkList本地内存中进行分配,若分配失败,则后续会创建一个全新的chunk并完成buf内存分配:

private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {//尝试从本次chunk列表中分配内存,若成功则直接返回if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||q075.allocate(buf, reqCapacity, normCapacity)) {++allocationsNormal;return;}//若上述本地内存分配失败则创建一个全新的chunk再基于这个chunk完成chunk内存分配PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);long handle = c.allocate(normCapacity);++allocationsNormal;assert handle > 0;//基于上一步的chunk完成buf内存分配c.initBuf(buf, handle, reqCapacity);qInit.add(c);}

所以我们来到了newChunk方法,可以看到其内部实现就是一个PoolChunk的创建:

 @Overrideprotected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {return new PoolChunk<ByteBuffer>(this, allocateDirect(chunkSize),pageSize, maxOrder, pageShifts, chunkSize);}

步入PoolChunk的构造方法,可以看到以下成员变量的创建和初始化,对应变量的含义笔者都已注释,读者可以自行参阅:

PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {//unpooled 设置为false,意为当前为池化的chunkunpooled = false;//将调用的arena作为变量存到arena 中this.arena = arena;//记录外部创建可读写16k的DirectByteBuffer,后续的读写都是存入这个memory 中this.memory = memory;//pageSize为8kthis.pageSize = pageSize;//pageShifts 为13this.pageShifts = pageShifts;//maxOrder 为11,用于后续二叉树的创建this.maxOrder = maxOrder;//chunkSize 为16Mthis.chunkSize = chunkSize;//unusable为12后文用到unusable = (byte) (maxOrder + 1);//log2ChunkSize 为24log2ChunkSize = log2(chunkSize);subpageOverflowMask = ~(pageSize - 1);freeBytes = chunkSize;assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;maxSubpageAllocs = 1 << maxOrder;// 创建4096大小的byte数组,表示一颗二叉树,记录每一个节点所在的深度,后续我们就可以基于这个深度得到对应内存大小memoryMap = new byte[maxSubpageAllocs << 1];depthMap = new byte[memoryMap.length];int memoryMapIndex = 1;for (int d = 0; d <= maxOrder; ++ d) {int depth = 1 << d;for (int p = 0; p < depth; ++ p) {// in each level traverse left to right and set value to the depth of subtreememoryMap[memoryMapIndex] = (byte) d;depthMap[memoryMapIndex] = (byte) d;memoryMapIndex ++;}}//创建一个2048大小的PoolSubpage数组subpages = newSubpageArray(maxSubpageAllocs);}

唯一我们需要留心了解的就是memoryMapdepthMap记录的东西也是一样),首先它是一个基于数组创建的二叉树,其次每一个节点都记录着数的深度,注意索引0是无用的,后续我们就可以基于所需的内存大小定位到对应深度空闲未用到的节点完成内存分配。

以上面的代码为例,最终生成的二叉树大致为这样,可以看到数组中的索引0节点没有用到,而是从索引开始:

  1. 第1层1个节点,即索引1节点。
  2. 第2层2个节点,即索引2、3节点。
  3. 如此往复计算,第11层为2048个节点。

在这里插入图片描述

对应的每一个层级都代表一个内存范围,以我们本次代码为例,要求16k,按照当前二叉树结构,即取11层前两个空闲节点:

在这里插入图片描述

自此我们完成了PoolChunk的创建,于是代码来到了:

long handle = c.allocate(normCapacity);

步入allocate方法会和subpageOverflowMask进行&运算,得出结果不为0,说明基于normCapacity二进制高位存在1大于pageSize,所以直接步入allocateRun:

 long allocate(int normCapacity) {//subpageOverflowMask二进制为11111111111111111110000000000000if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSizereturn allocateRun(normCapacity);} else {return allocateSubpage(normCapacity);}}

步入allocateRun,因为我们的需求容量为16k,所以步入allocateRun的执行逻辑为:

  1. 所以经过运算后d为10层,即全是16k那层。
  2. 获取到该层空闲id为1024,即第10层首节点。
  3. freeBytes 减去本次要求的16k,代表当前内存已用16k。
  4. 返回id,该id即作为外部调用handle的值。
private long allocateRun(int normCapacity) {int d = maxOrder - (log2(normCapacity) - pageShifts);int id = allocateNode(d);if (id < 0) {return id;}freeBytes -= runLength(id);return id;}

基于上一步的handle,正式开始内存分配,可以看到我们的入参为创建的buf,对应内存地址handle,需求内存大小reqCapacity

c.initBuf(buf, handle, reqCapacity);

步入后就会看到这样一段代码逻辑:

  1. 基于handle拿到内存对应内存的索引和bitmapIdx。
  2. 如果bitmapIdx不为0则调用initBufWithSubpage,反之进入步骤3。
  3. 直接调用init方法对buf进行内存分配,可以看到入参为,当前调用的buf,handle即内存指针,还有内存偏移量,需要的内存大小,和本次所用的节点内存长度,和当前线程的parent的threadCache。
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {int memoryMapIdx = memoryMapIdx(handle);int bitmapIdx = bitmapIdx(handle);if (bitmapIdx == 0) {byte val = value(memoryMapIdx);assert val == unusable : String.valueOf(val);buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),arena.parent.threadCache());} else {initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);}}

步入init可以看到该方法做了两件事:

  1. 调用init分配内存。
  2. 记录内存地址。
@Overridevoid init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,PoolThreadCache cache) {super.init(chunk, handle, offset, length, maxLength, cache);initMemoryAddress();}

终于看到,init方法分配内存的逻辑,即通过handle得到内存地址,通过chunk.memory得到一块buf,然后记录cache等信息。

void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {assert handle >= 0;assert chunk != null;//记录本地调用的chunkthis.chunk = chunk;//记录handlethis.handle = handle;//记录上述步骤得到bufmemory = chunk.memory;//记录当前节点在二叉树对应层的偏移量,例如笔者本次的节点为1024,即在第10层第1个节点,所以offset为0this.offset = offset;//记录本次所需的长度this.length = length;//记录分配内存节点大小this.maxLength = maxLength;tmpNioBuf = null;this.cache = cache;}

然后就是记录实际使用的内存地址,后续读写时都是以这个memoryAddress 作为标量进行计算,这一点查看上文中对于堆内存的_getByte方法即可印证:

private void initMemoryAddress() {memoryAddress = PlatformDependent.directBufferAddress(memory) + offset;}

自此我们将内存分配的过程全部讲解完成了,来小结一下整体步骤:

  1. 创建chunk
  2. 基于二叉树查找我们所需内存分配大小对应的节点位置,以便后续记录内存分配情况。
  3. 分配内存,记录内存起始地址。

subPage级别的内存分配

subPage级别的内存分配流程也是差不多的,我们以这段代码为入口进行阅读:

public static void main(String[] args) {int page = 1024 * 8;PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;ByteBuf byteBuf = allocator.directBuffer(16);}

page级别内存分配过程相比,前面bytebuf创建是一样的,我们直接从内存分配这一步开始讲解:

  1. subpage级别的内存属于tiny这个区间,所以会进入isTinyOrSmall这个判断的分支
  2. 第一次在cache.allocateTiny这一步分配失败后,会尝试基于Chunk缓存池进行分配内存,若成功直接返回,反之进入步骤3。
  3. 基于allocateNormal进行内存分配。
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int normCapacity = normalizeCapacity(reqCapacity);if (isTinyOrSmall(normCapacity)) { // capacity < pageSizeint tableIdx;PoolSubpage<T>[] table;boolean tiny = isTiny(normCapacity);if (tiny) { // < 512if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}tableIdx = tinyIdx(normCapacity);table = tinySubpagePools;} else {//略}final PoolSubpage<T> head = table[tableIdx];/*** Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and* {@link PoolChunk#free(long)} may modify the doubly linked list as well.*/synchronized (head) {final PoolSubpage<T> s = head.next;if (s != head) {assert s.doNotDestroy && s.elemSize == normCapacity;long handle = s.allocate();assert handle >= 0;s.chunk.initBufWithSubpage(buf, handle, reqCapacity);if (tiny) {allocationsTiny.increment();} else {allocationsSmall.increment();}return;}}allocateNormal(buf, reqCapacity, normCapacity);return;}//略}

我们直接按照初次分配流程走到allocateNormal,在前面的PoolChunkList分配失败后,创建一个全新的chunk进行内存分配,newChunk整体初始化好读写的bytebuf以及subpages(用户缓存PoolSubpage),我们之前从 long handle = c.allocate(normCapacity);开始讲解subpage级别内存分配和page级别内存分配的区别:

 private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||q075.allocate(buf, reqCapacity, normCapacity)) {++allocationsNormal;return;}// Add a new chunk.PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);long handle = c.allocate(normCapacity);++allocationsNormal;assert handle > 0;c.initBuf(buf, handle, reqCapacity);qInit.add(c);}

与page级别分配不同的是,它会走到allocateSubpage这个方法:

long allocate(int normCapacity) {if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSizereturn allocateRun(normCapacity);} else {return allocateSubpage(normCapacity);}}

进入allocateSubpage,其整体步骤为:

  1. 从PoolSubpage缓存池中找到PoolSubpage。
  2. 基于memoryMap二叉树的叶子节点开始(subpage级别maxOrder从11开始计算),找到合适的节点。
  3. 基于步骤2找到的id到subpages中是否存在对应节点,若存在直接完成分配返回,反之进入步骤4。
  4. 创建一个全新的PoolSubpage,并加入subpages中。
  5. 调用allocate完成内存分配。
private long allocateSubpage(int normCapacity) {// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.// This is need as we may add it back and so alter the linked-list structure.PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);synchronized (head) {int d = maxOrder; // subpages are only be allocated from pages i.e., leavesint id = allocateNode(d);if (id < 0) {return id;}final PoolSubpage<T>[] subpages = this.subpages;final int pageSize = this.pageSize;freeBytes -= pageSize;int subpageIdx = subpageIdx(id);PoolSubpage<T> subpage = subpages[subpageIdx];if (subpage == null) {subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);subpages[subpageIdx] = subpage;} else {subpage.init(head, normCapacity);}return subpage.allocate();}}

ByteBuf的回收

了解了ByteBuf的分配,再来讲讲ByteBuf的回收,我们还是给出这样一段示例代码,通过release方法完成内存回收:

public static void main(String[] args) {int page = 1024 * 8;PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;//尝试获取两个page内存大小的byteBufByteBuf byteBuf = allocator.directBuffer(2 * page);byteBuf.release();}

步入release方法,来到了AbstractReferenceCountedByteBufrelease,可以看到它仅仅是一个release0的自调用,传入一个1:

@Overridepublic boolean release() {return release0(1);}

步入查看release0的可知具体逻辑:

  1. 查看refCnt 是否大于decrement,如果有线程释放过直接抛出异常,反之进入下一步。
  2. 基于CASrefCnt减去传入的decrement,即我们上文说到的入参1。
  3. 调用deallocate完成内存回收。
private boolean release0(int decrement) {for (;;) {int refCnt = this.refCnt;if (refCnt < decrement) {throw new IllegalReferenceCountException(refCnt, -decrement);}if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {if (refCnt == decrement) {deallocate();return true;}return false;}}}

我们继续步进,来到了PooledByteBufdeallocate可以看到其核心步骤:

  1. 判断handle 是否大于0,不大于0说明已被重置或者是无效handle
  2. 重置handle-1
  3. memory置空,即释放创建allocator分配的ByteBuffer的引用,辅助GC
  4. 对应chunk.arena添加到缓存中,便于后续复用。
  5. PooledByteBuf放到缓存中以便复用。
@Overrideprotected final void deallocate() {//判断handle 是否为有效handle if (handle >= 0) {final long handle = this.handle;//置空handlethis.handle = -1;//清空对应的ByteBuffermemory = null;chunk.arena.free(chunk, handle, maxLength, cache);recycle();}}

我们直接来看看PoolArenafree方法的内部逻辑,它会判断当前的chunk是否是池化的,如果不是则直接销毁,反之进行回收,所以我们的代码走到了分支2,整体逻辑为:

  1. 基于我们分配的内存大小得到sizeClass
  2. 基于sizeClass 回收到指定缓存中。
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {if (chunk.unpooled) {int size = chunk.chunkSize();destroyChunk(chunk);activeBytesHuge.add(-size);deallocationsHuge.increment();} else {//基于分配大小得到sizeClassSizeClass sizeClass = sizeClass(normCapacity);//将分配的内存回收到缓存中if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {// cached so not free it.return;}freeChunk(chunk, handle, sizeClass);}}

基于normCapacity(16k)可以得到我们的sizeClassnormal这个内存区间,我们看到cacheadd做了如下几件事:

  1. 基于外部入参得到normal级别的cache,并返回这个cache对象。
  2. chunkhandle存到这个cache的队列中,便于后续内存分配直接用这几个对象完成内存分配。
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {//基于外部入参得到`normal`级别的cache,并返回这个cache对象MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);if (cache == null) {return false;}// 将chunk和handle存到这个cache的队列中,便于后续内存分配直接用这几个对象完成内存分配return cache.add(chunk, handle);}

我们先来看看cache方法的逻辑,因为我们的sizeClassNormal,所以直接调用cacheForNormal

 private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {switch (sizeClass) {case Normal:return cacheForNormal(area, normCapacity);case Small:return cacheForSmall(area, normCapacity);case Tiny:return cacheForTiny(area, normCapacity);default:throw new Error();}}

因为我们的area是直接内存的,所以直接步入if逻辑,它会基于我们的normCapacity得到一个索引位置idx,然后调用cache得到normalDirectCaches中一个缓存:

private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {if (area.isDirect()) {//基于我们的normCapacity得到一个索引位置idxint idx = log2(normCapacity >> numShiftsNormalDirect);//调用cache得到normalDirectCaches中的一个cachereturn cache(normalDirectCaches, idx);}int idx = log2(normCapacity >> numShiftsNormalHeap);return cache(normalHeapCaches, idx);}

得到cache之后就是调用add方法,可以看到其内部逻辑:

  1. 基于chunkhandle创建一个entry
  2. 将其存入当前cache的queue中,便于后续内存分配时,直接复用这两个对象,避免后续分配时频繁的handle计算和chunk各种memoryMap等对象计算和创建:
public final boolean add(PoolChunk<T> chunk, long handle) {Entry<T> entry = newEntry(chunk, handle);boolean queued = queue.offer(entry);if (!queued) {// If it was not possible to cache the chunk, immediately recycle the entryentry.recycle();}return queued;}

自此我们也将Bytebuf内存回收的逻辑讲完了,我们也来简单小结一下其核心步骤:

  1. 基于CAS完成refCnt自减。
  2. 清空PooledByteBuf的handle值。
  3. chunkhandle回收到normalDirectCaches中便于后续内存分配时直接复用。

缓存数据结构

上文提到了复用缓存即MemoryRegionCache的,每当新创建的chunk分配的内存被释放时,线程中的MemoryRegionCache数组就会将其回收以便复用,这也就是为什么我们使用PooledByteBufAllocator创建bytebuf时内存分配都会经过cache.allocateTinycache.allocateSmall等方法尝试先分配,就是为了确保之前回收的内存能够被复用。

查看其源码可以发现其核心成员变量如下,其中:

  1. size为缓存数组queue的容量大小。
  2. queue中的Entry存储handlechunkhandle即内存指针,而chunk则是bytebuffer以及内存相关描述信息的,通过两者可以得到一个分配好内存的bytebuf
  3. sizeClass代表当前MemoryRegionCache的内存规格。
		//queue的容量大小private final int size;// queue中的Entry存储handle和chunk,我们基于handle和chunk可以完成一个bytebuf的内存分配private final Queue<Entry<T>> queue;//记录当前MemoryRegionCache规格例如:Tiny,Small,Normalprivate final SizeClass sizeClass;

我们都知道每个线程都有自己的PoolThreadCache ,它们按照堆和直接内存将MemoryRegionCache细拆可以分为6种:

final class PoolThreadCache {private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;private final MemoryRegionCache<byte[]>[] normalHeapCaches;private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

对应的我们以tinySubPageHeapCaches查看这些缓存的初始化逻辑,从该方法可知,它想创建一个大小为tinyCacheSize512b,容量为PoolArena.numTinySubpagePools即32,SizeClasstinyMemoryRegionCache数组:

tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);

步入后即可看到对应的创建逻辑:

  1. 创建MemoryRegionCache数组。
  2. 基于numCaches循环变量完成数组初始化。
private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass) {if (cacheSize > 0) {@SuppressWarnings("unchecked")MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];for (int i = 0; i < cache.length; i++) {// TODO: maybe use cacheSize / cache.lengthcache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);}return cache;} else {return null;}}

自此每个线程中的MemoryRegionCache数组都初始化好了,每当全新创建的chunk使用完时,MemoryRegionCache数组都会按照制定算法从数组中找到合适的MemoryRegionCache完成回收以便复用。

PoolArena基本概念详解

从上文我们都知道内存分配时若MemoryRegionCache分配不成功则会通过PoolArena,当PoolArena分配的内存使用完毕之后MemoryRegionCache会将PoolArenaPoolChunkhandle回收以便下次内存分配可以复用 :

final class PoolThreadCache {final PoolArena<byte[]> heapArena;final PoolArena<ByteBuffer> directArena;}

我们查看PoolArena的源码,首先会看到这也一个枚举,它定义了PoolArena的3中规格区间:

enum SizeClass {//0-512BTiny,//512B-8kSmall,//8K-16M Normal}

我们直接以构造方法为入口了解一下PoolArena,其核心步骤为:

  1. parent记录当前分配的PooledByteBufAllocator ,pageSize为8k,maxOrder 为11即memoryMap层级,pageShifts为13用于后续内存分配时二叉树的计算,chunkSize为chunk的大小即16M,subpageOverflowMask用于判断每次进行内存分配时所需内存是否page还是subpage。
  2. 初始化tinySubpagePools 、smallSubpagePools用于缓存用户每次新分配的内存信息。
  3. 初始化PoolChunkList,PoolChunkList会按照使用率存储不同的PoolChunk,例如qInit记录使用率为未使用~25,q000 记录使用率1 ~ 50依次类推。
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {this.parent = parent;this.pageSize = pageSize;this.maxOrder = maxOrder;this.pageShifts = pageShifts;this.chunkSize = chunkSize;subpageOverflowMask = ~(pageSize - 1);tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);for (int i = 0; i < tinySubpagePools.length; i ++) {tinySubpagePools[i] = newSubpagePoolHead(pageSize);}numSmallSubpagePools = pageShifts - 9;smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);for (int i = 0; i < smallSubpagePools.length; i ++) {smallSubpagePools[i] = newSubpagePoolHead(pageSize);}q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE, chunkSize);q075 = new PoolChunkList<T>(q100, 75, 100, chunkSize);q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize);q025 = new PoolChunkList<T>(q050, 25, 75, chunkSize);q000 = new PoolChunkList<T>(q025, 1, 50, chunkSize);qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25, chunkSize);q100.prevList(q075);q075.prevList(q050);q050.prevList(q025);q025.prevList(q000);q000.prevList(null);qInit.prevList(qInit);List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);metrics.add(qInit);metrics.add(q000);metrics.add(q025);metrics.add(q050);metrics.add(q075);metrics.add(q100);chunkListMetrics = Collections.unmodifiableList(metrics);}

查看PoolChunkList构造,就是记录列表前后节点以及使用率,还有当前列表中每个节点可分配的最大容量maxCapacity 。

PoolChunkList(PoolChunkList<T> nextList, int minUsage, int maxUsage, int chunkSize) {assert minUsage <= maxUsage;this.nextList = nextList;this.minUsage = minUsage;this.maxUsage = maxUsage;maxCapacity = calculateMaxCapacity(minUsage, chunkSize);}

按照上述PoolChunkList前驱和后继的组装最终可以构成这样一个链表+列表的数据结构:

在这里插入图片描述

PoolArena是中还有这两个池,用于缓存每一次新创建的PoolSubpage:

 private final PoolSubpage<T>[] tinySubpagePools;private final PoolSubpage<T>[] smallSubpagePools;

查阅PoolSubpage其核心成员变量如下:

final class PoolSubpage<T> implements PoolSubpageMetric {//记录内存分配用到的chunkfinal PoolChunk<T> chunk;//记录内存分配二叉树的索引数private final int memoryMapIdx;//当前chunk在depthMap所在层级的偏移量private final int runOffset;//一个page为8kprivate final int pageSize;//用于记录chunk的使用情况,每当smallSubpagePools某个chunk被使用,bitmap就会在对应索引位置+1private final long[] bitmap;//记录前后节点PoolSubpage<T> prev;PoolSubpage<T> next;//判断当前PoolSubpage是否被销毁boolean doNotDestroy;//当前元素大小int elemSize;//最大可用的元素private int maxNumElems;//bitmapLength长度private int bitmapLength;//下一个可用的bitmapIdx索引位置private int nextAvail;//记录还可以分配的subpage个数private int numAvail;

ByteBuf常见面试题

ByteBuf内存的类别有哪些?

我们可以从3种不同的方式进行分类:

  1. 按照分配的内存是否可以复用可分为Pooled和Unpooled。
  2. 按照是否可以直接操作内存地址可分为Unsafe和非Unsafe。
  3. 按照内存是否是在JVM上分配可分为Direct和Heap。

ByteBuf如何减少多线程内存分配之间的竞争?

PooledByteBufAllocator内部维护了几个Arena数组,所有的内存分配都会基于这些初始化好的Arena数组进行分配。并且Netty通过PoolThreadCache使得线程和Arena意义对应,从而尽可能避免了多线程间分配的竞争。

不同大小的内存是如何进行分配的?

对于Page级别的内存分配,Netty通过完全二叉树,即上文说的memoryMap查找到某段未使用的连续内存,然后通过这个索引位置定位到实际的内存地址完成分配,而Page级别以下也是到memoryMap找对应索引位置的内存,然后按照subPage大小进行划分在进行内存分配,并基于bitMap记录当前这个内存的使用情况。

ByteBuf如何实现复用的

当我们使用PooledByteBufAllocator 常见Bytebuf时,PooledByteBufAllocator 在初始化时会创建一定数量的池化对象(PoolChunk)。每个池化对象都表示一块内存区域,用于存储 ByteBuf 对象。
然后PooledByteBufAllocator 会从可用的池化对象(PoolChunk)按照请求的大小选择最接近或稍微大于请求大小的池化对象。如果没有可用的池化对象,PooledByteBufAllocator 会创建一个新的池化对象。
当一个 ByteBuf 对象不再使用时,它会被返回给 PooledByteBufAllocatorPooledByteBufAllocator 会将该对象内存分配时用到的handle池化对象(PoolChunk)加入缓存中,以便下次内存分配时进行复用。

参考

【Netty源码】ByteBuf源码剖析:https://blog.csdn.net/baiye_xing/article/details/76551937

Netty零拷贝之CompositeByteBuf实际用法:https://blog.csdn.net/youxijishu/article/details/104815309/

吃透Netty源码系列三十五之CompositeByteBuf详解一:https://blog.csdn.net/wangwei19871103/article/details/104486129

netty源码阅读之ByteBuf之ByteBuf结构和重要API:https://blog.csdn.net/fst438060684/article/details/81915064

netty源码阅读之ByteBuf之内存page级别内存的分配:https://blog.csdn.net/fst438060684/article/details/82532093

Netty之ByteBuf深入分析:https://www.jianshu.com/p/2498db9c91fe

这篇关于Netty源码ByteBuf详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/419259

相关文章

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

OpenHarmony鸿蒙开发( Beta5.0)无感配网详解

1、简介 无感配网是指在设备联网过程中无需输入热点相关账号信息,即可快速实现设备配网,是一种兼顾高效性、可靠性和安全性的配网方式。 2、配网原理 2.1 通信原理 手机和智能设备之间的信息传递,利用特有的NAN协议实现。利用手机和智能设备之间的WiFi 感知订阅、发布能力,实现了数字管家应用和设备之间的发现。在完成设备间的认证和响应后,即可发送相关配网数据。同时还支持与常规Sof

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

6.1.数据结构-c/c++堆详解下篇(堆排序,TopK问题)

上篇:6.1.数据结构-c/c++模拟实现堆上篇(向下,上调整算法,建堆,增删数据)-CSDN博客 本章重点 1.使用堆来完成堆排序 2.使用堆解决TopK问题 目录 一.堆排序 1.1 思路 1.2 代码 1.3 简单测试 二.TopK问题 2.1 思路(求最小): 2.2 C语言代码(手写堆) 2.3 C++代码(使用优先级队列 priority_queue)

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

K8S(Kubernetes)开源的容器编排平台安装步骤详解

K8S(Kubernetes)是一个开源的容器编排平台,用于自动化部署、扩展和管理容器化应用程序。以下是K8S容器编排平台的安装步骤、使用方式及特点的概述: 安装步骤: 安装Docker:K8S需要基于Docker来运行容器化应用程序。首先要在所有节点上安装Docker引擎。 安装Kubernetes Master:在集群中选择一台主机作为Master节点,安装K8S的控制平面组件,如AP

嵌入式Openharmony系统构建与启动详解

大家好,今天主要给大家分享一下,如何构建Openharmony子系统以及系统的启动过程分解。 第一:OpenHarmony系统构建      首先熟悉一下,构建系统是一种自动化处理工具的集合,通过将源代码文件进行一系列处理,最终生成和用户可以使用的目标文件。这里的目标文件包括静态链接库文件、动态链接库文件、可执行文件、脚本文件、配置文件等。      我们在编写hellowor

LabVIEW FIFO详解

在LabVIEW的FPGA开发中,FIFO(先入先出队列)是常用的数据传输机制。通过配置FIFO的属性,工程师可以在FPGA和主机之间,或不同FPGA VIs之间进行高效的数据传输。根据具体需求,FIFO有多种类型与实现方式,包括目标范围内FIFO(Target-Scoped)、DMA FIFO以及点对点流(Peer-to-Peer)。 FIFO类型 **目标范围FIFO(Target-Sc