【项目日记】高并发内存池---细节优化及性能测试

2024-09-07 16:28

本文主要是介绍【项目日记】高并发内存池---细节优化及性能测试,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在这里插入图片描述

终此一生,只有两种办法:
要么梦见生活,要么落实生活。
--- 勒内・夏尔 ---

高并发内存池---细节优化及性能测试

  • 1 细节优化
    • 1.1 大块内存的申请处理
    • 1.2 配合定长池脱离使用new
    • 1.3 释放对象无需内存大小
  • 2 调试Debug
  • 3 性能测试
  • 4 项目总结

1 细节优化

在前面的文章中我们已经实现了高并发内存池的申请内存逻辑和释放内存逻辑:

  • 申请逻辑:
    1. 首先在线程缓存中查看是否有内存块可以直接使用,没有就去中心缓存申请一批内存块。
    2. 中心缓存中的span中有未使用的合适内存块就进行返回,没有就去页缓存中申请合适页数的span。
    3. 页缓存中如果有合适的span就进行返回,没有就查找有没有页数更大的span 。如果有页数大的span有进行拆分,没有就去申请最大页数的span进行拆分!
  • 释放逻辑:
    1. 线程缓存回收内存块,挂载到合适的自由链表中。如果挂载的内存块数量到达一定标准就进行回收。
    2. 回收的内存块还给中心缓存,中心缓存根据内存块的地址归还给对应的span进挂载。当span的引用计数为0时就进行回收
    3. 页缓存得到span之后,先来进行前后页的合并,将span尽可能的合并成更大的span。

接下来我们就来解决一下一些细节问题:

1.1 大块内存的申请处理

根据我们书写的代码,线程缓存中最大挂载的内存块大小是256KB,当我们申请大于256KB的内存时,显然现场缓存是不能满足要求的,而由于中心缓存的映射关系和线程缓存是一致的,所以想要申请大于256KB的内存块,就需要去页缓存中直接申请。大于256KB的内存我们按照页的大小来进行内存对齐,所以大于256KB的内存我们就实际上是去获取对应页的span。我们获取到span之后再按照页号复原出地址进行返回就可以了!

static void* ConcurrentAlloc(size_t size)
{//如果size的空间大于的32页,就不能在线程缓存里进行取了//因为线程缓存中最大的内存块是256KB //超过32页就需要取页缓存中去进行取了if (size > MAX_BYTES){//计算对齐空间size_t alignsize = SizeClass::RoundUp(size);//计算页数size_t num = alignsize >> PAGE_SHIFT;//根据页数开辟spanPageCache::GetInstance()->GetMutex().lock();Span * span = PageCache::GetInstance()->NewSpan(num);PageCache::GetInstance()->GetMutex().unlock();span->_objsize = alignsize;//根据span中的_pageid计算地址void* ptr = (void*)(span->_pageid << PAGE_SHIFT);//进行返回即可return ptr;}//在该线程中进行内存的申请if (pThreadCache == nullptr){static ObjectPool<ThreadCache> tcpool;//pThreadCache = new ThreadCache;pThreadCache = tcpool.New();}//进行开辟空间void * obj = pThreadCache->Allocate(size);//cout << std::this_thread::get_id() << " : " << pThreadCache << endl;return obj;
}

回收大内存时,也进行特殊处理,跳过线程缓存直接回收到页缓存中就可以了!

static void  ConcurrentFree(void* ptr)
{assert(ptr);//找到ptr对应的span即可找到字节数//在span创建完之后传回给中心缓存挂载起来时就已经确定了其内存块的大小!!!Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objsize;//如果size大于256KB就要跳过线程缓存和中心缓存if (size > MAX_BYTES){//直接回收该空间到页缓存中PageCache::GetInstance()->GetMutex().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->GetMutex().unlock();return;}//回收空间pThreadCache->Deallocate(ptr, size);
}

