本文主要是介绍tiny-Tcmalloc(高并发内存池),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
项目地址(绝对可运行)
一. 初识高并发内存池
1、项目介绍
当前项目是实现一个高并发的内存池,它的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc 、free)。
2、项目所需的知识
C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁、基数树等方面的知识。
3、了解池化技术
所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。
在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。
4、内存池的作用
内存池主要解决的问题有两个:
- 效率问题(池化技术)
- 内存碎片问题(外部碎片指的是一些空闲的连续内存区域太小,且这些内存空间不连续,以至于虽然空闲的内存空间足够,但是不能满足一些内存分配的申请需求。内部碎片指的是由于一些对齐的要求(比如struct结构体中的内存对齐),导致分配出去的空间中一些内存无法被利用。)
tcmalloc和malloc的关系
动态申请和释放内存是在进程地址空间的堆区上申请的,但实际上一般不会直接去堆上申请,因为效率和内存碎片这两个问题。
实际上C/C++程序申请空间的时候不是直接去堆上,而是借助malloc函数申请的,释放的时候就是free,也就是C标准库提供的这两个函数。
C++中不是在用new和delete吗?new和delete归根结底中还是用了malloc和free的。new和delete中封装了operator new和operator delete。operator new中又调用了malloc来帮助开空间,封装malloc符合C++中new失败了抛异常的机制。operator delete中也是通过宏定义后的free来释放空间。所以说C/C++中申请和释放内存块最终还是通过malloc和free来实现的。
二、定长内存池
_memory用来指向大块的内存:
自由链表的头指针_freelist用来管理还回来的空间
#pragma once
#include<Windows.h>
#include"Common.h"inline static void* SystemAlloc(size_t kpage){// Windows下的系统调用接口void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);if (ptr == nullptr){throw std::bad_alloc();}return ptr;
}template<class T>
class ObjectPool
{
public:T* New(){T* obj = nullptr;//申请的空间if (_freeList){//内存中有空闲的空间//头删void* next = *(void**)_freeList;//*(void**)是前指针个字节指向的空间obj = (T*)_freeList;_freeList = next;}else{//自由链表中没有可以重复利用的块,需要向内存块中申请空间//内存块还剩的内存小于T的大小,需要开空间if (_remainBytes < sizeof(T)){_remainBytes = 128 * 1024;//开128KB的空间//_memory = (char*)malloc(_remainBytes);// 右移13位,就是除以8KB,也就是得到的是16,这里就表示申请16页_memory = (char*)SystemAlloc(_remainBytes >> 13);if (_memory == nullptr){throw std::bad_alloc();}}//向内存块申请一个T大小的空间obj = (T*)_memory;//判断T的大小,若小于指针大小,就直接分配一个指针大小,否则分配T的大小size_t objSize = sizeof(void*) > sizeof(T) ? sizeof(void*) : sizeof(T);_memory += objSize;_remainBytes -= objSize;}new(obj)T;//通过定位new调用构造函数对T进行初始化return obj;}//回收归还的空间void Delete(T* obj){//显式调用析构函数,进行清理obj->~T();//头插*(void**)obj = _freeList;_freeList = obj;}private:char* _memory = nullptr;//指向内存块的指针size_t _remainBytes = 0;//大块内存在切分后还剩的内存void* _freeList = nullptr;//自由链表,连接归还回来的内存public:mutex _poolMtx;//防止tc申请时申请到空指针
};
三、高并发内存池
现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,但是我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次实现的内存池需要考虑以下几方面的问题。
- 性能问题。
- 多线程环境下,锁竞争问题。
- 内存碎片问题。
主要由以下3个部分构成:
- 一个进程中有几个线程,就会有几个tc,也就是每一个线程都会有其对应的tc。
- 如上图所示,假设有三个线程,分别为t1、t2、t3。每个线程去动态申请内存时不需要加锁,因为每个线程独享一个tc,如果tc中有空闲空间,线程在申请的时候只会去自己的tc中申请。单次向tc申请的最大上限为256KB。
- 如果单次申请的空间是小于256KB的,线程就在自己的tc中申请,且大部分情况下申请空间是不会大于256KB的,这样就可以解决大部分情况下的锁竞争问题,因为自己向自己的tc申请空间不需要加锁。
- 如果单次申请的空间大于了256KB,tc就会用其他方式去申请空间。
- 中心缓存是所有线程共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。
- central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的没有内存对象时才会找central cache,所以这里竞争不会很激烈。
- cc内部是由哈希桶实现的,有多个哈希桶,每个桶有一串空间,只有多个线程同时并发申请同一个桶的时候才会有线程安全问题,每一个桶自身会有一个桶锁。比如,t1申请3号桶的空间,t2申请5号桶的空间,此时是不用加锁的;再比如,t1和t2同时申请4号桶的空间,此时才需要对4号桶的申请加锁,所以就算是多线程并发向cc申请锁也不会有非常激烈的竞争。
- cc会在合适的时机回收tc中的对象空间,比如t1原先申请了很多内存,用完之后空闲了很多,那么此时cc就可以回收掉t1的tc中空闲的空间,如果此时t2急着用空间,正好就可以将这些空闲空间给t2。这样就起到了一个均衡调度的功能,不至于说一个人吃得太多而另一个没得吃。
- 当cc中空间不够时会再向page cache申请,而且如果cc回收回来的内存没有让tc申请走时还会再交给pc来解决内存碎片的问题。
- page cache是在central cache缓存上面的一层缓存,pc中会组织很多个叫做span的结构体,是以页为单位存储及分配的。
- central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。
- 当一个span的几个跨度页的对象都回收以后,page cache会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题。
哈希表与内存对齐
在使用ThreadCache和CentralCache时,都需要根据所需内存的字节数去对应的哈希桶中申请内存,若我们采用一对一的哈希映射关系,即以字节为单位的任意大小的内存均要对应上哈希表的下标,且因为管理内存采用的是单链表的方式(申请的内存大小至少为4或8字节),即哈希表的下标范围应为[8,256*1024],显然下标范围这么大的哈希表不切实际,且实用性低。因此要采用一定的内存对齐原则。我们采用如下原则进行内存对齐:
size范围就是指线程申请的空间范围,[1, 128]就是指申请空间在1~128B以内的,对齐数就是8B,也就是如果申请3B,就要对齐到8B,如果申请的是13B就要对齐到16B,就像这样找到对应大于size的8的最小倍数。对齐到几B就给线程提供几B的空间。
那么对应哈希桶下标范围也就很好理解了,下标为0的哈希桶(自由链表)连接的就是大小为8B的块空间,下标为1的哈希桶(自由链表)连接的就是大小为16B的块空间,下标为2的哈希桶(自由链表)连接的就是大小为24B的块空间……
[128 + 1, 1024]就是指申请空间在129~1024B以内的,对齐数就是16B,也就是如果申请130B,就要对齐到128 + 16 = 144B,也就是申请130B就会给线程144B。下标为16的哈希桶连接的就是144B的块空间。
这样不仅缩小哈希表长度,大幅减少哈希表占用的空间,而且有效控制了内碎片的浪费问题。
我们可以根据所需字节数算出对齐后的字节数,进而根据对齐后的字节数算出哈希表对应的桶的位置。
代码实现如下:
// 整体控制在最多10%左右的内碎片浪费
// [1,128] 8byte对齐 freelist[0,16)
// [128+1,1024] 16byte对齐 freelist[16,72)
// [1024+1,8*1024] 128byte对齐 freelist[72,128)
// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)
// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)//一页多少位,一页8KB,就是13位
static const size_t PAGE_SHIFT = 13;
//ThreadCache单次申请的最大字节256KB
static const size_t MAX_BYTES = 256 * 1024;class SizeClass{
public://根据申请的字节数计算出对应的对齐数static size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8*1024){return _RoundUp(size, 128);}else if (size <= 64*1024){return _RoundUp(size, 1024);}else if (size <= 256*1024){return _RoundUp(size, 8 * 1024);}else{//单次申请空间大于256KB,直接按照页来对齐return _RoundUp(size, 1 << PAGE_SHIFT);}}//alignNum 对应分区的对齐数static size_t _RoundUp(size_t size, size_t alignNum){size_t ret;if (size % alignNum != 0){//有余数,要多给一个对齐数ret = (size / alignNum + 1) * alignNum;}else{//没有余数,本身就能对齐ret = size;}return ret;//二进制写法//return ((size + alignNum - 1)& ~(alignNum - 1));}// 求size对应在哈希表中的下标static inline size_t _Index(size_t size, size_t align_shift){ /*这里align_shift是指对齐数的二进制位数。比如size为2的时候对齐数为8,8就是2^3,所以此时align_shift就是3*/return ((size + (1 << align_shift) - 1) >> align_shift) - 1;//这里_Index计算的是当前size所在区域的第几个下标,所以Index的返回值需要加上前面所有区域的哈希桶的个数}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t size){assert(size <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (size <= 128){ // [1,128] 8B -->8B就是2^3B,对应二进制位为3位return _Index(size, 3); // 3是指对齐数的二进制位位数,这里8B就是2^3B,所以就是3}else if (size <= 1024){ // [128+1,1024] 16B -->4位return _Index(size - 128, 4) + group_array[0];}else if (size <= 8 * 1024){ // [1024+1,8*1024] 128B -->7位return _Index(size - 1024, 7) + group_array[1] + group_array[0];}else if (size <= 64 * 1024){ // [8*1024+1,64*1024] 1024B -->10位return _Index(size - 8 * 1024, 10) + group_array[2] + group_array[1]+ group_array[0];}else if (size <= 256 * 1024){ // [64*1024+1,256*1024] 8 * 1024B -->13位return _Index(size - 64 * 1024, 13) + group_array[3] +group_array[2] + group_array[1] + group_array[0];}else{assert(false);}return -1;}
};
ThreadCache申请实现
Thread Cache中是哈希表的结构,哈希表中的哈希桶采用单链表的形式将内存块链接起来,与定长内存池链接内存块的方式一致。
//获取obj的头4/8个字节
static void*& objNext(void* obj){return *(void**)obj;
}class FreeList {
public://回收空间void Push(void* obj){assert(obj);//头插objNext(obj) = _freeList;_freeList = obj;++_size;} //提供空间void* Pop(){assert(_freeList);//头删void* obj = _freeList;_freeList = objNext(_freeList);--_size;return obj;}bool Empty() { //查看自由链表是否为空return _freeList == nullptr;}
private:void* _freeList = nullptr;
};
线程如何独享ThreadCache
每个线程去申请ThreadCache时是在堆上申请的,但一个进程中每个线程共享一份堆区,所以无法保证每个线程独享一份ThreadCache,只能通过加锁的方式来共用ThreadCache。有办法支持线程无锁访问ThreadCache并且每个线程都独享一份ThreadCache吗?答案是有的。即线程局部存储:线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。
当每个线程去自己独享的ThreadCache中申请内存时,ThreadCache会根据所需字节数先按照内存对齐的原则,再找到对应桶的下标,去取内存块给线程。当对应下标的桶中没有内存时,则该Threadcache应该向CentralCache申请内存。
class ThreadCache{
public:void* Allocate(size_t size);//线程申请size大小的空间void Deallocate(void* obj, size_t size);//回收线程中大小为size的obj空间void* FetchFromCentralCache(size_t alignSize, size_t index);//ThreadCache空间不够,向CentralCache申请空间private://FreeList用来存放和管理相同大小的块空间FreeList _freeLists[FREE_LIST_NUM];//哈希表,每个哈希桶表示一个自由链表
};//TLS的全局对象的指针,每个线程都能有一个独立的全局对象
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
//线程申请size大小的空间
void* ThreadCache::Allocate(size_t size){assert(size < MAX_BYTES);//tc单次申请不能大于256KBsize_t alignSize = SizeClass::RoundUp(size);//size对齐后的字节数size_t index = SizeClass::Index(size);//对应桶的下标if (!_freeLists[index].Empty()){//自由链表(哈希桶)不为空,直接从其获取空间return _freeLists[index].Pop();}else{//自由链表(哈希桶)为空,从CentralCache中获取空间return FetchFromCentralCache(alignSize, index);}
}
CentralCache申请实现
CentralCache也是一个哈希桶结构,它的哈希桶的映射关系跟ThreadCache是一样的。不同的是它的每个哈希桶位置挂的是SpanList双链表结构,不过每个桶下面的Span中的大内存块按映射关系切成了所对应特定字节的一个个小内存块对象挂在Span的自由链表中。
//SpanList就是CentralCache中的哈希桶
class SpanList{
private:Span* _head;//带哨兵位的头结点
public:std::mutex _mutex;//每一个哈希桶也就是一个SpanList就要有一个桶锁SpanList(){_head = new Span();//双向链表_head->next = _head;_head->prev = _head;}//链表中是否有spanbool Empty(){//带头结点双向循环链表return _head == _head->next;}//头节点Span* Begin(){return _head->next;}//尾节点Span* End(){return _head;}//在pos之前插入ptrvoid Insert(Span* pos, Span* ptr){assert(pos);assert(ptr);Span* pre = pos->prev;pre->next = ptr;ptr->prev = pre;ptr->next = pos;pos->prev = ptr;}//头插void PushFront(Span* span){Insert(Begin(), span);}//头删Span* PopFront(){Span* front = _head->next;Erase(front);return front;}//删除pos位置的Spanvoid Erase(Span* pos){assert(pos);pos->prev->next = pos->next;pos->next->prev = pos->prev;//这里不需要调用delete,pos节点的Span需要回收}
};
typedef size_t PageID;
//以页为单位的结构体 span:管理以页为单位的大块内存
struct Span{
public:PageID _pageID = 0;//页号size_t _n = 0;//当前span管理的页的数量size_t _objSize = 0;//span管理页被切分成的块有多大void* _freeList = nullptr;//每个span下挂接小块空间的链表的头结点size_t useCount = 0;//当前span分配出去了多少块Span* prev = nullptr;Span* next = nullptr;bool _isUse = false;//判断当前span在cc中还是pc中
};
_pageId与_n
当我们走到最底层,即向系统申请内存空间时,系统返回给我们的指向这段空间起始地址的指针(32位系统下是4字节,64位系统下是八字节)。而_pageId就是与指针建立起映射关系!!假如现在我们申请的起始地址是(32位系统下)0xffff0000,那么它对应的起始页号_pageId就是524280。因为我们一页的大小为8KB,所以让它除以8192(十进制)就可以得出起始页号,同理根据它的页数_n的数量,就可以知道Span管理的大块内存的末尾地址,假设_n=1,那么它的末尾地址就是0xffff2000。而为了在不同的系统下都能够保存下_pageId的值,所以采用条件编译,32位系统下采用size_t类型,64位系统下采用unisgned long long类型。
_objectSize与_useCount
因为CentralCache采用的哈希表映射规则与Threadcache一致,所以每个Span挂到相应位置的桶的时候,它所切成的小块内存的大小必须是符合ThreadCache哈希表映射关系的,所以_objectSize可以是8字节-1024*256字节。前文说过,CentralCache是起一个调度作用的中间媒介,当ThreadCache中内存过多时,要进行回收并整合,同理当Span过多时,也要返回给PageCache,而_useCount就是对分出去的小内存块进行计数,并在必要时回收。
分析完CentralCache的哈希表结构后,再来思考一下如何将CentralCache只在全局中设计出一份,让每个线程都共享CentralCache。我们采用单例模式,即将CentralCache的构造和拷贝构造都加上delete,在全局只创建出唯一一个CentralCache。
class CentralCache{
private://SpanList用来管理一个一个Span的带头双向循环链表,其中每个Span用来管理和分配页空间的SpanList _SpanList[FREE_LIST_NUM];static CentralCache _sInst;//饿汉模式创建一个CentralCacheCentralCache(){}CentralCache(const CentralCache& c) = delete;CentralCache& operator=(const CentralCache& c) = delete;public:static CentralCache* getInstance(){return &_sInst;}//cc从自己的_SpanList中为tc提供tc所需要的块空间//start end 为输出型参数 表示给cc提供的空间的开始和结尾//n表示tc需要多少块size大小的空间//size表示tc需要的单块空间的大小//返回值为cc实际给tc提供的空间大小-size_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);//获取一个管理空间不为空的SpanSpan* getOneSpan(SpanList& list, size_t size);//将tc归还的多块空间放进span中void ReleaseListToSpans(void* start, size_t size);};
CentralCache如何将内存给ThreadCache
上文讲到ThreadCache没有内存后就向CentralCache索要内存,那么为了更好的体现池化思想,CentralCache应该每次都多给ThreadCache一点内存。在这里我们采用类似于TCP协议中的慢增长算法。
慢开始反馈调节算法
1 .最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完
2. 如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限
3. size越大,一次向central cache要的batchNum就越小
4. size越小,一次向central cache要的batchNum就越大
ThreadCache中FetchFromCentralCache代码如下:
//FetchFromCentralCache第二个参数直接给alignSize,就是指tc向cc申请空间的时候就不需要考虑对齐的问题了,直接申请整块的大小。
void* ThreadCache::FetchFromCentralCache(size_t index, size_t alignSize){// 慢开始反馈调节算法// 1、最开始不会向central cache一次批量要太多,因为要太多了可能用不完(一次要一个又容易导致频繁竞争锁)// 2、如果你不断要这个size大小内存需求,那么batchNum就会不断增长,直到上限// 3、size越大,一次向central cache要的batchNum就越小// 4、size越小,一次向central cache要的batchNum就越大// 从而使得ThreadCache向CentralCache获取内存的频率和大小控制在一个合理的水平。// MaxSize()表示index位置的自由链表单次申请未到上限时,能够申请多少内存块空间// NumFromCentral()表示tc单次向cc申请alignSize大小的空间块的最多块数是多少// 二者取小,得到的就是本次要给tc提供多少块alignSize大小的空间// 比如alignSize为8B,MaxSize为1,NumFromCentral为512,那就给一块8B的空间// 也就是说没有到上限就给MaxSize,到了上限就给NumFromCentralsize_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumFromCentral(alignSize));if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1; //如果觉得增长地太慢,可以增加这里递增的值}void* start = nullptr;void* end = nullptr;size_t realNum = CentralCache::getInstance()->FetchRangeObj(start, end, batchNum, alignSize);//realNum一定是大于等于1的assert(realNum >= 1);if (realNum == 1){//直接将start返回给线程assert(start == end);return start;}else{//除了将start返回给线程,还要在tc对应的自由链表位置插入未用的空间[objNext(start),end]_freeLists->PushRange(objNext(start), end,realNum - 1);//返回realNum-1,因为头一块空间直接给了线程return start;}
}
此时ThreadCache采用了一个慢增长算法向CentralCache索要内存,那站在CentralCache的角度,它该如何将内存给ThreadCache呢?
1、首先CentralCache要找出桶中不为空的span
因为CentralCache中存的是SpanList,且整个进程只有一份CentralCache,所以每次访问对应的Spanlist桶时要加锁,访问时要找出Spanlist中不为空的Span
2、对该Span进行切分
拿出Span后由于它可能满足我们的需求,也有可能不满足,所以我们返回的是实际拿到的内存块数量。
代码如下:
//首先向cc自己的哈希桶中拿span的时候要加锁,如果cc的桶中没有span就要向pc申请span,
//那么在cc向pc申请span的时候需要将cc的桶锁解除,让其它向该cc的哈希桶进行归还操作的线程能拿到锁,
//然后加上pc的整体的锁,申请到新的span后解除pc的整体锁,然后cc对新span进行切分,
//切分完毕后,再将切完的span挂到对应cc桶中时就加上锁,然后给将这个新挂上去的span管理的空间交给需要的tc之后再解锁。Span* CentralCache::getOneSpan(SpanList& list, size_t size){//先在cc中找一个管理空间的非空spanSpan* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){return it;}else{it = it->next;}}//将cc中的桶锁释放,让其它向该cc进行归还操作的线程能拿到锁list._mutex.unlock();//cc中没有找到管理空间的非空spansize_t k = SizeClass::NumFromPage(size);//pg加锁PageCache::getInstance()->_pageMtx.lock();Span* span = PageCache::getInstance()->newSpan(k);span->_isUse = true;//cc获取到了pc中的spanspan->_objSize = size;//记录span被切分的块有多大//pg解锁PageCache::getInstance()->_pageMtx.unlock(); //将_pageID强转为*类型char* start = (char*)(span->_pageID << PAGE_SHIFT); //页号 * 页大小char* end = (char*)(start + (span->_n << PAGE_SHIFT));//start + 页数 * 页大小//切分span管理的空间span->_freeList = start;void* tail = start;start += size;//start往后移一块,方便循环控制//连接各个块while (start < end){objNext(tail) = start;start += size;tail = objNext(tail);}objNext(tail) = nullptr;//切好span后,需要把span挂到cc对应下标的桶里面//span挂上去之前加锁list._mutex.lock();list.PushFront(span);//切好span后,需要把span挂到cc对应下标的桶里面return span;
}size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size){//获取的size对应哪一个spanListsize_t index = SizeClass::Index(size);//整个进程中只有一个cc,而且可能会有多个线程向同一个cc中的SpanList申请span中的块空间,所以要对SpanList的操作加锁_SpanList[index]._mutex.lock();//获取到一个管理空间非空的spanSpan* span = getOneSpan(_SpanList[index], size);assert(span);assert(span->_freeList);start = end = span->_freeList;//函数的返回值size_t realNum = 1;//在end的next不为空时,让end向前走batchNum-1while (--batchNum && objNext(end) != nullptr){end = objNext(end);realNum++;}//将[start,end]返回给ThreadCache后,调整span的_freeListspan->_freeList = objNext(end);span->useCount += realNum;//给tc分了多少空间//和原先的span的_freeList中的块断开objNext(end) = nullptr;_SpanList[index]._mutex.unlock();return realNum;
}
PageCache申请实现
PageCache 的核心结构是Spanlist的哈希桶,但是和CentralCache与ThreadCache有本质区别的,CentralCache中的哈希桶是按照ThreadCache一样的大小对齐关系映射的,它的Spanlist中挂的Span中的内存都被按映射关系切好链接成小块内存的自由链表。而PageCache 中的Spanlist则是按下标桶号映射的,也就是说第i号桶中挂的Span都是i页内存,同样全局只有一个PageCache,将PageCache设计为单例,且为了线程安全对其上锁。
class PageCache{
private://每个SpanList用来管理一个一个的span,每个span用来存放和管理页空间的SpanList _spanList[PAGE_NUM];//pc中的哈希std::unordered_map<PageID, Span*> _idSpanMap;//哈希映射 用来快速通过页号找到对应的spanstatic PageCache _sInst;//单例类对象ObjectPool<Span> _spanPool;//创建span的对象池PageCache(){}PageCache(const PageCache& p) = delete;PageCache& operator=(const PageCache& p) = delete;public:std::mutex _pageMtx;//pc整体的锁//饿汉单例static PageCache* getInstance(){return &_sInst;}//pc从_spanList中拿出一个k页的spanSpan* newSpan(size_t k);//通过页地址找到spanSpan* MapObjectToSpan(void* obj);//管理cc还回来的spanvoid ReleaseSpanToPageCache(Span *span);
};
PageCache切分内存与系统调用
PageCache将自己的大内存块Span给CentralCache,那PageCache中是如何调度呢。
- 当PageCache中有CentralCache所需页数的Span时,PageCache直接返回给CentralCache。
- 当PageCache中没有CentralCache所需页数的Span时,但有超过它需求的页数时,先将超出这部分切开,再还给CentralCache。
- 当PageCache上述两种情况都不满足时,它向操作系统申请128页的大内存块,然后再重复情况2。
Span* PageCache::newSpan(size_t k){assert(k > 0);//大于128页的空间直接向堆申请if (k > PAGE_NUM - 1){void* ptr = SystemAlloc(k);//Span* span = new Span();Span* span = _spanPool.New();span->_pageID = (PageID)ptr >> PAGE_SHIFT;span->_n = k;_idSpanMap[span->_pageID] = span;return span;}//1、k号桶中有spanif (!_spanList[k].Empty()){//直接返回桶中的第一个spanSpan* span = _spanList[k].PopFront();//记录分配出去的span管理的页号和其地址的映射关系for (PageID i = 0; i < span->_n; i++){//n页的空间全部映射都是span地址_idSpanMap[span->_pageID + i] = span;}return span;}//2、k号桶中没有span,但后面的桶中有spanfor (int i = k + 1; i < PAGE_NUM; i++){//i号桶中有span,对该span进行切分if (!_spanList[i].Empty()){Span* nspan = _spanList[i].PopFront();//将span切分为一个k页和一个n-k页的span//Span* kspan = new Span;Span* kspan = _spanPool.New();kspan->_pageID = nspan->_pageID;kspan->_n = k;nspan->_pageID += k;nspan->_n -= k;//n-k页放到对应的哈希桶中_spanList[nspan->_n].PushFront(nspan);//把n-k页的span边缘页映射一下,方便后续合并_idSpanMap[nspan->_pageID] = nspan;_idSpanMap[nspan->_pageID + nspan->_n] = nspan;//记录分配出去的span管理的页号和其地址的映射关系for (PageID i = 0; i < kspan->_n; i++){//n页的空间全部映射都是span地址_idSpanMap[kspan->_pageID + i] = kspan;}return kspan;}}//3、k号桶和后面的桶中都没有span//直接向系统申请128页的spanvoid* ptr = SystemAlloc(PAGE_NUM - 1);//Span* bigSpan = new Span;Span* bigSpan = _spanPool.New();bigSpan->_pageID = ((PageID)ptr) >> PAGE_SHIFT;bigSpan->_n = PAGE_NUM - 1;//将这个span放到对应哈希桶中_spanList[PAGE_NUM - 1].PushFront(bigSpan);//递归再次申请k页的spanreturn newSpan(k);
}
当所有Cache都没有空间的时候的整个流程:
ThreadCache归还实现(线程将内存还给ThreadCache)
ThreadCache是哈希桶的结构,与内存字节数映射,所以当线程归还内存时,只需将内存块挂到(头插)对应的哈希桶上即可。
//回收线程中大小为size的obj空间 第二个参数size是一个对齐后的size,不用担心没有对齐的问题
void ThreadCache::Deallocate(void* obj, size_t size){assert(obj);assert(size <= MAX_BYTES);//找到size对应的自由链表size_t index = SizeClass::Index(size);//用对应的自由链表回收空间_freeLists[index].Push(obj);//当前桶的块数大于等于单批次申请块数的时候归还空间if (_freeLists[index].Size() >= _freeLists[index].MaxSize()){ListTooLong(_freeLists[index], size);}
}
我们的项目能有效解决内存碎片化的问题,即线程归还给ThreadCache的内存块过多时,就要还给CentralCache。
//tc归还空间时需要将MaxSize个块归还给cc
void ThreadCache::ListTooLong(FreeList& list, size_t size){void* start = nullptr;void* end = nullptr;list.PopRange(start, end, list.MaxSize());CentralCache::getInstance()->ReleaseListToSpans(start, size);
}
CentralCache归还实现(利用Span回收管理小内存块)
我们有每块内存的地址,但如何靠地址找到它属于哪块Span呢?靠Span的页号,建立页号与地址的对应关系,这里页号与地址是一对多的关系。
在PageCache里增加关联式容器unordered_map,且为了方便页与页之间的合并,故unordered_map存放<PAGE_ID,Span*>的关系。
//将tc归还的多块空间放进span中
void CentralCache::ReleaseListToSpans(void* start, size_t size){//先通过size找到对应的桶size_t index = SizeClass::Index(size);//加锁_SpanList[index]._mutex.lock();//遍历start,将各个块放到对应页的span所管理的_freeList中while (start){//记录下一个void* next = objNext(start);//找到对应spanSpan* span = PageCache::getInstance()->MapObjectToSpan(start);//将当前块插入到对应span中objNext(start) = span->_freeList;span->_freeList = start;span->useCount--;//span管理的所有都还回来了if (span->useCount == 0){//将这个span从cc中删掉_SpanList[index].Erase(span);span->_freeList = nullptr;span->next = nullptr;span->prev = nullptr;//归还span//解开当前桶锁_SpanList[index]._mutex.unlock();//加锁PageCache::getInstance()->_pageMtx.lock();//将这个span还给pcPageCache::getInstance()->ReleaseSpanToPageCache(span);//解锁PageCache::getInstance()->_pageMtx.unlock();//归还完加上桶锁_SpanList[index]._mutex.unlock();}//后移start = next;}//解锁_SpanList[index]._mutex.unlock();
}
PageCache归还实现(合并更大的Span)
当Span中切出的小内存块都还回来时,将该Span还给PageCache,利用该Span去与其它空闲的Span合并,合并出更大的页的Span,解决内存外部碎片问题。
Span* PageCache::MapObjectToSpan(void* obj){//通过块地址找到页号PageID id = ((PageID)obj >> PAGE_SHIFT);std::unique_lock<std::mutex> lc(_pageMtx);//通过页号找到对应的spanauto ret = _idSpanMap.find(id);if (ret != _idSpanMap.end()){return ret->second;}else{assert(false);return nullptr;}
}void PageCache::ReleaseSpanToPageCache(Span *span){//通过span判断释放的空间页数是否大于128页,如果大于就直接还给osif (span->_n > PAGE_NUM - 1){void* ptr = (void*)(span->_pageID << PAGE_SHIFT);SystemFree(ptr);//delete span;_spanPool.Delete(span);return;}//小于等于128页的span//向左不断合并while (true){PageID leftID = span->_pageID - 1;//拿到左边相邻页auto ret = _idSpanMap.find(leftID);//通过相邻页映射出对应span//没有相邻span,停止合并if (ret == _idSpanMap.end()){break;}Span* leftSpan = ret->second;//相邻span//相邻span在cc中,停止合并if (leftSpan->_isUse == true){break;}//相邻span和当前span合并超过128页,停止合并if (leftSpan->_n + span->_n > PAGE_NUM - 1){break;}//当前span与相邻span进行合并span->_pageID = leftSpan->_pageID;span->_n += leftSpan->_n;//将相邻的span对象从桶中删掉_spanList[leftSpan->_n].Erase(leftSpan);//删除掉相邻span对象//delete leftSpan;_spanPool.Delete(leftSpan);}//向右不断合并while (true){PageID rightID = span->_pageID + span->_n;//拿到右边相邻页auto ret = _idSpanMap.find(rightID);//通过相邻页映射出对应span//没有相邻span,停止合并if (ret == _idSpanMap.end()){break;}Span* rightSpan = ret->second;//相邻span//相邻span在cc中,停止合并if (rightSpan->_isUse == true){break;}//相邻span和当前span合并超过128页,停止合并if (rightSpan->_n + span->_n > PAGE_NUM - 1){break;}//当前span与相邻span进行合并span->_n += rightSpan->_n;//将相邻的span对象从桶中删掉_spanList[rightSpan->_n].Erase(rightSpan);//删除掉相邻span对象//delete rightSpan;_spanPool.Delete(rightSpan);}//合并完毕,将当前span挂在对应桶中_spanList[span->_n].PushFront(span);//从cc返回pc,isUse改成falsespan->_isUse = false;//映射当前span的边缘页,后续还可以对这个span合并_idSpanMap[span->_pageID] = span;_idSpanMap[span->_pageID + span->_n - 1] = span;
}
超过128页内存的申请与释放
四、性能测试
该项目研究的是多线程环境下与malloc效率的对比,所以我们设计多线程的测试环境。设计代码如下:
// ntimes 一轮申请和释放内存的次数
// rounds 轮次
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k){vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t j = 0; j < rounds; ++j){size_t begin1 = clock();for (size_t i = 0; i < ntimes; i++){v.push_back(malloc(16));//v.push_back(malloc((16 + i) % 8192 + 1));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; i++){free(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}// 单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{std::vector<std::thread> vthread(nworks);std::atomic<size_t> malloc_costtime = 0;std::atomic<size_t> free_costtime = 0;for (size_t k = 0; k < nworks; ++k) {vthread[k] = std::thread([&, k]() {std::vector<void*> v;v.reserve(ntimes);for (size_t r = 0; r < rounds; ++r) {size_t begin1 = clock();for (size_t i = 0; i < ntimes; ++i) {v.push_back(ConcurrentAlloc(16));}size_t end1 = clock();size_t begin2 = clock();for (size_t i = 0; i < ntimes; ++i) {ConcurrentFree(v[i]);}size_t end2 = clock();v.clear();malloc_costtime += (end1 - begin1);free_costtime += (end2 - begin2);}});}for (auto& t : vthread){t.join();}printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, malloc_costtime.load());printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",nworks, rounds, ntimes, free_costtime.load());printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}int main()
{size_t n = 1000;cout << "==========================================================" << endl;BenchmarkConcurrentMalloc(n, 4, 10);cout << endl << endl;BenchmarkMalloc(n, 4, 10);cout << "==========================================================" << endl;return 0;
}
一方面是数据量大了之后unordered_map查找消耗比较大,还有一个就是锁竞争的消耗比较大,而如果说查的越慢那么锁竞争就会越激烈,所以说上面的实现中,随着数据量的增大,会导致性能越来越不行。
采用基数树进行优化。
基数树有如下优点:
- 逻辑和物理上是直接映射的顺序结构,无需考虑结构的变化。
- 不用考虑结构的变化就可以无需考虑对整个结构进行读写时加锁的问题(不论多少线程读写,因为结构不变,所以不加锁)。
- 基数树最多有三层,避免一次向堆区申请空间过大的问题。
基数树一层结构如下:
基数树二层结构如下:
最终引入基数树,替换我们代码中用到map的地方,基数树代码如下:
template <int BITS>
class TCMalloc_PageMap1 {
private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap1() {//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS;size_t alignSize = SizeClass::_RoundUp(size, 1<<PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize>>PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const {if ((k >> BITS) > 0) {return NULL;}return array_[k];}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v) {array_[k] = v;}
};// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstatic const int INTERIOR_BITS = (BITS + 2) / 3; // Round-upstatic const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// How many bits should we consume at leaf levelstatic const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_; // Root of radix treevoid* (*allocator_)(size_t); // Memory allocatorNode* NewNode() {Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {allocator_ = allocator;root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};
五、项目总结
优点
- 提高动态申请的效率
- 减少陷入内核的次数
- 减少系统内存碎片
- 提升内存使用率
- 尽量减少锁竞争
- 应用于多核多线程场景
不足
1、当前实现的项目中并没有完全脱离malloc,在内存池自身数据结构的管理中,如SpanList中的span等结构,还是使用的new Span()这样的操作,new的底层使用的是malloc,所以还不足以替换malloc。
解决方案:使用定长内存池配合脱离使用new(static ObjectPool<ThreadCache>objPool;这是一个静态的对象,整个进程中独一份,而对于静态的对象整个进程中的线程是共享的,那么如果多线程对其中进行操作就可能会出问题,所以需要加锁。)
2、平台及兼容性
linux等系统下面,需要将VirtualAlloc替换为brk等。x64系统下面,当前的实现支持不足,比如:id查找Span当前使用的是unordered_map<id, Span*>。在64位系统下面,这个数据结构在性能和内存等方面都是撑不住,需要将unordered_map改进为基数树。
面试常见问题
1、如何去替代malloc?替代malloc的原理是什么?
不同平台替换方式不一样,linux下使用weak alias的方式进行替换,window下使用hook的钩子技术进行替换,不用更改原来的代码,只需要使用钩子将源码中使用malloc的地方勾过来让其执行该内存池代码,所有的malloc的调用都跳转到了Tcmalloc的实现,hook技术也通常用来写外挂,用来进行系统层面函数更改。
2、能不能把ThreadCahe和CentralCache合并掉,减掉CentralCache层?
不能,CentralCache核心作用是承上启下的作用,假设把CentralCache直接去掉,就意味着ThreadCahe和PageCache直接进行对接,会产生一个问题,PageCache中的span有些是切好的,有些是没有切好的,而且不是一下子就给ThreadCahe,有可能给一部分,可能会留下一部分,这是第一个问题。第二个问题是归还回来是还一部分的,切过的和没切过的混在一起会有问题,比如申请一块大块内存,在PageCache中找,但是却不知道找到的到底是切好的还是没切好的,虽然UseCount也能进行判断,但是切的多了混在一起找的时候查找效率没有那么高,再其次均衡调度作用就不明显了,ThreadCahe中8字节专门给CentralCache的8字节用的,但是如果在PageCache中就比较混乱,因为PageCache是按照页进行映射的,更大的问题在于CentralCache加的是桶锁,PageCache虽然也是一个一个映射的桶,但是它涉及到一个span的合并和切分,span会在各个桶之间流动,就不能使用桶锁,必须使用一把整体锁进行锁住。
CentralCache小结:
1.CentralCache均衡多个线程之间对同一个大小的内存空间的需求
2.它的span都是至少有部分在用的,区分于PageCache都是大块完整的页。
3.它实现的是桶锁,因为一个span只会给一个桶用,不会再桶之间流动,效率更高,而PageCache是一把整体锁,因为PageCache中的span需要切割和合并,会在多个映射桶之间流动。
3.一定是以8k对齐按照项目中的那种分段映射对齐吗?
不一定,这个根据设计者需求,可能换个参数就全变了,依据控制10%左右的浪费设计了映射规则。
4.ThreadCahe销毁了,如果它还有内存没还给CentralCache怎么办?假设这个线程有内存泄露或者它还没有达到listToLong()函数的条件,有可能有一些内存还没有还回来或者挂在ThreadCahe中,但是这个线程被销毁了,那么这个内存就没有回到这个CentralCache,CentralCache也不会回到PageCache,会耽搁小页合大,还会导致一些内存的泄露,有没有什么办法解决呢?
解决方法:给这个项目注册一个回调函数,只要线程结束,回调函数的作用是把ThreadCahe里面给clear掉,把每个桶数据都往下一个还,在此创建tls时就进行处理,在new线程空间时同时注册一个回调函数,一旦崩溃就清理掉这个回调函数。
这篇关于tiny-Tcmalloc(高并发内存池)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!