本文主要是介绍【lesson5】高并发内存池Central Cache层申请内存的实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- Central Cache层的结构
- 申请内存的流程
- 释放内存的流程
- Span对象的结构
- SpanList的实现
- SpanList需要的成员变量
- SpanList需要的成员函数
- SpanList()的实现
- Insert()的实现
- Erase()
- SpanList的完整实现代码
- Central Cache对象的结构
- Central Cache所需要的成员变量
- Central Cache所需要的成员函数
- GetInstance()的实现
- FetchRangeObj()的实现
- FetchFromCentralCache()的实现
- FetchRangeObj()的实现
- 从span->_freeList取出对象的过程的详细过程
- GetOneSpan()
- Central Cache层的结构的完整代码
- Common.h
- ThreadCache.h
- ThreadCache.cpp
- ConcurrentAlloc.h
- CentralCache.h
- CentralCache.cpp
Central Cache层的结构
central cache也是一个哈希桶结构,他的哈希桶的映射关系跟thread cache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构
,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
申请内存的流程
- 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
- central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
- central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给threadcache,就++use_count
释放内存的流程
当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时–use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。
在这里先不实现释放内存的流程,等我们先把申请内存流程走通,然后再去实现释放内存。
Span对象的结构
在上面我们了解了Central Cache的结构,知道Central Cache层主要管理的是一个个已经被切分好的span对象。
所以我们先要定义span对象
// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0; // 大块内存起始页的页号size_t _n = 0; // 页的数量Span* _next = nullptr; // 双向链表的结构Span* _prev = nullptr;size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数void* _freeList = nullptr; // 切好的小块内存的自由链表
};
_pageId:就是一块内存的首地址 >> 13位,也就是8k(81024个字节)
举例:
_n
:该span有多少也的大小,一页的大小我们定义为8K(81024字节)
_next
:后一个span的地址
_prev
前一个span的地址
_useCount
:切好小块内存,被分配给thread cache的要计数
_freeList
:管理切好的小块内存的自由链表
SpanList的实现
从上面Central Cache的结构我们可知Central Cache会存在大量的span对象所以对这些span对象我们也要管理起来,那么如何管理?先描述,再组织
SpanList需要的成员变量
// 带头双向循环链表
class SpanList
{
private:Span* _head;
public:std::mutex _mtx; // 桶锁
_head
:因为是用带头双向链表管理一个个span对象的,所以需要头节点指针。
std::mutex _mtx
:因为central cache会被多个线程访问所以需要锁。但是又因为central cache有多个桶,不同的线程,同时访问不同的桶是不需要加锁的。只有不同的线程同时访问同一个桶才需要加锁,所以这里的锁是桶锁。
画图说明:
SpanList需要的成员函数
class SpanList
{
public:SpanList(){}void Insert(Span* pos, Span* newSpan){}void Erase(Span* pos){}private:Span* _head;
public:std::mutex _mtx; // 桶锁
};
SpanList()
:对spanList进行初始化
Insert()
:支持指定位置插入span节点
Erase()
:支持删除指定span节点
SpanList()的实现
SpanList(){_head = new Span;//初始化头节点//让头节点收尾自己相连_head->_next = _head;_head->_prev = _head;}
图展示自己对字节收尾相连
Insert()的实现
Insert过程图:
void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);//记录pos的前一个节点Span* prev = pos->_prev;//和prev建立连接prev->_next = newSpan;newSpan->_prev = prev;//和pos建立连接newSpan->_next = pos;pos->_prev = newSpan;}
Erase()
Erase过程图:
void Erase(Span* pos){//头节点和空节点不能删assert(pos);assert(pos != _head);//保存pos前后节点的地址Span* prev = pos->_prev;Span* next = pos->_next;//连接prev和nextprev->_next = next;next->_prev = prev;}
SpanList的完整实现代码
// 带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;// prev newspan posprev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}private:Span* _head;
public:std::mutex _mtx; // 桶锁
};
Central Cache对象的结构
首先我们要保证Central Cache对象全局就只有一个所以我们把Central Cache对象设计成单例模式中的懒汉模式
。
Central Cache所需要的成员变量
#pragma once#include "Common.h"// 单例模式
class CentralCache
{
private:SpanList _spanLists[NFREELIST];CentralCache()//防止构造新对象{}CentralCache(const CentralCache&) = delete;//防止外面拷贝构造新对象static CentralCache _sInst;//设置成static保证所有线程可见
};
SpanList _spanLists[NFREELIST]
:哈希桶结构,每个桶存储对应的span
Central Cache所需要的成员函数
// 单例模式
class CentralCache
{
public://获取全局唯一的单例对象static CentralCache* GetInstance(){}// 获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t byte_size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanLists[NFREELIST];private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
};
GetInstance()
:因为全局唯一一个对象是私有的,所以我们要提供一个函数给外界,让外界可以访问到我们的私有对象。
FetchRangeObj()
:从中心缓存获取一定数量的对象给thread cache
GetOneSpan()
:获取一个非空的span
GetInstance()的实现
static CentralCache* GetInstance(){//直接引用给外界全局唯一的对象return &_sInst;}
//这里用static的原因和之前说的一样,防止多个.cpp文件包含central cache.h文件导致出错
//static表示只在central cache.h该文件有效
FetchRangeObj()的实现
在实现FetchRangeObj()
之前我们要先实现Thread Cache层的FetchFromCentralCache()
之前因为这个跟Central Cache层的逻辑相关所以就没有实现。现在我们已经知道了Central Cache层的结构就可以实现了。
FetchFromCentralCache()的实现
首先要实现FetchFromCentralCache()我们的知道,我们需要向Central Cache要多少个对象?
要一个?不太好,这样我们会频繁的向CentralCache要对象
要上千个?也不太好,可能用不到这么多浪费内存
所以我们就需要设计一个函数来帮助我们计算每次需要向CentralCache取多少个对象。
// 一次thread cache从中心缓存获取多少个static size_t NumMoveSize(size_t size){assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 小对象一次批量上限低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;}
这么函数放在之前Common.h的SizeClass类中
但是小对象一开始就去512也不科学,所以我们就设置一个变量来控制我们每次获取的数理,该变量逐次增长,知道大于512那么我们就取512,不再增加。
这个对象放在FreeList中。(这个慢慢增长的过程,也叫慢开始跟网络中的拥塞控制类型)
class FreeList
{
public:size_t& MaxSize(){return _maxSize;}
private:void* _freeList = nullptr;size_t _maxSize = 1;
};
FetchFromCentralCache()的正式实现
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{// 慢开始反馈调节算法// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完// 2、如果你不断有size大小的内存需求,那么batchNum就会不断增长,直到上限// 3、size越大,一次向central cache要的batchNum就越小// 4、size越小,一次向central cache要的batchNum就越大//_freeLists[index].MaxSize()记录下一次应该获取多少个对象size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum){//慢增长控制,类似网络拥塞控制,指数方式增长_freeLists[index].MaxSize() *= 2;}void* start = nullptr;//输出型参数void* end = nullptr;//输出型参数//actualNum记录从CentralCache中实际获取到的对象个数size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);//获取到的对象个数必须大于1assert(actualNum > 1);//如果实际对象个数只有1个那么直接给外界if (actualNum == 1){assert(start == end);return start;}else//如果实际对象个数>1 那么1个给外界剩余的链接到自由链表上{//但是这是一段范围,所以自由链表必须提供插入一段范围_freeLists[index].PushRange(NextObj(start), end);return start;}
}
PushRange过程图
PushRange的实现
void PushRange(void* start, void* end){NextObj(end) = _freeList;_freeList = start;}
FetchRangeObj()的实现
我们实现了ThreadCache层的FetchFromCentralCache()知道当ThreadCache对象不够时需要调用CentralCache层的FetchRangeObj()。所以接下来我们就要实现FetchRangeObj()。
// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{//首先计算映射到桶的位置size_t index = SizeClass::Index(size);//因为存在多个线程访问,所以有线程安全的问题需要加锁_spanLists[index]._mtx.lock();//直接调用GetOneSpan函数获取一个非空的spanSpan* span = GetOneSpan(_spanLists[index], size);//这个span一定不能为空指针并且必须有对象assert(span);assert(span->_freeList);// 从span中获取batchNum个对象// 如果不够batchNum个,有多少拿多少start = span->_freeList;end = start;size_t i = 0;//记录实际的对象个数size_t actualNum = 1;//从span->_freeList取对象的过程(下面会详细讲解)while ( i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;//解锁_spanLists[index]._mtx.unlock();return actualNum;
}
从span->_freeList取出对象的过程的详细过程
取对象的过程的代码
//取对象的过程
start = span->_freeList;
end = start;
size_t i = 0;
//记录实际的对象个数
size_t actualNum = 1;
while ( i < batchNum - 1 && NextObj(end) != nullptr)
{end = NextObj(end);++i;++actualNum;
}
span->_freeList = NextObj(end);
NextObj(end) = nullptr;
取对象的过程画图展示
刚才是start和end都指向头节点
end开始向后走
只要走一步就可以了。然后断开链接,把2个对象取走,最后_freeList指向剩余的对象。
GetOneSpan()
因为GetOneSpan()设计pagecache层所以这里暂时不实现等将page cache的时候再实现。
Central Cache层的结构的完整代码
Common.h
#pragma once#include <iostream>
#include <vector>
#include <algorithm>#include <time.h>
#include <assert.h>#include <thread>
#include <mutex>using std::cout;
using std::endl;static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;#ifdef _WIN64typedef unsigned long long PAGE_ID;
#elif _WIN32typedef size_t PAGE_ID;
#else// linux
#endifstatic void*& NextObj(void* obj)
{return *(void**)obj;
}// 管理切分好的小对象的自由链表
class FreeList
{
public:void Push(void* obj){assert(obj);// 头插//*(void**)obj = _freeList;NextObj(obj) = _freeList;_freeList = obj;}void PushRange(void* start, void* end){NextObj(end) = _freeList;_freeList = start;}void* Pop(){assert(_freeList);// 头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}private:void* _freeList = nullptr;size_t _maxSize = 1;
};// 计算对象大小的对齐映射规则
class SizeClass
{
public:// 整体控制在最多10%左右的内碎片浪费// [1,128] 8byte对齐 freelist[0,16)// [128+1,1024] 16byte对齐 freelist[16,72)// [1024+1,8*1024] 128byte对齐 freelist[72,128)// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184)// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208)static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8*1024){return _RoundUp(size, 128);}else if (size <= 64*1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8*1024);}else{assert(false);return -1;}}static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128){return _Index(bytes, 3);}else if (bytes <= 1024){return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024){return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024){return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024){return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else{assert(false);}return -1;}// 一次thread cache从中心缓存获取多少个static size_t NumMoveSize(size_t size){assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 小对象一次批量上限低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;}
};// 管理多个连续页大块内存跨度结构
struct Span
{PAGE_ID _pageId = 0; // 大块内存起始页的页号size_t _n = 0; // 页的数量Span* _next = nullptr; // 双向链表的结构Span* _prev = nullptr;size_t _useCount = 0; // 切好小块内存,被分配给thread cache的计数void* _freeList = nullptr; // 切好的小块内存的自由链表
};// 带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;// prev newspan posprev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}private:Span* _head;
public:std::mutex _mtx; // 桶锁
};
ThreadCache.h
#pragma once#include "Common.h"class ThreadCache
{
public:// 申请和释放内存对象void* Allocate(size_t size);void Deallocate(void* ptr, size_t size);// 从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);
private:FreeList _freeLists[NFREELIST];
};// TLS thread local storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
ThreadCache.cpp
#include "ThreadCache.h"
#include "CentralCache.h"void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{// 慢开始反馈调节算法// 1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完// 2、如果你不要这个size大小内存需求,那么batchNum就会不断增长,直到上限// 3、size越大,一次向central cache要的batchNum就越小// 4、size越小,一次向central cache要的batchNum就越大size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freeLists[index].MaxSize() == batchNum){_freeLists[index].MaxSize() += 1;}void* start = nullptr;void* end = nullptr;size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);assert(actualNum > 1);if (actualNum == 1){assert(start == end);return start;}else{_freeLists[index].PushRange(NextObj(start), end);return start;}
}void* ThreadCache::Allocate(size_t size)
{assert(size <= MAX_BYTES);size_t alignSize = SizeClass::RoundUp(size);size_t index = SizeClass::Index(size);if (!_freeLists[index].Empty()){return _freeLists[index].Pop();}else{return FetchFromCentralCache(index, alignSize);}
}void ThreadCache::Deallocate(void* ptr, size_t size)
{assert(ptr);assert(size <= MAX_BYTES);// 找对映射的自由链表桶,对象插入进入size_t index = SizeClass::Index(size);_freeLists[index].Push(ptr);// ...
}
ConcurrentAlloc.h
#pragma once#include "Common.h"
#include "ThreadCache.h"static void* ConcurrentAlloc(size_t size)
{// 通过TLS 每个线程无锁的获取自己的专属的ThreadCache对象if (pTLSThreadCache == nullptr){pTLSThreadCache = new ThreadCache;}cout << std::this_thread::get_id() << ":"<<pTLSThreadCache<<endl;return pTLSThreadCache->Allocate(size);
}static void ConcurrentFree(void* ptr, size_t size)
{assert(pTLSThreadCache);pTLSThreadCache->Deallocate(ptr, size);
}
CentralCache.h
#pragma once#include "Common.h"// 单例模式
class CentralCache
{
public:static CentralCache* GetInstance(){return &_sInst;}// 获取一个非空的spanSpan* GetOneSpan(SpanList& list, size_t byte_size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);private:SpanList _spanLists[NFREELIST];private:CentralCache(){}CentralCache(const CentralCache&) = delete;static CentralCache _sInst;
};
CentralCache.cpp
#include "CentralCache.h"CentralCache CentralCache::_sInst;// 获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{// ...return nullptr;
}// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)
{size_t index = SizeClass::Index(size);_spanLists[index]._mtx.lock();Span* span = GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);// 从span中获取batchNum个对象// 如果不够batchNum个,有多少拿多少start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;while ( i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;_spanLists[index]._mtx.unlock();return actualNum;
}
这篇关于【lesson5】高并发内存池Central Cache层申请内存的实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!