大于256KB的情况中还有一种特殊情况,当申请的空间页数大于128页时,页缓存也无法对其进行管理。因为页缓存中最大的span就是128页的,针对这种情况,我们的内存池就不能发挥作用了,所以我们直接使用系统调用来申请者篇大空间就可以了,回收时一样特殊处理。

Span* PageCache::NewSpan(size_t num)
{// 如果开辟的页数大于 128 页 内存池就不能发挥作用了// 需要直接去堆上申请if (num >= PAGENUM){//开辟对应的大空间void* ptr = SystemAlloc(num);//创建span//Span* newspan = new Span;//使用定长池 跳过new(malloc)Span* newspan = _spanpool.New();newspan->_pageid = (PAGE_ID)ptr >> PAGE_SHIFT;//页号newspan->_n = num;//页数//建立哈希映射_SpanMap[newspan->_pageid] = newspan;//进行返回return newspan;}// 1 - 128//...省略...//.........
}
void PageCache::ReleaseSpanToPageCache(Span* span)
{//如果页数大于128 不能进行合并 直接返回给系统if (span->_n > PAGENUM - 1){void* ptr = (void*)(span->_pageid << PAGE_SHIFT);//解除映射_SpanMap.erase(span->_pageid);_spanpool.Delete(span);SystemFree(ptr);return;}//1 -128//......//......
}

这就是对大内存的处理方法,这一步的优化其实就是将特殊情况给补全!

1.2 配合定长池脱离使用new

我们的项目是为了替代掉C++中原始的new(malloc),使用自己的一套体系来做到内存申请。但是我们在实现高并发内存池是使用很多的new来创建span,为了解决这个问题,我们引入我们最开始实现的定长池:定长池思路分析

定长池内部是:

  1. 一段连续的大空间:我们使用char*来进行管理,方便一个一个字节来进行处理。
  2. 一个自由链表:进行资源的回收使用,内部通过链表结构链接起来
  3. 剩余资源数:进行资源空间的管理,不足够是进行扩容!

通过对开辟出来的大空间进行拆分实现减少内存碎片!

我们可以在页缓存中加入一个管理span对象的定长池,每次newspan的时候就换成从定长池中进行取出。

class PageCache
{
public:Span* NewSpan(size_t num);std::mutex& GetMutex();void ReleaseSpanToPageCache(Span* span);Span* MapObjectToSpan(void* start);
private://...//对象池 方便进行取出spanObjectPool<Span> _spanpool;//...//...
}

然后将所有的new都更换为_spanpool.New()就可以了!

1.3 释放对象无需内存大小

我们的项目最终是要替代new(malloc)的,我们在使用new的时候并不需要去加入开辟的空间,系统会自动识别开辟空间的大小。那么在我们的程序中我们需要如何优化才可以不需要在加入内存大小呢?

首先,我们申请的空间都为内存块,内存块在创建的时候是挂载在span上的,每个span都有自己独一无二的页号。所以其实页号就可以对应其内存块的大小。为了达到这样的效果我们可以建立一个哈希表,来记录页号对应的内存块大小。但更简单的做法是在span中记录一个内存块大小的成员变量,这样我们可以通过地址找到span,,也就找到了内存块大小!!!

static void  ConcurrentFree(void* ptr)
{assert(ptr);//找到ptr对应的span即可找到字节数//在span创建完之后传回给中心缓存挂载起来时就已经确定了其内存块的大小!!!Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);size_t size = span->_objsize;//如果size大于256KB就要跳过线程缓存和中心缓存if (size > MAX_BYTES){//直接回收该空间到页缓存中PageCache::GetInstance()->GetMutex().lock();PageCache::GetInstance()->ReleaseSpanToPageCache(span);PageCache::GetInstance()->GetMutex().unlock();return;}//回收空间pThreadCache->Deallocate(ptr, size);
}

这样就不在需要再free时加上内存大小了,使用起来方便了很多!!!

2 调试Debug

