本文主要是介绍【项目日记】高并发内存池---细节优化及性能测试,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
高并发内存池---细节优化及性能测试
- 1 细节优化
- 1.1 大块内存的申请处理
- 1.2 配合定长池脱离使用new
- 1.3 释放对象无需内存大小
- 2 调试Debug
- 3 性能测试
- 4 项目总结
1 细节优化
在前面的文章中我们已经实现了高并发内存池的申请内存逻辑和释放内存逻辑:
- 申请逻辑:
- 首先在线程缓存中查看是否有内存块可以直接使用,没有就去中心缓存申请一批内存块。
- 中心缓存中的span中有未使用的合适内存块就进行返回,没有就去页缓存中申请合适页数的span。
- 页缓存中如果有合适的span就进行返回,没有就查找有没有页数更大的span 。如果有页数大的span有进行拆分,没有就去申请最大页数的span进行拆分!
- 释放逻辑:
- 线程缓存回收内存块,挂载到合适的自由链表中。如果挂载的内存块数量到达一定标准就进行回收。
- 回收的内存块还给中心缓存,中心缓存根据内存块的地址归还给对应的span进挂载。当span的引用计数为0时就进行回收
- 页缓存得到span之后,先来进行前后页的合并,将span尽可能的合并成更大的span。
接下来我们就来解决一下一些细节问题:
1.1 大块内存的申请处理
根据我们书写的代码,线程缓存中最大挂载的内存块大小是256KB
,当我们申请大于256KB的内存时,显然现场缓存是不能满足要求的,而由于中心缓存的映射关系和线程缓存是一致的,所以想要申请大于256KB的内存块,就需要去页缓存中直接申请。大于256KB的内存我们按照页的大小来进行内存对齐,所以大于256KB的内存我们就实际上是去获取对应页的span。我们获取到span之后再按照页号复原出地址进行返回就可以了!
static void* ConcurrentAlloc(size_t size)
{//如果size的空间大于的32页,就不能在线程缓存里进行取了//因为线程缓存中最大的内存块是256KB //超过32页就需要取页缓存中去进行取了if (size > MAX_BYTES){//计算对齐空间size_t alignsize = SizeClass::RoundUp(size);//计算页数size_t num = alignsize >> PAGE_SHIFT;//根据页数开辟spanPageCache::GetInstance()->GetMutex().lock();Span * span = PageCache::GetInstance()->NewSpan(num);PageCache::GetInstance()->GetMutex().unlock();span->_objsize = alignsize;//根据span中的_pageid计算地址void* ptr = (void*)(span->_pageid << PAGE_SHIFT);//进行返回即可return ptr;}//在该线程中进行内存的申请if (pThreadCache == nullptr){static ObjectPool<ThreadCache> tcpool;//pThreadCache = new ThreadCache;pThreadCache = tcpool.New();}//进行开辟空间void * obj = pThreadCache->Allocate(size);//cout << std::this_thread::get_id() << " : " << pThreadCache << endl;return obj;
}
回收大内存时,也进行特殊处理,跳过线程缓存直接回收到页缓存中就可以了!
static void ConcurrentFree(void* ptr)
{assert(ptr);//找到ptr对应的span即可找到字节数//在span创建完之后传回给中心缓存挂载起来时就已经确定了其内存块的大小!!!Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objsize;//如果size大于256KB就要跳过线程缓存和中心缓存if (size > MAX_BYTES){//直接回收该空间到页缓存中PageCache::GetInstance()->GetMutex().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->GetMutex().unlock();return;}//回收空间pThreadCache->Deallocate(ptr, size);
}
大于256KB的情况中还有一种特殊情况,当申请的空间页数大于128页时,页缓存也无法对其进行管理。因为页缓存中最大的span就是128页的,针对这种情况,我们的内存池就不能发挥作用了,所以我们直接使用系统调用来申请者篇大空间就可以了,回收时一样特殊处理。
Span* PageCache::NewSpan(size_t num)
{// 如果开辟的页数大于 128 页 内存池就不能发挥作用了// 需要直接去堆上申请if (num >= PAGENUM){//开辟对应的大空间void* ptr = SystemAlloc(num);//创建span//Span* newspan = new Span;//使用定长池 跳过new(malloc)Span* newspan = _spanpool.New();newspan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;//页号newspan->_n = num;//页数//建立哈希映射_SpanMap[newspan->_pageid] = newspan;//进行返回return newspan;}// 1 - 128//...省略...//.........
}
void PageCache::ReleaseSpanToPageCache(Span* span)
{//如果页数大于128 不能进行合并 直接返回给系统if (span->_n > PAGENUM - 1){void* ptr = (void*)(span->_pageid << PAGE_SHIFT);//解除映射_SpanMap.erase(span->_pageid);_spanpool.Delete(span);SystemFree(ptr);return;}//1 -128//......//......
}
这就是对大内存的处理方法,这一步的优化其实就是将特殊情况给补全!
1.2 配合定长池脱离使用new
我们的项目是为了替代掉C++中原始的new(malloc),使用自己的一套体系来做到内存申请。但是我们在实现高并发内存池是使用很多的new来创建span,为了解决这个问题,我们引入我们最开始实现的定长池:定长池思路分析
定长池内部是:
- 一段连续的大空间:我们使用char*来进行管理,方便一个一个字节来进行处理。
- 一个自由链表:进行资源的回收使用,内部通过链表结构链接起来
- 剩余资源数:进行资源空间的管理,不足够是进行扩容!
通过对开辟出来的大空间进行拆分实现减少内存碎片!
我们可以在页缓存中加入一个管理span对象的定长池,每次newspan的时候就换成从定长池中进行取出。
class PageCache
{
public:Span* NewSpan(size_t num);std::mutex& GetMutex();void ReleaseSpanToPageCache(Span* span);Span* MapObjectToSpan(void* start);
private://...//对象池 方便进行取出spanObjectPool<Span> _spanpool;//...//...
}
然后将所有的new都更换为_spanpool.New()
就可以了!
1.3 释放对象无需内存大小
我们的项目最终是要替代new(malloc)的,我们在使用new的时候并不需要去加入开辟的空间,系统会自动识别开辟空间的大小。那么在我们的程序中我们需要如何优化才可以不需要在加入内存大小呢?
首先,我们申请的空间都为内存块,内存块在创建的时候是挂载在span上的,每个span都有自己独一无二的页号。所以其实页号就可以对应其内存块的大小。为了达到这样的效果我们可以建立一个哈希表,来记录页号对应的内存块大小。但更简单的做法是在span中记录一个内存块大小的成员变量,这样我们可以通过地址找到span,,也就找到了内存块大小!!!
static void ConcurrentFree(void* ptr)
{assert(ptr);//找到ptr对应的span即可找到字节数//在span创建完之后传回给中心缓存挂载起来时就已经确定了其内存块的大小!!!Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objsize;//如果size大于256KB就要跳过线程缓存和中心缓存if (size > MAX_BYTES){//直接回收该空间到页缓存中PageCache::GetInstance()->GetMutex().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->GetMutex().unlock();return;}//回收空间pThreadCache->Deallocate(ptr, size);
}
这样就不在需要再free时加上内存大小了,使用起来方便了很多!!!
2 调试Debug
在性能测试之前,首先我们要确保我们的代码没有了问题,由于编写代码时的不仔细,导致我进行了很长时间的Debug ,快给孩子整崩溃了o(╥﹏╥)oo(╥﹏╥)oo(╥﹏╥)o
在Debug过程中,VS2022提供了很多实用的功能:
- 并行监视:可以监视多个线程的执行情况
- 调用堆栈:在程序崩溃时可以返回到上层的调用函数
- 内存对比:解决内存块问题,无法通过监视来查看链式机构,通过内存窗口可以方便查看
程序崩溃时,通过条件断点逐层向上检查:
if()
{int x = 0 ;//打上断点
}
这样可以帮助我们更快的解决bug,但还是需要一定经验,不然会和我一样进行好长好长时间的调试,真的太难受了!
3 性能测试
现在我们来进行一下高并发内存池与new的性能对比,我们分别使用两种方式来进行开辟若干的内存块:
可以看到我们现在的内存池开辟释放空间的效率其实和原生new差不多,这可不好,因为谷歌开源的tcmalloc可以提升至原生new效率的4倍。我们现在到底是哪里的效率比较低呢???我们可以通过VS2022的性能测试工具来进行检测:
这里如果报出了无法生成的错误,说明我们需要进行一个简单配置,将我们的程序加上/profile
属性
之后我们就可以来看效率中影响最大的部分是哪里:
根据调用的函数可以推断出:
主要耗时是在锁的竞争中,在central中因为使用的是桶锁所以对性能的影响不大。但是在pagecache文件中的函数耗时都比较久,其实我们隐约已经知道问题出现在哪里了,首先unordered_map的底层是哈希桶结构,哈希桶内部不是线程安全的,其次所以每次对页缓存进行操作时都要将整个页缓存锁住,这其实就是影响性能的主要因素。
那么解决这个问题的首要问题就可不可以将锁去除,使用一直无需加锁的哈希表结构呢?tcmalloc中的解决方案是基数树,一种特殊的哈希表。其主要有三种:
- 单层结构:线性映射的大数组,通过页号直接映射。
- 双层结构:二维映射的哈希表,将一个页号进行两部分的拆分。
- 三层结构:三维映射的哈希表,将一个页号进行三部分的拆分。
为什么基数树不需要加锁?
因为基数树的读写是独立的,当一个线程正在处理一个页号对应的span时,如果有另外一个线程想要进行写入,这是完全不会互相影响的,因为基数树是一个大数组,其内部结构确定,通过页号来进行行映射,不会因为新写入一个数据而改变结构。
unordered_map需要加锁,是因为读写并不独立,写入时会改变底层哈希桶的结构,可能导致读取出错!
这里给出三种结构的基数树,32位可以使用双层结构,64位使用三层结构。
#include"Common.h"// --- 参考tc-malloc源码的基数树优化 ---
//可以实现无锁操作 --- 大大提升性能
//基数树的本质也是哈希表
// 1. 1层的结构就是通过一个大的指针数组来指向span
// 2. 2层的结构就是通过二维数组来实现 比如32位下 8KB页,会有2^19个页号
// 前5位确认在哪个指针数组 后13位确定具体指针
// 3 3层的同理使用三维数组// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap1() {//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS;size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const {if ((k >> BITS) > 0) {return NULL;}return array_[k];}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v) {array_[k] = v;}
};// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstatic const int INTERIOR_BITS = (BITS + 2) / 3; // Round-upstatic const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// How many bits should we consume at leaf levelstatic const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_; // Root of radix treevoid* (*allocator_)(size_t); // Memory allocatorNode* NewNode() {Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {allocator_ = allocator;root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};
然后将所以的_SpanMap的[]
和find()
操作更换为set
和get
然后将不必要的锁去除,接下来就是见证奇迹的时刻了:
可以看到我们高并发内存池可以达到原生new性能的四倍左右!!!
4 项目总结
这样我们就完成了我们的高并发内存池项目,接下来我们可以将我们的项目打包成静态库或动态库来来进行使用
通过对属性页的修改,我们可以生成对应的静态库和动态库!至此我们的项目全部完成!
项目总结(简历版)
2024.8 - 2024.9 — 高并发内存池 —
- 项目描述:
开发了一个用于高并发环境的内存池管理系统,旨在提高内存分配与释放的效率,减少系统开销并优化多线程应用性能。- 技术栈:
C++编程语言 、多线程编程(std::thread, std::mutex, std::condition_variable)、内存管理与自定义分配器- 关键实现:
实现了自定义内存池以管理内存分配,有效减少内存碎片。采用互斥锁和条件变量等技术确保线程安全的同时,尽量降低锁争用。针对多线程优化,使用策略如缓存对齐和锁分段技术,提高应用程序的运行性能。- 项目挑战:
多线程环境下的非阻塞内存管理设计与实现。
通过合理策略减少内存碎片,并提高分配器效率。
使用全面的测试方案进行调试,确保在高并发环境下的稳定性。- 项目成果:
提高了内存分配和释放的效率,显著降低了多线程程序的内存管理开销。
这篇关于【项目日记】高并发内存池---细节优化及性能测试的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!