本文主要是介绍Netty源码ByteBuf详解,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
ByteBuf简介
作为一个高性能网络编程框架,Netty
的字节容器ByteBuf
相较于JDK
标准库自带的ByteBuffer
,有着更加灵活、更高效的方式处理字节数据,它有着如下几个特性:
- 内存分配:
ByteBuf
按照分类可以分为堆内内存和直接内存,前者分配在JVM堆内存中,由JVM进行管理和GC,后者直接分配在本地内存上,需要我们进行手动释放,但是性能要好于前者。 - 读写指针:
ByteBuf
为读写操作分别提供读写指针,通过读写指针我们可以快速定位、设置指定偏移量上的数据。 - 容量调整:
ByteBuf
会根据使用情况进行动态扩容和缩容,使得它更加灵活,可以适应不同的数据大小。 - 高级操作:
ByteBuf
提供切片、合并、复制等,使得操作更加方便和高效。
ByteBuf常见操作示例
ByteBuf
包含许多灵活的操作,这里读者给出一套详尽的使用演示,读者可以根据注释自行了解:
public static void main(String[] args) {// initial:9 max: 100ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(9, 100);print("allocate ByteBuf(9, 100)", buffer);// 写入字节数组 写指针移动 未到capacity 仍然可写buffer.writeBytes(new byte[]{1, 2, 3, 4});print("writeBytes(1, 2, 3, 4)", buffer);// 写入整型 占用四个字节buffer.writeInt(12);print("writeInt(12)", buffer);// 写入到达初始化的字节大小 9 isWritable未falsebuffer.writeBytes(new byte[]{5});print("writeBytes(5)", buffer);// 扩容,扩容之后可以继续写buffer.writeBytes(new byte[]{6});print("writeBytes(6)", buffer);// get 方法不改变读写指针System.out.println("getByte(3) return: " + buffer.getByte(3));System.out.println("getShort(3) return: " + buffer.getShort(3));System.out.println("getInt(3) return: " + buffer.getInt(3));print("getByte()", buffer);// set 方法不改变读写指针buffer.setByte(buffer.readableBytes() + 1, 0);print("setByte()", buffer);// read 方法改变读指针byte[] bytes = new byte[buffer.readableBytes()];buffer.readBytes(bytes);
// buffer.readByte();
// buffer.readInt();// ... 其他各种不同的读方法print("readBytes(" + bytes.length + ")", buffer);// 写入到最大容量for (byte i = 10; i < 100; i++) {buffer.writeByte(i);}print("write to max", buffer);// 超出maxbuffer.writeByte(1);}private static void print(String action, ByteBuf byteBuf) {//打印本次操作行为System.out.println("after ===========" + action + "============");//容量System.out.println("capacity(): " + byteBuf.capacity());//最大容量System.out.println("maxCapacity(): " + byteBuf.maxCapacity());//读指针位置System.out.println("readerIndex(): " + byteBuf.readerIndex());//可读字节数System.out.println("readableBytes(): " + byteBuf.readableBytes());// 读写指针是否重复,即判断是否可读System.out.println("isReadable(): " + byteBuf.isReadable());//写指针位置System.out.println("writerIndex(): " + byteBuf.writerIndex());//可写字节System.out.println("writableBytes(): " + byteBuf.writableBytes());// capacity 和 写指针重合不可写 但是扩容之后还可写System.out.println("isWritable(): " + byteBuf.isWritable());//最大可写字节数System.out.println("maxWritableBytes(): " + byteBuf.maxWritableBytes());System.out.println();}
输出结果:
after ===========allocate ByteBuf(9, 100)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 0
isReadable(): false
writerIndex(): 0
writableBytes(): 9
isWritable(): true
maxWritableBytes(): 100after ===========writeBytes(1, 2, 3, 4)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 4
isReadable(): true
writerIndex(): 4
writableBytes(): 5
isWritable(): true
maxWritableBytes(): 96after ===========writeInt(12)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 8
isReadable(): true
writerIndex(): 8
writableBytes(): 1
isWritable(): true
maxWritableBytes(): 92after ===========writeBytes(5)============
capacity(): 9
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 9
isReadable(): true
writerIndex(): 9
writableBytes(): 0
isWritable(): false
maxWritableBytes(): 91after ===========writeBytes(6)============
capacity(): 64
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 10
isReadable(): true
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90getByte(3) return: 4
getShort(3) return: 1024
getInt(3) return: 67108864
after ===========getByte()============
capacity(): 64
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 10
isReadable(): true
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90after ===========setByte()============
capacity(): 64
maxCapacity(): 100
readerIndex(): 0
readableBytes(): 10
isReadable(): true
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90after ===========readBytes(10)============
capacity(): 64
maxCapacity(): 100
readerIndex(): 10
readableBytes(): 0
isReadable(): false
writerIndex(): 10
writableBytes(): 54
isWritable(): true
maxWritableBytes(): 90after ===========write to max============
capacity(): 100
maxCapacity(): 100
readerIndex(): 10
readableBytes(): 90
isReadable(): true
writerIndex(): 100
writableBytes(): 0
isWritable(): false
maxWritableBytes(): 0Exception in thread "main" java.lang.IndexOutOfBoundsException: writerIndex(100) + minWritableBytes(1) exceeds maxCapacity(100): PooledUnsafeDirectByteBuf(ridx: 10, widx: 100, cap: 100/100)at io.netty.buffer.AbstractByteBuf.ensureWritable0(AbstractByteBuf.java:275)at io.netty.buffer.AbstractByteBuf.writeByte(AbstractByteBuf.java:938)at netty.book.bytebuf.ByteBufTest.main(ByteBufTest.java:60)
ByteBuf源码解析
UML图了解ByteBuf体系结构
我们可以从类图了解ByteBuf
的体系结构,由下图的关键字可知,ByteBuf
按照分类可以分为三种分类:
- 按照获取字节容是否是从容器中获取,可将其分为
Pooled
和Unpooled
。 - 按照对于
ByteBuf
的基本操作的安全性,我们可以分为Unsafe
和非Unsafe
。 - 按照
ByteBuf
在内存所处位置可分为Direct
和Heap
。
Pooled和Unpooled
由名字可知,前者用到了池化思想,即从预先分配好的一块内存中获取一块连续的内存作为ByteBuf
,而后者是非池化的,每次需要调用底层的API
向操作系统申请一块内存。
Unsafe和非Unsafe
Unsafe
和非Unsafe
的区别即对于ByteBuf
的操作是安全还是非安全的,这一点我们可以通过源码来讲解。
先来看看UnpooledUnsafeHeapByteBuf
这个类,由名字可知这个类是Unsafe
的,我们可以从getByte
方法为入口了解一下Unsafe
的操作。
首先getByte
会检查索引位置,然后调用_getByte
获取指定索引位置的数据,我们步入一探究竟。
@Overridepublic byte getByte(int index) {//检查索引checkIndex(index);//通过_getByte获取bytereturn _getByte(index);}
可以看到_getByte
会通过一个工具类UnsafeByteBufUtil
获取字节数据,我们不妨步进一下查看其实现细节:
@Overrideprotected byte _getByte(int index) {return UnsafeByteBufUtil.getByte(array, index);}
经过重重步进,我们终于知道了这个操作为什么是UNSAFE
的原因,原来这个类获取字节数据的操作并非直接通过索引读取数据,而至通过实际内存地址偏移量配合索引完成字节数据的读取。
static byte getByte(byte[] data, int index) {//通过访问JVM直接内存偏移量BYTE_ARRAY_BASE_OFFSET 加上索引获取当前字节实际位置return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);}
对比一下,我们再看看非Unsafe
的操作,我们仍然以getByte
作为入口。可以看到_getByte
仍然是通过一个工具类获取字节数据。
@Overrideprotected byte _getByte(int index) {return HeapByteBufUtil.getByte(array, index);}
查阅最终的代码可以了解到,当前字节读取的逻辑是直接通过JVM
基本类型byte
数组的索引位置获取的,所以操作是安全的。
static byte getByte(byte[] memory, int index) {return memory[index];}
Direct和Heap
Direct
的字节容器直接分配在操作系统的本地内存中,不受JVM
的GC
控制,需要用户手动释放。而后者则是分配在JVM
的堆内存上,受JVM
的GC
进行管理。我们可以通过UnpooledDirectByteBuf
和UnpooledHeapByteBuf
的源码进一步了解两者的区别。
先来看看UnpooledHeapByteBuf
,它的取值是通过HeapByteBufUtil
获取HeapByteBuf
,我们不妨步入查看实现细节:
@Overrideprotected byte _getByte(int index) {return HeapByteBufUtil.getByte(array, index);}
查看getByte
方法,可以看到其操作的内存就是分配JVM
堆内存上的byte
数组。
static byte getByte(byte[] memory, int index) {return memory[index];}
再查看UnpooledDirectByteBuf
的_getByte
,可以看到它是通过ByteBuffer
的get
方法进行读取操作的。
@Overrideprotected byte _getByte(int index) {return buffer.get(index);}
查看其内部逻辑,可以看到它是通过unsafe
方法操作由ix
方法得到的内存地址从而获取字节数据。
public byte get(int i) {return ((unsafe.getByte(ix(checkIndex(i)))));}
AbstractByteBufAllocator详解
简介
AbstractByteBufAllocator
是ByteBuf
字节容器分配一个抽象类,它继承自ByteBufAllocator
,已实现了字节分配容器的大部分行为,可以说是ByteBufAllocator
的基本骨架。
buffer方法
查看buffer
方法,它是ByteBufAllocator
定义的一个方法,它的作用就是分配一个ByteBuf
,具体分配直接内存的ByteBuf
还是JVM
堆内存的ByteBuf
是由directByDefault
这个变量决定。
@Overridepublic ByteBuf buffer() {//如果directByDefault返回true则返回本地堆内存的ByteBufif (directByDefault) {return directBuffer();}//返回JVM堆内存的ByteBufreturn heapBuffer();}
关于directByDefault
的计算,查阅源码的定义看到,如果用户使用构造方法将preferDirect
设置为true
时,且类路径上可以找到Unsafe
这个类,那么这个值就为true
,用户即可在分配非堆内存。
protected AbstractByteBufAllocator(boolean preferDirect) {//如果preferDirect设置为true且当前这个类路径上存在unsafe类,则directByDefault 为true,用户即可在分配非堆内存directByDefault = preferDirect && PlatformDependent.hasUnsafe();emptyBuf = new EmptyByteBuf(this);}
directBuffer
我们再来看看本次内存获取ByteBuf
的方法directBuffer
。directBuffer
传入initialCapacity
和Integer.MAX_VALUE
,我们不妨步入看看这几个参数和含义和方法内部的逻辑:
@Overridepublic ByteBuf directBuffer(int initialCapacity) {return directBuffer(initialCapacity, Integer.MAX_VALUE);}
可以看到,该方法会基于外部传入的初始化容量和最大容量调用newDirectBuffer
创建本地内存的ByteBuf
:
@Overridepublic ByteBuf directBuffer(int initialCapacity, int maxCapacity) {if (initialCapacity == 0 && maxCapacity == 0) {return emptyBuf;}//校验参数有效性validate(initialCapacity, maxCapacity);//基于初始化容量和最大容量创建直接内存容器return newDirectBuffer(initialCapacity, maxCapacity);}
因为内存分配可能涉及池化分配和非池化分配,所以当前的抽象方法AbstractByteBufAllocator
将newDirectBuffer
设置为抽象方法交由子类实现,具体实现细节笔者会在后文展开分析:
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);
heapBuffer
我们再来看看JVM
堆中分配ByteBuf
的逻辑,可以看到它也是直接调用heapBuffer
完成创建:
@Overridepublic ByteBuf heapBuffer(int initialCapacity) {return heapBuffer(initialCapacity, Integer.MAX_VALUE);}
查看heapBuffer
内部实现,可以看到它完成会容量的校验之后,直接调用newHeapBuffer
创建JVM
堆上的ByteBuf
。
@Overridepublic ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {if (initialCapacity == 0 && maxCapacity == 0) {return emptyBuf;}//参数校验validate(initialCapacity, maxCapacity);//基于初始化容量和最大容量完成堆上字节容器的创建return newHeapBuffer(initialCapacity, maxCapacity);}
同样的newHeapBuffer
也是交由具体的子类实现:
protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);
UnpooledByteBufAllocator详解
简介
UnpooledByteBufAllocator
对应的字节容器都是未池化的,这意味我们每次通过它获取的byteBuf
都是全新创建的。最常见的操作即newHeapBuffer
和newDirectBuffer
,这些方法都是继承自抽象类AbstractByteBufAllocator
,我们不妨步入内部查看一下它的实现逻辑。
newHeapBuffer
newHeapBuffer
即在JVM
堆内存上创建ByteBuf
,它的执行步骤如下:
- 基于抽象类
AbstractByteBufAllocator
的调用initialCapacity
为256
,maxCapacity
为整型最大值。 - 它在执行前会判断当前的类路径上是否存在
unSafe
类,如果存在则创建UnpooledUnsafeHeapByteBuf
,反之创建UnpooledHeapByteBuf
。
@Overrideprotected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity): new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);}
我们先来看看UnpooledUnsafeHeapByteBuf
的实现,由关键字Unpooled
、Unsafe
、Heap
,可知其非池化、操作不安全、JVM
堆内存创建,我们不妨步入构造方法查看其实现细节:
UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {super(alloc, initialCapacity, maxCapacity);}
可以看到这个构造方法也仅仅是一个自调用,我们不妨步入查看实现:
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);}
再次步入步入这个构造方法,可以看到其内部实现逻辑分为5大步骤:
- 设置最
UnpooledHeapByteBuf
大容量。 - 参数校验。
- 设置
ByteBufAllocator
。 - 初始化
bytebuf
底层实质存储数据的byte
数组 - 初始化读写索引。
private UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {//设置最大容量super(maxCapacity);//参数校验if (alloc == null) {throw new NullPointerException("alloc");}if (initialArray == null) {throw new NullPointerException("initialArray");}if (initialArray.length > maxCapacity) {throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));}this.alloc = alloc;//初始化bytebuf底层实质存储数据的byte数组setArray(initialArray);//初始化读写索引setIndex(readerIndex, writerIndex);}
这里我们查看setArray
,逻辑很简单,array
设置为JVM
堆内存的byte
数组。
private void setArray(byte[] initialArray) {array = initialArray;tmpNioBuf = null;}
再来看看setIndex
方法,它在完成参数校验确保读索引位于写索引后面,然后再调用setIndex0
,将上文传入的0设置为读写索引的位置:
@Overridepublic ByteBuf setIndex(int readerIndex, int writerIndex) {//参数校验if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {throw new IndexOutOfBoundsException(String.format("readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",readerIndex, writerIndex, capacity()));}//设置读写索引为0setIndex0(readerIndex, writerIndex);return this;}
步入setIndex0
即可看到它会调整当前byteBuf
的读写索引为0。
final void setIndex0(int readerIndex, int writerIndex) {this.readerIndex = readerIndex;this.writerIndex = writerIndex;}
了解了UnpooledUnsafeHeapByteBuf
,我们再来看看UnpooledHeapByteBuf
,从构造方法的调用来看,和前者创建逻辑基本是一致的,既然构造方法都是通过调用UnpooledHeapByteBuf
的构造方法实现,那么它们有什么区别呢?
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);}
我们都知道unSafe
和非unsafe
的区别则是两者对于内存的操作,前者是直接基于地址去操作byteBuf
而后者是基于索引去操作,所以我们不妨看看两者的getByte
方法。
先来看看UnpooledUnsafeHeapByteBuf
,从源码中我们就可以知晓他读取byte
数据时是基于UnsafeByteBufUtil
的。
//UnpooledUnsafeHeapByteBuf的getByte
@Overridepublic byte getByte(int index) {checkIndex(index);//直接调用下方_getBytereturn _getByte(index);}@Overrideprotected byte _getByte(int index) {return UnsafeByteBufUtil.getByte(array, index);}
不断步进UnsafeByteBufUtil.getByte
实际实现可知,它的读取操作通过UNSAFE
基于内存地址偏移量加上byte
长度加上索引位置得到实际字节,存取操作相对高效。
static byte getByte(byte[] data, int index) {return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);}
再来看看UnpooledHeapByteBuf
的getByte
,同样的它也是通过一个HeapByteBufUtil
进行字节读取。
@Overridepublic byte getByte(int index) {ensureAccessible();return _getByte(index);}@Overrideprotected byte _getByte(int index) {return HeapByteBufUtil.getByte(array, index);}
步入查看其操作是直接基于索引操作数据,并没有直接通过内存地址进行操作,相较于前者是安全单不高效。
static byte getByte(byte[] memory, int index) {return memory[index];}
newDirectBuffer
newDirectBuffer
的创建的步骤和前者差不多:
- 判断类路径上是否有
Unsafe
,如果有创建newUnsafeDirectByteBuf
,反之创建UnpooledDirectByteBuf
。 - 判断是否禁用内存泄漏探测,如有禁止则直接返回
ByteBuf
,反之调用toLeakAwareBuffer
将其包装为内存泄漏可感知的ByteBuf
。
@Overrideprotected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {ByteBuf buf = PlatformDependent.hasUnsafe() ?UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);return disableLeakDetector ? buf : toLeakAwareBuffer(buf);}
步入UnpooledUnsafeDirectByteBuf
的构造方法可以看到它的步骤和创建JVM
堆区内存的差不多,唯一区别分配内存时会通过allocateDirect
得到ByteBuffer
,再通过setByteBuffer
完成UnpooledUnsafeDirectByteBuf
的创建:
protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {//设置最大容量super(maxCapacity);//参数校验if (alloc == null) {throw new NullPointerException("alloc");}if (initialCapacity < 0) {throw new IllegalArgumentException("initialCapacity: " + initialCapacity);}if (maxCapacity < 0) {throw new IllegalArgumentException("maxCapacity: " + maxCapacity);}if (initialCapacity > maxCapacity) {throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));}this.alloc = alloc;//设置ByteBuffersetByteBuffer(allocateDirect(initialCapacity), false);}
我们先来看看allocateDirect
做了什么,查看源码我们发现它会调用ByteBuffer.allocateDirect
创建一个全新的直接内存。
protected ByteBuffer allocateDirect(int initialCapacity) {return ByteBuffer.allocateDirect(initialCapacity);}
查看其内部实现,我们会得到这样一个构造方法,它的整体步骤为:
- 基于我们传入的容量即
cap
,创建一个MappedByteBuffer
直接内存对象。 - 为创建好的内存对象分配内存大小。
- 设置内存地址起始位置。
- 清空本段内存的数据。
DirectByteBuffer(int cap) { // package-private//基于我们传入的容量即`cap`,创建一个`MappedByteBuffer`直接内存对象super(-1, 0, cap, cap);boolean pa = VM.isDirectMemoryPageAligned();int ps = Bits.pageSize();long size = Math.max(1L, (long)cap + (pa ? ps : 0));Bits.reserveMemory(size, cap);//分配内存long base = 0;try {base = unsafe.allocateMemory(size);} catch (OutOfMemoryError x) {Bits.unreserveMemory(size, cap);throw x;}unsafe.setMemory(base, size, (byte) 0);//设置内存地址if (pa && (base % ps != 0)) {// Round up to page boundaryaddress = base + ps - (base & (ps - 1));} else {address = base;}//清空本段内存的数据cleaner = Cleaner.create(this, new Deallocator(base, size, cap));att = null;}
allocateDirect
完成后则调用setByteBuffer
进行参数设置,setByteBuffer
逻辑很简单,基于上一步创建好的buffer
设置memoryAddress
即堆外内存地址,并将当前堆内存的大小设置为capacity
。
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {if (tryFree) {ByteBuffer oldBuffer = this.buffer;if (oldBuffer != null) {if (doNotFree) {doNotFree = false;} else {freeDirect(oldBuffer);}}}this.buffer = buffer;memoryAddress = PlatformDependent.directBufferAddress(buffer);tmpNioBuf = null;capacity = buffer.remaining();}
同理查看UnpooledDirectByteBuf
构造方法如下,不多赘述,逻辑和前者差不多。
protected UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {super(maxCapacity);if (alloc == null) {throw new NullPointerException("alloc");}if (initialCapacity < 0) {throw new IllegalArgumentException("initialCapacity: " + initialCapacity);}if (maxCapacity < 0) {throw new IllegalArgumentException("maxCapacity: " + maxCapacity);}if (initialCapacity > maxCapacity) {throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));}this.alloc = alloc;setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));}
我们还是以getByte
看看两者操作区别:
UnpooledUnsafeDirectByteBuf
的_getByte
如下,可以看到在获取字节时调用了一个名为addr
的方法:
protected byte _getByte(int index) {return UnsafeByteBufUtil.getByte(this.addr(index));}
查看addr
方法可知,其数据数据是基于内存地址获取的:
long addr(int index) {return this.memoryAddress + (long)index;}
我们不断步入getByte
内部即可可以看到基于这个内存地址获取对应的字节数据:
static byte getByte(long address) {return UNSAFE.getByte(address);}
反之UnpooledDirectByteBuf
则是直接通过索引获取,具体地址的计算由底层实现。
@Overrideprotected byte _getByte(int index) {return buffer.get(index);}
小结
自此我们将非池化的UnpooledByteBufAllocator
的JVM
堆内存byteBuf
分配和本地内存的byteBuf
内存分配都讲解完成。整体来说只要把握好Heap
是在JVM
堆区创建字节容器、Direct
是在本次内存创建字节容器、Unsafe
操作都是基于内存偏移量相较于非Unsafe
的操作更加高效且不安全,通过了解这几个关键字对应实现细节即可快速把握源码实现重点。
PooledByteBufAllocator详解
newHeapBuffer
相比于非池化的UnpooledByteBufAllocator
,PooledByteBufAllocator
的内存分配相比前者会复杂许多,但heap
和direct
整体分配流程确实大差不差,所以我们不妨以newHeapBuffer
为例查看其整体分配流程:
可以看到PooledByteBufAllocator
的newHeapBuffer
执行步骤为:
- 获取当前线程的缓存
PoolThreadCache
。 - 基于这个缓存拿到
PoolArena
。 - 如果
PoolArena
不为空则调用allocate
获取ByteBuf
,若为空吗则进入步骤4。 - 创建一个
UnpooledHeapByteBuf
。 - 将其设置为内存泄漏可感知的
ByteBuf
。
@Overrideprotected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {//获取当前线程的缓存PoolThreadCachePoolThreadCache cache = threadCache.get();//基于这个缓存拿到PoolArenaPoolArena<byte[]> heapArena = cache.heapArena;ByteBuf buf;//如果PoolArena不为空则调用allocate完成byteBuf创建和内存分配if (heapArena != null) {buf = heapArena.allocate(cache, initialCapacity, maxCapacity);} else {//若为空创建一个UnpooledHeapByteBufbuf = new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);}//将其设置为内存泄漏可感知的ByteBuf`return toLeakAwareBuffer(buf);}
了解整体概念后,我们不妨对这其中的每一个概念进行拆解分析,首先是PoolThreadCache
,们查看其源码,可以看到其继承了FastThreadLocal
这个类,从名字可以猜出FastThreadLocal
是一个特殊的ThreadLocal
。
我们上文提到一个get
的操作,实际上若get
方法初次尝试从threadCache取值时是为空的,此时它就会调用PoolThreadLocalCache
的initialValue
完成初始化,就如下代码所示,会基于当前线程完成一个PoolThreadCache
初始化工作。
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {//略//PoolThreadLocalCache 初始化方法@Overrideprotected synchronized PoolThreadCache initialValue() {//初始化heapArena final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);//初始化directArena final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);//完成PoolThreadCache创建,其内部会完成cache、arena等成员变量初始化和创建工作return new PoolThreadCache(heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);}}
再来看看allocate
方法的整体步骤,的我们步入其内部可知其bytebuf创建的逻辑:
- 调用
newByteBuf
完成PooledByteBuf
创建。 - 调用
allocate
完成内存分配。
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {PooledByteBuf<T> buf = newByteBuf(maxCapacity);allocate(cache, buf, reqCapacity);return buf;}
查看newByteBuf
的创建逻辑可知,PooledByteBuf
的创建是基于PooledUnsafeHeapByteBuf
或PooledHeapByteBuf
实现的:
@Overrideprotected PooledByteBuf<byte[]> newByteBuf(int maxCapacity) {return HAS_UNSAFE ? PooledUnsafeHeapByteBuf.newUnsafeInstance(maxCapacity): PooledHeapByteBuf.newInstance(maxCapacity);}
我们以PooledUnsafeHeapByteBuf
的newUnsafeInstance
为例查看PooledUnsafeHeapByteBuf
的插件逻辑,从下文代码可知,PooledUnsafeHeapByteBuf
的创建是从RECYCLER
获取的,得到buf
后会调用reuse
完成读写指针的重置。
static PooledUnsafeHeapByteBuf newUnsafeInstance(int maxCapacity) {//从RECYCLER中获取循环利用或者全新创建的PooledUnsafeHeapByteBuf PooledUnsafeHeapByteBuf buf = RECYCLER.get();//重置读写指针,确保后续读写工作可以正确执行buf.reuse(maxCapacity);return buf;}
我们不妨查看一下Recycler
的实现,通过定义可知,当get
为空时,Recycler
会直接通过newObject
创建一个PooledUnsafeHeapByteBuf
,并以handle
作为内存地址的指针(关于handle
的概念笔者会在后文详尽介绍,这里读者只需了解简单的概念即可)。
private static final Recycler<PooledUnsafeHeapByteBuf> RECYCLER = new Recycler<PooledUnsafeHeapByteBuf>() {@Overrideprotected PooledUnsafeHeapByteBuf newObject(Handle<PooledUnsafeHeapByteBuf> handle) {//以传入的handle作为内存地址的指针return new PooledUnsafeHeapByteBuf(handle, 0);}};
这里我们不妨小结一下池化的步骤:
- 无论是
heap
的创建还是direct
的创建,都会先从threadCache
中获取arena
。 - 若arena为空则新建一个
UnpooledHeapByteBuf
返回出去,若不为空则进入步骤3。 - 基于
RECYCLER
获取之前回收的的ByteBuf
,若RECYCLER
中不存在回收的ByteBuf
则创建一个全新的ByteBuf
。 - 将上一步的
ByteBuf
读写指针重置,确保ByteBuf
可用。 - 调用
allocate
完成内存分配。
内存规格介绍
后续我们会对内存分配这一步进行详尽讲解,在此之前,我们必须了解一下Netty
中内存规格,设计者为了内存分配时尽可能节省且充分利用空间所设置的不同规格:
huge
大小为16M
以上,不做缓存。chunk
作为基本单位,其值设置为16M
。page
为chunk
的下一级内存规格,其大小为8k。subPage
大小为0-512B
之间。- 设计者定义3种内存范围区间,即
tiny(0-512b)
、small(512B-8k)
、normal(8k-16M)
。
page 级别内存分配
大抵了解了内存分配的大体步骤和内存规格之后,这里我不妨以我们以这段代码为入口了解一下page
级别的内存分配,如下所示,可以看到笔者用page
记录了一个page
的大小8*1024
,然后基于PooledByteBufAllocator
尝试得到一个内存为2个page
的ByteBuf
。
public class Main {public static void main(String[] args) {int page = 1024*8;PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;//尝试获取两个page内存大小的byteBuf ByteBuf byteBuf = allocator.directBuffer(2 * page);}
}
首先代码的调用会来到AbstractByteBufAllocator
的directBuffer
,它以我们传入的所需要大小作为initialCapacity
自调用directBuffer
:
@Overridepublic ByteBuf directBuffer(int initialCapacity) {return directBuffer(initialCapacity, Integer.MAX_VALUE);}
步进之后可以看到对参数进行必要的校验之后,直接调用newDirectBuffer
:
@Overridepublic ByteBuf directBuffer(int initialCapacity, int maxCapacity) {//若需要为0则直接返回空ByteBuf if (initialCapacity == 0 && maxCapacity == 0) {return emptyBuf;}//校验initialCapacity和maxCapacity有效性validate(initialCapacity, maxCapacity);//调用newDirectBuffer完成内存分配return newDirectBuffer(initialCapacity, maxCapacity);}
于是代码又走了PooledByteBufAllocator
的newDirectBuffer
,其核心步骤即从当前线程缓存中得到directArena
进行bytebuf
创建和内存分配,话不多说,我们直接步入:
@Overrideprotected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {//从当前线程的缓存中拿到directArena PoolThreadCache cache = threadCache.get();PoolArena<ByteBuffer> directArena = cache.directArena;ByteBuf buf;//基于directArena 完成bytebuf创建和内存分配if (directArena != null) {buf = directArena.allocate(cache, initialCapacity, maxCapacity);} else {if (PlatformDependent.hasUnsafe()) {buf = UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);} else {buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);}}return toLeakAwareBuffer(buf);}
我们直接查看PoolArena
的allocate
方法,在通过newByteBuf
得到一个PooledByteBuf
之后,直接调用allocate
进入内存分配:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {PooledByteBuf<T> buf = newByteBuf(maxCapacity);//完成bytebuf的内存分配allocate(cache, buf, reqCapacity);return buf;}
来到PoolArena
的allocate
,因为我们要求的内存大小为16k
,属于page
,于是先调用cache.allocateNormal
尝试基于缓存完成内存分配,因为我们是第一次调用所以缓存没有数据,于是直接调用allocateNormal
方法尝试内存分配。
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int normCapacity = normalizeCapacity(reqCapacity);if (isTinyOrSmall(normCapacity)) { // capacity < pageSize//略} else {//略}final PoolSubpage<T> head = table[tableIdx];//略if (normCapacity <= chunkSize) {//尝试缓存分配if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {return;}//缓存分配失败,调用allocateNormal完成buf的内存分配allocateNormal(buf, reqCapacity, normCapacity);} else {// Huge allocations are never served via the cache so just call allocateHugeallocateHuge(buf, reqCapacity);}}
我们直接步入allocateNormal
,可以看到它会先尝试中q050
、q025
等这些ChunkList
本地内存中进行分配,若分配失败,则后续会创建一个全新的chunk
并完成buf
内存分配:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {//尝试从本次chunk列表中分配内存,若成功则直接返回if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||q075.allocate(buf, reqCapacity, normCapacity)) {++allocationsNormal;return;}//若上述本地内存分配失败则创建一个全新的chunk再基于这个chunk完成chunk内存分配PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);long handle = c.allocate(normCapacity);++allocationsNormal;assert handle > 0;//基于上一步的chunk完成buf内存分配c.initBuf(buf, handle, reqCapacity);qInit.add(c);}
所以我们来到了newChunk
方法,可以看到其内部实现就是一个PoolChunk
的创建:
@Overrideprotected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) {return new PoolChunk<ByteBuffer>(this, allocateDirect(chunkSize),pageSize, maxOrder, pageShifts, chunkSize);}
步入PoolChunk
的构造方法,可以看到以下成员变量的创建和初始化,对应变量的含义笔者都已注释,读者可以自行参阅:
PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) {//unpooled 设置为false,意为当前为池化的chunkunpooled = false;//将调用的arena作为变量存到arena 中this.arena = arena;//记录外部创建可读写16k的DirectByteBuffer,后续的读写都是存入这个memory 中this.memory = memory;//pageSize为8kthis.pageSize = pageSize;//pageShifts 为13this.pageShifts = pageShifts;//maxOrder 为11,用于后续二叉树的创建this.maxOrder = maxOrder;//chunkSize 为16Mthis.chunkSize = chunkSize;//unusable为12后文用到unusable = (byte) (maxOrder + 1);//log2ChunkSize 为24log2ChunkSize = log2(chunkSize);subpageOverflowMask = ~(pageSize - 1);freeBytes = chunkSize;assert maxOrder < 30 : "maxOrder should be < 30, but is: " + maxOrder;maxSubpageAllocs = 1 << maxOrder;// 创建4096大小的byte数组,表示一颗二叉树,记录每一个节点所在的深度,后续我们就可以基于这个深度得到对应内存大小memoryMap = new byte[maxSubpageAllocs << 1];depthMap = new byte[memoryMap.length];int memoryMapIndex = 1;for (int d = 0; d <= maxOrder; ++ d) {int depth = 1 << d;for (int p = 0; p < depth; ++ p) {// in each level traverse left to right and set value to the depth of subtreememoryMap[memoryMapIndex] = (byte) d;depthMap[memoryMapIndex] = (byte) d;memoryMapIndex ++;}}//创建一个2048大小的PoolSubpage数组subpages = newSubpageArray(maxSubpageAllocs);}
唯一我们需要留心了解的就是memoryMap
(depthMap
记录的东西也是一样),首先它是一个基于数组创建的二叉树,其次每一个节点都记录着数的深度,注意索引0是无用的,后续我们就可以基于所需的内存大小定位到对应深度空闲未用到的节点完成内存分配。
以上面的代码为例,最终生成的二叉树大致为这样,可以看到数组中的索引0节点没有用到,而是从索引开始:
- 第1层1个节点,即索引1节点。
- 第2层2个节点,即索引2、3节点。
- 如此往复计算,第11层为2048个节点。
对应的每一个层级都代表一个内存范围,以我们本次代码为例,要求16k,按照当前二叉树结构,即取11层前两个空闲节点:
自此我们完成了PoolChunk的创建,于是代码来到了:
long handle = c.allocate(normCapacity);
步入allocate方法会和subpageOverflowMask进行&运算,得出结果不为0,说明基于normCapacity二进制高位存在1大于pageSize,所以直接步入allocateRun:
long allocate(int normCapacity) {//subpageOverflowMask二进制为11111111111111111110000000000000if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSizereturn allocateRun(normCapacity);} else {return allocateSubpage(normCapacity);}}
步入allocateRun
,因为我们的需求容量为16k,所以步入allocateRun
的执行逻辑为:
- 所以经过运算后d为10层,即全是16k那层。
- 获取到该层空闲id为1024,即第10层首节点。
freeBytes
减去本次要求的16k,代表当前内存已用16k。- 返回id,该id即作为外部调用
handle
的值。
private long allocateRun(int normCapacity) {int d = maxOrder - (log2(normCapacity) - pageShifts);int id = allocateNode(d);if (id < 0) {return id;}freeBytes -= runLength(id);return id;}
基于上一步的handle
,正式开始内存分配,可以看到我们的入参为创建的buf,对应内存地址handle
,需求内存大小reqCapacity
:
c.initBuf(buf, handle, reqCapacity);
步入后就会看到这样一段代码逻辑:
- 基于handle拿到内存对应内存的索引和bitmapIdx。
- 如果bitmapIdx不为0则调用initBufWithSubpage,反之进入步骤3。
- 直接调用init方法对buf进行内存分配,可以看到入参为,当前调用的buf,handle即内存指针,还有内存偏移量,需要的内存大小,和本次所用的节点内存长度,和当前线程的parent的threadCache。
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {int memoryMapIdx = memoryMapIdx(handle);int bitmapIdx = bitmapIdx(handle);if (bitmapIdx == 0) {byte val = value(memoryMapIdx);assert val == unusable : String.valueOf(val);buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),arena.parent.threadCache());} else {initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);}}
步入init可以看到该方法做了两件事:
- 调用init分配内存。
- 记录内存地址。
@Overridevoid init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,PoolThreadCache cache) {super.init(chunk, handle, offset, length, maxLength, cache);initMemoryAddress();}
终于看到,init方法分配内存的逻辑,即通过handle得到内存地址,通过chunk.memory得到一块buf,然后记录cache等信息。
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {assert handle >= 0;assert chunk != null;//记录本地调用的chunkthis.chunk = chunk;//记录handlethis.handle = handle;//记录上述步骤得到bufmemory = chunk.memory;//记录当前节点在二叉树对应层的偏移量,例如笔者本次的节点为1024,即在第10层第1个节点,所以offset为0this.offset = offset;//记录本次所需的长度this.length = length;//记录分配内存节点大小this.maxLength = maxLength;tmpNioBuf = null;this.cache = cache;}
然后就是记录实际使用的内存地址,后续读写时都是以这个memoryAddress
作为标量进行计算,这一点查看上文中对于堆内存的_getByte
方法即可印证:
private void initMemoryAddress() {memoryAddress = PlatformDependent.directBufferAddress(memory) + offset;}
自此我们将内存分配的过程全部讲解完成了,来小结一下整体步骤:
- 创建
chunk
。 - 基于二叉树查找我们所需内存分配大小对应的节点位置,以便后续记录内存分配情况。
- 分配内存,记录内存起始地址。
subPage级别的内存分配
subPage
级别的内存分配流程也是差不多的,我们以这段代码为入口进行阅读:
public static void main(String[] args) {int page = 1024 * 8;PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;ByteBuf byteBuf = allocator.directBuffer(16);}
和page
级别内存分配过程相比,前面bytebuf
创建是一样的,我们直接从内存分配这一步开始讲解:
- subpage级别的内存属于tiny这个区间,所以会进入isTinyOrSmall这个判断的分支
- 第一次在cache.allocateTiny这一步分配失败后,会尝试基于Chunk缓存池进行分配内存,若成功直接返回,反之进入步骤3。
- 基于allocateNormal进行内存分配。
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int normCapacity = normalizeCapacity(reqCapacity);if (isTinyOrSmall(normCapacity)) { // capacity < pageSizeint tableIdx;PoolSubpage<T>[] table;boolean tiny = isTiny(normCapacity);if (tiny) { // < 512if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {// was able to allocate out of the cache so move onreturn;}tableIdx = tinyIdx(normCapacity);table = tinySubpagePools;} else {//略}final PoolSubpage<T> head = table[tableIdx];/*** Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and* {@link PoolChunk#free(long)} may modify the doubly linked list as well.*/synchronized (head) {final PoolSubpage<T> s = head.next;if (s != head) {assert s.doNotDestroy && s.elemSize == normCapacity;long handle = s.allocate();assert handle >= 0;s.chunk.initBufWithSubpage(buf, handle, reqCapacity);if (tiny) {allocationsTiny.increment();} else {allocationsSmall.increment();}return;}}allocateNormal(buf, reqCapacity, normCapacity);return;}//略}
我们直接按照初次分配流程走到allocateNormal,在前面的PoolChunkList分配失败后,创建一个全新的chunk进行内存分配,newChunk整体初始化好读写的bytebuf以及subpages(用户缓存PoolSubpage),我们之前从 long handle = c.allocate(normCapacity);
开始讲解subpage级别内存分配和page级别内存分配的区别:
private synchronized void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||q075.allocate(buf, reqCapacity, normCapacity)) {++allocationsNormal;return;}// Add a new chunk.PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);long handle = c.allocate(normCapacity);++allocationsNormal;assert handle > 0;c.initBuf(buf, handle, reqCapacity);qInit.add(c);}
与page级别分配不同的是,它会走到allocateSubpage这个方法:
long allocate(int normCapacity) {if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSizereturn allocateRun(normCapacity);} else {return allocateSubpage(normCapacity);}}
进入allocateSubpage,其整体步骤为:
- 从PoolSubpage缓存池中找到PoolSubpage。
- 基于memoryMap二叉树的叶子节点开始(subpage级别maxOrder从11开始计算),找到合适的节点。
- 基于步骤2找到的id到subpages中是否存在对应节点,若存在直接完成分配返回,反之进入步骤4。
- 创建一个全新的PoolSubpage,并加入subpages中。
- 调用allocate完成内存分配。
private long allocateSubpage(int normCapacity) {// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.// This is need as we may add it back and so alter the linked-list structure.PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);synchronized (head) {int d = maxOrder; // subpages are only be allocated from pages i.e., leavesint id = allocateNode(d);if (id < 0) {return id;}final PoolSubpage<T>[] subpages = this.subpages;final int pageSize = this.pageSize;freeBytes -= pageSize;int subpageIdx = subpageIdx(id);PoolSubpage<T> subpage = subpages[subpageIdx];if (subpage == null) {subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);subpages[subpageIdx] = subpage;} else {subpage.init(head, normCapacity);}return subpage.allocate();}}
ByteBuf的回收
了解了ByteBuf
的分配,再来讲讲ByteBuf
的回收,我们还是给出这样一段示例代码,通过release
方法完成内存回收:
public static void main(String[] args) {int page = 1024 * 8;PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;//尝试获取两个page内存大小的byteBufByteBuf byteBuf = allocator.directBuffer(2 * page);byteBuf.release();}
步入release
方法,来到了AbstractReferenceCountedByteBuf
的release
,可以看到它仅仅是一个release0
的自调用,传入一个1:
@Overridepublic boolean release() {return release0(1);}
步入查看release0
的可知具体逻辑:
- 查看
refCnt
是否大于decrement
,如果有线程释放过直接抛出异常,反之进入下一步。 - 基于
CAS
将refCnt
减去传入的decrement
,即我们上文说到的入参1。 - 调用
deallocate
完成内存回收。
private boolean release0(int decrement) {for (;;) {int refCnt = this.refCnt;if (refCnt < decrement) {throw new IllegalReferenceCountException(refCnt, -decrement);}if (refCntUpdater.compareAndSet(this, refCnt, refCnt - decrement)) {if (refCnt == decrement) {deallocate();return true;}return false;}}}
我们继续步进,来到了PooledByteBuf
的deallocate
可以看到其核心步骤:
- 判断
handle
是否大于0,不大于0说明已被重置或者是无效handle
。 - 重置
handle
为-1
。 - 将
memory
置空,即释放创建allocator
分配的ByteBuffer
的引用,辅助GC
。 - 对应
chunk.arena
添加到缓存中,便于后续复用。 - 将
PooledByteBuf
放到缓存中以便复用。
@Overrideprotected final void deallocate() {//判断handle 是否为有效handle if (handle >= 0) {final long handle = this.handle;//置空handlethis.handle = -1;//清空对应的ByteBuffermemory = null;chunk.arena.free(chunk, handle, maxLength, cache);recycle();}}
我们直接来看看PoolArena
的free
方法的内部逻辑,它会判断当前的chunk是否是池化的,如果不是则直接销毁,反之进行回收,所以我们的代码走到了分支2,整体逻辑为:
- 基于我们分配的内存大小得到
sizeClass
。 - 基于
sizeClass
回收到指定缓存中。
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {if (chunk.unpooled) {int size = chunk.chunkSize();destroyChunk(chunk);activeBytesHuge.add(-size);deallocationsHuge.increment();} else {//基于分配大小得到sizeClassSizeClass sizeClass = sizeClass(normCapacity);//将分配的内存回收到缓存中if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {// cached so not free it.return;}freeChunk(chunk, handle, sizeClass);}}
基于normCapacity(16k)
可以得到我们的sizeClass
为normal
这个内存区间,我们看到cache
的add
做了如下几件事:
- 基于外部入参得到
normal
级别的cache
,并返回这个cache
对象。 - 将
chunk
和handle
存到这个cache
的队列中,便于后续内存分配直接用这几个对象完成内存分配。
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {//基于外部入参得到`normal`级别的cache,并返回这个cache对象MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);if (cache == null) {return false;}// 将chunk和handle存到这个cache的队列中,便于后续内存分配直接用这几个对象完成内存分配return cache.add(chunk, handle);}
我们先来看看cache
方法的逻辑,因为我们的sizeClass
为Normal
,所以直接调用cacheForNormal
:
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {switch (sizeClass) {case Normal:return cacheForNormal(area, normCapacity);case Small:return cacheForSmall(area, normCapacity);case Tiny:return cacheForTiny(area, normCapacity);default:throw new Error();}}
因为我们的area
是直接内存的,所以直接步入if逻辑,它会基于我们的normCapacity
得到一个索引位置idx
,然后调用cache
得到normalDirectCaches
中一个缓存:
private MemoryRegionCache<?> cacheForNormal(PoolArena<?> area, int normCapacity) {if (area.isDirect()) {//基于我们的normCapacity得到一个索引位置idxint idx = log2(normCapacity >> numShiftsNormalDirect);//调用cache得到normalDirectCaches中的一个cachereturn cache(normalDirectCaches, idx);}int idx = log2(normCapacity >> numShiftsNormalHeap);return cache(normalHeapCaches, idx);}
得到cache之后就是调用add方法,可以看到其内部逻辑:
- 基于
chunk
和handle
创建一个entry
。 - 将其存入当前cache的queue中,便于后续内存分配时,直接复用这两个对象,避免后续分配时频繁的
handle
计算和chunk
各种memoryMap
等对象计算和创建:
public final boolean add(PoolChunk<T> chunk, long handle) {Entry<T> entry = newEntry(chunk, handle);boolean queued = queue.offer(entry);if (!queued) {// If it was not possible to cache the chunk, immediately recycle the entryentry.recycle();}return queued;}
自此我们也将Bytebuf
内存回收的逻辑讲完了,我们也来简单小结一下其核心步骤:
- 基于
CAS
完成refCnt
自减。 - 清空
PooledByteBuf
的handle值。 - 将
chunk
和handle
回收到normalDirectCaches
中便于后续内存分配时直接复用。
缓存数据结构
上文提到了复用缓存即MemoryRegionCache
的,每当新创建的chunk分配的内存被释放时,线程中的MemoryRegionCache
数组就会将其回收以便复用,这也就是为什么我们使用PooledByteBufAllocator
创建bytebuf时内存分配都会经过cache.allocateTiny
、cache.allocateSmall
等方法尝试先分配,就是为了确保之前回收的内存能够被复用。
查看其源码可以发现其核心成员变量如下,其中:
size
为缓存数组queue
的容量大小。queue
中的Entry
存储handle
和chunk
,handle
即内存指针,而chunk
则是bytebuffer
以及内存相关描述信息的,通过两者可以得到一个分配好内存的bytebuf
。sizeClass
代表当前MemoryRegionCache
的内存规格。
//queue的容量大小private final int size;// queue中的Entry存储handle和chunk,我们基于handle和chunk可以完成一个bytebuf的内存分配private final Queue<Entry<T>> queue;//记录当前MemoryRegionCache规格例如:Tiny,Small,Normalprivate final SizeClass sizeClass;
我们都知道每个线程都有自己的PoolThreadCache
,它们按照堆和直接内存将MemoryRegionCache
细拆可以分为6种:
final class PoolThreadCache {private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;private final MemoryRegionCache<byte[]>[] normalHeapCaches;private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
对应的我们以tinySubPageHeapCaches
查看这些缓存的初始化逻辑,从该方法可知,它想创建一个大小为tinyCacheSize
即512b
,容量为PoolArena.numTinySubpagePools
即32,SizeClass
为tiny
的MemoryRegionCache
数组:
tinySubPageHeapCaches = createSubPageCaches(tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
步入后即可看到对应的创建逻辑:
- 创建
MemoryRegionCache
数组。 - 基于
numCaches
循环变量完成数组初始化。
private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches, SizeClass sizeClass) {if (cacheSize > 0) {@SuppressWarnings("unchecked")MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];for (int i = 0; i < cache.length; i++) {// TODO: maybe use cacheSize / cache.lengthcache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);}return cache;} else {return null;}}
自此每个线程中的MemoryRegionCache
数组都初始化好了,每当全新创建的chunk
使用完时,MemoryRegionCache
数组都会按照制定算法从数组中找到合适的MemoryRegionCache
完成回收以便复用。
PoolArena基本概念详解
从上文我们都知道内存分配时若MemoryRegionCache
分配不成功则会通过PoolArena
,当PoolArena
分配的内存使用完毕之后MemoryRegionCache
会将PoolArena
的PoolChunk
和handle
回收以便下次内存分配可以复用 :
final class PoolThreadCache {final PoolArena<byte[]> heapArena;final PoolArena<ByteBuffer> directArena;}
我们查看PoolArena
的源码,首先会看到这也一个枚举,它定义了PoolArena
的3中规格区间:
enum SizeClass {//0-512BTiny,//512B-8kSmall,//8K-16M Normal}
我们直接以构造方法为入口了解一下PoolArena,其核心步骤为:
- parent记录当前分配的PooledByteBufAllocator ,pageSize为8k,maxOrder 为11即memoryMap层级,pageShifts为13用于后续内存分配时二叉树的计算,chunkSize为chunk的大小即16M,subpageOverflowMask用于判断每次进行内存分配时所需内存是否page还是subpage。
- 初始化tinySubpagePools 、smallSubpagePools用于缓存用户每次新分配的内存信息。
- 初始化PoolChunkList,PoolChunkList会按照使用率存储不同的PoolChunk,例如qInit记录使用率为未使用~25,q000 记录使用率1 ~ 50依次类推。
protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) {this.parent = parent;this.pageSize = pageSize;this.maxOrder = maxOrder;this.pageShifts = pageShifts;this.chunkSize = chunkSize;subpageOverflowMask = ~(pageSize - 1);tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);for (int i = 0; i < tinySubpagePools.length; i ++) {tinySubpagePools[i] = newSubpagePoolHead(pageSize);}numSmallSubpagePools = pageShifts - 9;smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);for (int i = 0; i < smallSubpagePools.length; i ++) {smallSubpagePools[i] = newSubpagePoolHead(pageSize);}q100 = new PoolChunkList<T>(null, 100, Integer.MAX_VALUE, chunkSize);q075 = new PoolChunkList<T>(q100, 75, 100, chunkSize);q050 = new PoolChunkList<T>(q075, 50, 100, chunkSize);q025 = new PoolChunkList<T>(q050, 25, 75, chunkSize);q000 = new PoolChunkList<T>(q025, 1, 50, chunkSize);qInit = new PoolChunkList<T>(q000, Integer.MIN_VALUE, 25, chunkSize);q100.prevList(q075);q075.prevList(q050);q050.prevList(q025);q025.prevList(q000);q000.prevList(null);qInit.prevList(qInit);List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);metrics.add(qInit);metrics.add(q000);metrics.add(q025);metrics.add(q050);metrics.add(q075);metrics.add(q100);chunkListMetrics = Collections.unmodifiableList(metrics);}
查看PoolChunkList构造,就是记录列表前后节点以及使用率,还有当前列表中每个节点可分配的最大容量maxCapacity 。
PoolChunkList(PoolChunkList<T> nextList, int minUsage, int maxUsage, int chunkSize) {assert minUsage <= maxUsage;this.nextList = nextList;this.minUsage = minUsage;this.maxUsage = maxUsage;maxCapacity = calculateMaxCapacity(minUsage, chunkSize);}
按照上述PoolChunkList前驱和后继的组装最终可以构成这样一个链表+列表的数据结构:
PoolArena是中还有这两个池,用于缓存每一次新创建的PoolSubpage:
private final PoolSubpage<T>[] tinySubpagePools;private final PoolSubpage<T>[] smallSubpagePools;
查阅PoolSubpage其核心成员变量如下:
final class PoolSubpage<T> implements PoolSubpageMetric {//记录内存分配用到的chunkfinal PoolChunk<T> chunk;//记录内存分配二叉树的索引数private final int memoryMapIdx;//当前chunk在depthMap所在层级的偏移量private final int runOffset;//一个page为8kprivate final int pageSize;//用于记录chunk的使用情况,每当smallSubpagePools某个chunk被使用,bitmap就会在对应索引位置+1private final long[] bitmap;//记录前后节点PoolSubpage<T> prev;PoolSubpage<T> next;//判断当前PoolSubpage是否被销毁boolean doNotDestroy;//当前元素大小int elemSize;//最大可用的元素private int maxNumElems;//bitmapLength长度private int bitmapLength;//下一个可用的bitmapIdx索引位置private int nextAvail;//记录还可以分配的subpage个数private int numAvail;
ByteBuf常见面试题
ByteBuf内存的类别有哪些?
我们可以从3种不同的方式进行分类:
- 按照分配的内存是否可以复用可分为Pooled和Unpooled。
- 按照是否可以直接操作内存地址可分为Unsafe和非Unsafe。
- 按照内存是否是在JVM上分配可分为Direct和Heap。
ByteBuf如何减少多线程内存分配之间的竞争?
PooledByteBufAllocator内部维护了几个Arena数组,所有的内存分配都会基于这些初始化好的Arena数组进行分配。并且Netty通过PoolThreadCache使得线程和Arena意义对应,从而尽可能避免了多线程间分配的竞争。
不同大小的内存是如何进行分配的?
对于Page级别的内存分配,Netty通过完全二叉树,即上文说的memoryMap查找到某段未使用的连续内存,然后通过这个索引位置定位到实际的内存地址完成分配,而Page级别以下也是到memoryMap找对应索引位置的内存,然后按照subPage大小进行划分在进行内存分配,并基于bitMap记录当前这个内存的使用情况。
ByteBuf如何实现复用的
当我们使用PooledByteBufAllocator
常见Bytebuf
时,PooledByteBufAllocator
在初始化时会创建一定数量的池化对象(PoolChunk)
。每个池化对象都表示一块内存区域,用于存储 ByteBuf
对象。
然后PooledByteBufAllocator
会从可用的池化对象(PoolChunk)
按照请求的大小选择最接近或稍微大于请求大小的池化对象。如果没有可用的池化对象,PooledByteBufAllocator
会创建一个新的池化对象。
当一个 ByteBuf
对象不再使用时,它会被返回给 PooledByteBufAllocator
。PooledByteBufAllocator
会将该对象内存分配时用到的handle
和池化对象(PoolChunk)
加入缓存中,以便下次内存分配时进行复用。
参考
【Netty源码】ByteBuf源码剖析:https://blog.csdn.net/baiye_xing/article/details/76551937
Netty零拷贝之CompositeByteBuf实际用法:https://blog.csdn.net/youxijishu/article/details/104815309/
吃透Netty源码系列三十五之CompositeByteBuf详解一:https://blog.csdn.net/wangwei19871103/article/details/104486129
netty源码阅读之ByteBuf之ByteBuf结构和重要API:https://blog.csdn.net/fst438060684/article/details/81915064
netty源码阅读之ByteBuf之内存page级别内存的分配:https://blog.csdn.net/fst438060684/article/details/82532093
Netty之ByteBuf深入分析:https://www.jianshu.com/p/2498db9c91fe
这篇关于Netty源码ByteBuf详解的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!