在性能测试之前,首先我们要确保我们的代码没有了问题,由于编写代码时的不仔细,导致我进行了很长时间的Debug ,快给孩子整崩溃了o(╥﹏╥)oo(╥﹏╥)oo(╥﹏╥)o

在Debug过程中,VS2022提供了很多实用的功能:

  1. 并行监视:可以监视多个线程的执行情况
  2. 调用堆栈:在程序崩溃时可以返回到上层的调用函数
  3. 内存对比:解决内存块问题,无法通过监视来查看链式机构,通过内存窗口可以方便查看

在这里插入图片描述
程序崩溃时,通过条件断点逐层向上检查:

if()
{int x = 0 ;//打上断点
}

这样可以帮助我们更快的解决bug,但还是需要一定经验,不然会和我一样进行好长好长时间的调试,真的太难受了!

3 性能测试

现在我们来进行一下高并发内存池与new的性能对比,我们分别使用两种方式来进行开辟若干的内存块:
在这里插入图片描述
可以看到我们现在的内存池开辟释放空间的效率其实和原生new差不多,这可不好,因为谷歌开源的tcmalloc可以提升至原生new效率的4倍。我们现在到底是哪里的效率比较低呢???我们可以通过VS2022的性能测试工具来进行检测:
在这里插入图片描述
这里如果报出了无法生成的错误,说明我们需要进行一个简单配置,将我们的程序加上/profile属性
在这里插入图片描述

之后我们就可以来看效率中影响最大的部分是哪里:
在这里插入图片描述
根据调用的函数可以推断出:
主要耗时是在锁的竞争中,在central中因为使用的是桶锁所以对性能的影响不大。但是在pagecache文件中的函数耗时都比较久,其实我们隐约已经知道问题出现在哪里了,首先unordered_map的底层是哈希桶结构,哈希桶内部不是线程安全的,其次所以每次对页缓存进行操作时都要将整个页缓存锁住,这其实就是影响性能的主要因素。

那么解决这个问题的首要问题就可不可以将锁去除,使用一直无需加锁的哈希表结构呢?tcmalloc中的解决方案是基数树,一种特殊的哈希表。其主要有三种:

  1. 单层结构:线性映射的大数组,通过页号直接映射。
  2. 双层结构:二维映射的哈希表,将一个页号进行两部分的拆分。
  3. 三层结构:三维映射的哈希表,将一个页号进行三部分的拆分。

为什么基数树不需要加锁
因为基数树的读写是独立的,当一个线程正在处理一个页号对应的span时,如果有另外一个线程想要进行写入,这是完全不会互相影响的,因为基数树是一个大数组,其内部结构确定,通过页号来进行行映射,不会因为新写入一个数据而改变结构。

unordered_map需要加锁,是因为读写并不独立,写入时会改变底层哈希桶的结构,可能导致读取出错!

这里给出三种结构的基数树,32位可以使用双层结构,64位使用三层结构。

#include"Common.h"// --- 参考tc-malloc源码的基数树优化 ---
//可以实现无锁操作 --- 大大提升性能
//基数树的本质也是哈希表
// 1. 1层的结构就是通过一个大的指针数组来指向span
// 2. 2层的结构就是通过二维数组来实现 比如32位下 8KB页,会有2^19个页号
//	  前5位确认在哪个指针数组 后13位确定具体指针
// 3 3层的同理使用三维数组// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:static const int LENGTH = 1 << BITS;void** array_;public:typedef uintptr_t Number;//explicit TCMalloc_PageMap1(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap1() {//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS;size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);memset(array_, 0, sizeof(void*) << BITS);}// Return the current value for KEY.  Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const {if ((k >> BITS) > 0) {return NULL;}return array_[k];}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v) {array_[k] = v;}
};// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH];             // Pointers to 32 child nodesvoid* (*allocator_)(size_t);          // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static ObjectPool<Leaf>	leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstatic const int INTERIOR_BITS = (BITS + 2) / 3; // Round-upstatic const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// How many bits should we consume at leaf levelstatic const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_;                          // Root of radix treevoid* (*allocator_)(size_t);          // Memory allocatorNode* NewNode() {Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {allocator_ = allocator;root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};

然后将所以的_SpanMap的[]find()操作更换为set get然后将不必要的锁去除,接下来就是见证奇迹的时刻了:
在这里插入图片描述
可以看到我们高并发内存池可以达到原生new性能的四倍左右!!!

4 项目总结

这样我们就完成了我们的高并发内存池项目,接下来我们可以将我们的项目打包成静态库或动态库来来进行使用
在这里插入图片描述
通过对属性页的修改,我们可以生成对应的静态库和动态库!至此我们的项目全部完成!

项目总结(简历版)

2024.8 - 2024.9 — 高并发内存池 —

  1. 项目描述:
    开发了一个用于高并发环境的内存池管理系统,旨在提高内存分配与释放的效率,减少系统开销并优化多线程应用性能。
  2. 技术栈:
    C++编程语言 、多线程编程(std::thread, std::mutex, std::condition_variable)、内存管理与自定义分配器
  3. 关键实现:
    实现了自定义内存池以管理内存分配,有效减少内存碎片。采用互斥锁和条件变量等技术确保线程安全的同时,尽量降低锁争用。针对多线程优化,使用策略如缓存对齐和锁分段技术,提高应用程序的运行性能。
  4. 项目挑战:
    多线程环境下的非阻塞内存管理设计与实现。
    通过合理策略减少内存碎片,并提高分配器效率。
    使用全面的测试方案进行调试,确保在高并发环境下的稳定性。
  5. 项目成果:
    提高了内存分配和释放的效率,显著降低了多线程程序的内存管理开销。

这篇关于【项目日记】高并发内存池---细节优化及性能测试的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1145567

相关文章

Vue3 的 shallowRef 和 shallowReactive:优化性能

大家对 Vue3 的 ref 和 reactive 都很熟悉,那么对 shallowRef 和 shallowReactive 是否了解呢? 在编程和数据结构中,“shallow”(浅层)通常指对数据结构的最外层进行操作,而不递归地处理其内部或嵌套的数据。这种处理方式关注的是数据结构的第一层属性或元素,而忽略更深层次的嵌套内容。 1. 浅层与深层的对比 1.1 浅层(Shallow) 定义

这15个Vue指令,让你的项目开发爽到爆

1. V-Hotkey 仓库地址: github.com/Dafrok/v-ho… Demo: 戳这里 https://dafrok.github.io/v-hotkey 安装: npm install --save v-hotkey 这个指令可以给组件绑定一个或多个快捷键。你想要通过按下 Escape 键后隐藏某个组件,按住 Control 和回车键再显示它吗?小菜一碟: <template

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

HDFS—存储优化(纠删码)

纠删码原理 HDFS 默认情况下,一个文件有3个副本,这样提高了数据的可靠性,但也带来了2倍的冗余开销。 Hadoop3.x 引入了纠删码,采用计算的方式,可以节省约50%左右的存储空间。 此种方式节约了空间,但是会增加 cpu 的计算。 纠删码策略是给具体一个路径设置。所有往此路径下存储的文件,都会执行此策略。 默认只开启对 RS-6-3-1024k

NameNode内存生产配置

Hadoop2.x 系列,配置 NameNode 内存 NameNode 内存默认 2000m ,如果服务器内存 4G , NameNode 内存可以配置 3g 。在 hadoop-env.sh 文件中配置如下。 HADOOP_NAMENODE_OPTS=-Xmx3072m Hadoop3.x 系列,配置 Nam

如何用Docker运行Django项目

本章教程,介绍如何用Docker创建一个Django,并运行能够访问。 一、拉取镜像 这里我们使用python3.11版本的docker镜像 docker pull python:3.11 二、运行容器 这里我们将容器内部的8080端口,映射到宿主机的80端口上。 docker run -itd --name python311 -p

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份