NVIDIA NCCL Source Code Study (14): NVLink SHARP

2024-04-19 13:12


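Background

The previous post covered how IB SHARP works. Going a step further, NVIDIA introduced the third-generation NVSwitch with Hopper-based machines: just as IB SHARP works across nodes, NVSwitch can perform NVLink SHARP (nvls for short) within a node. This post describes how NVLink SHARP works.

For convenience, the examples below assume nranks is 2, but note that nvls is not actually used when nranks is 2.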

Graph search

ncclResult_t ncclNvlsInit(struct ncclComm* comm) {
  ...
  if (comm->nvlsSupport == 1) comm->nvlsChannels = std::max(comm->config.minCTAs, std::min(comm->config.maxCTAs, (int)ncclParamNvlsChannels()));
  return ncclSuccess;
}

The init step mainly checks whether nvls is supported. If so, it sets nvlsSupport to 1 and then sets comm->nvlsChannels. Note that this is the number of blocks the kernel actually launches, not the number of channels found by the search.

Then the search begins. nvls channels differ from the channels found for other algorithms; let's look at the details.

  nvlsGraph.pattern = NCCL_TOPO_PATTERN_NVLS;
  nvlsGraph.minChannels = 1;
  nvlsGraph.maxChannels = MAXCHANNELS;
  if (comm->nvlsSupport) {
    NCCLCHECKGOTO(ncclTopoCompute(comm->topo, &nvlsGraph), ret, fail);
    NCCLCHECKGOTO(ncclTopoPrintGraph(comm->topo, &nvlsGraph), ret, fail);
  }

The pattern is set to NCCL_TOPO_PATTERN_NVLS and the search starts. For this pattern, backToNet and backToFirstRank are both -1.

ncclResult_t ncclTopoSearchRec(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int* time) {
  int backToNet, backToFirstRank;
  NCCLCHECK(ncclTopoSearchParams(system, graph->pattern, &backToNet, &backToFirstRank));
  if (system->nodes[NET].count) {
  } else {
    if (graph->pattern == NCCL_TOPO_PATTERN_NVLS) {
      NCCLCHECK(ncclTopoSearchTryGpu(system, graph, saveGraph, 0, backToNet, backToFirstRank, 0, time, -1, -1, graph->nChannels));
      return ncclSuccess;
    }
    ...
  }
  return ncclSuccess;
}

At this point graph->nChannels is 0.

ncclResult_t ncclTopoSearchTryGpu(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int step, int backToNet, int backToFirstRank, int forcedOrder, int *time, int type, int index, int g) {
  const uint64_t flag = 1ULL<<(graph->nChannels);
  struct ncclTopoNode* gpu;
  NCCLCHECK(ncclTopoFollowPath(system, graph, type, index, GPU, g, 1, &gpu));
  if (gpu) {
    gpu->used ^= flag;
    NCCLCHECK(ncclTopoSearchRecGpu(system, graph, saveGraph, gpu, step, backToNet, backToFirstRank, forcedOrder, time));
    gpu->used ^= flag;
    NCCLCHECK(ncclTopoFollowPath(system, graph, type, index, GPU, g, -1, &gpu));
  }
  return ncclSuccess;
}

Because type is -1, ncclTopoFollowPath returns gpu0 directly, and the search starts from gpu0.

ncclResult_t ncclTopoSearchRecGpu(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, struct ncclTopoNode* gpu, int step, int backToNet, int backToFirstRank, int forcedOrder, int *time) {
  if ((*time) <= 0) return ncclSuccess;
  (*time)--;
  int ngpus = system->nodes[GPU].count;
  if (step == ngpus) {
  }
  graph->intra[graph->nChannels*ngpus+step] = gpu->gpu.rank;
  int g = gpu - system->nodes[GPU].nodes;
  if (step == backToNet) {
  } else if (graph->pattern == NCCL_TOPO_PATTERN_NVLS) {
    NCCLCHECK(ncclTopoSearchTryNvls(system, graph, saveGraph, g, ngpus, time));
  } else if (step < system->nodes[GPU].count-1) {
  } else if (step == backToFirstRank) {
  } else {
  }
  return ncclSuccess;
}

GPU 0 is written into graph->intra. Because the pattern is NCCL_TOPO_PATTERN_NVLS, ncclTopoSearchTryNvls is called directly.

ncclResult_t ncclTopoSearchTryNvls(struct ncclTopoSystem* system, struct ncclTopoGraph* graph, struct ncclTopoGraph* saveGraph, int g, int ngpus, int *time) {
  struct ncclTopoNode* nvs;
  struct ncclTopoNode* gpu;
  int d0=0; // See if there is enough bandwidth for NVS->GPU traffic
  do {
    NCCLCHECK(ncclTopoFollowPath(system, graph, NVS, 0, GPU, d0, d0 == g ? 2 : 1, &gpu));
    d0++;
  } while (gpu && d0 < system->nodes[GPU].count);
  if (gpu == NULL) {
    d0--;
  } else {
    int d1=0; // See if there is enough bandwidth for GPU->NVS traffic
    do {
      NCCLCHECK(ncclTopoFollowPath(system, graph, GPU, d1, NVS, 0, d1 == g ? 2 : 1, &nvs));
      d1++;
    } while (nvs && d1 < system->nodes[GPU].count);
    if (nvs == NULL) {
      d1--;
    } else { // Both directions worked. Move on to the next path.
      NCCLCHECK(ncclTopoSearchRecGpu(system, graph, saveGraph, NULL, ngpus, -1, -1, 0, time));
    }
    while (d1) {
      d1--;
      NCCLCHECK(ncclTopoFollowPath(system, graph, GPU, d1, NVS, 0, d1 == g ? -2 : -1, &nvs));
    }
  }
  while (d0) {
    d0--;
    NCCLCHECK(ncclTopoFollowPath(system, graph, NVS, 0, GPU, d0, d0 == g ? -2 : -1, &gpu));
  }
  return ncclSuccess;
}

This function checks whether the bandwidth is sufficient. First, Figure 1 shows the topology of GPUs and NVSwitches on a real machine.

Figure 1

However, because NVSwitch is transparent to the user, the topology NCCL builds actually looks like Figure 2.

Figure 2

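Suppose the bandwidth in the current search condition is bw and g is GPU0. Searching for one channel then means checking that every GPU node has more than bw of bandwidth to the NVSwitch in both directions, and subtracting bw if so. GPU0 is special: its link must have more than 2 * bw available. The reason is explained later.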

Figure 3

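ncclTopoSearchRecGpu is then called again. Note that step is set to ngpus here, so the search for one channel is complete and nChannels becomes 1. The next call to ncclTopoSearchTryGpu therefore starts from GPU1, and the process repeats until ngpus channels have been found. With four GPUs, for example, the channels found are: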

0
1
2
3

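For other algorithms a channel describes the transfer order within a node. The channels found for nvls are different: the 0 in the first channel, for example, is what NCCL calls the nvlsHead, and it identifies which rank is responsible for work such as the reduce of a given segment of memory, as we will see later.

Channel connection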

static ncclResult_t connectNvls(struct ncclComm* comm, int* nvlsHeads, struct ncclTopoGraph* nvlsGraph) {
  int nHeads = nvlsGraph->nChannels;
  int headRank = -1;
  for (int h=0; h<nHeads; h++) {
    if (nvlsGraph->intra[h*comm->localRanks] == comm->rank) headRank = h;
  }
  for (int c=0; c<comm->nvlsChannels; c++) {
    struct ncclChannel* channel = comm->channels+c;
    channel->nvls.nHeads = nHeads;
    for (int h=0; h<nHeads; h++) channel->nvls.up[h] = comm->nRanks+1+h;
    for (int h=nHeads; h<NCCL_MAX_NVLS_ARITY; h++) channel->nvls.up[h] = -1;
    channel->nvls.down = comm->nRanks+1+headRank;
    channel->nvls.out = -1;       // NVLS+SHARP not yet implemented.
    channel->nvls.headRank = headRank;
    channel->nvls.treeUp = channel->nvls.treeDown[0] = channel->nvls.treeDown[1] = channel->nvls.treeDown[2] = -1;
    channel->nvls.node = comm->node;
    channel->nvls.nNodes = comm->nNodes;
  }
  if (comm->nNodes == 1) return ncclSuccess;
}

This computes headRank, i.e. the index of the channel whose head is this rank, and then sets up all nvls channels. up and down are used to index peers. Because the nvls connections start at nRanks+1, nRanks+1 is added here. up holds all heads, and down is headRank, which is this rank itself.

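Memory registration

Figure 4 shows the overall memory registration flow. First, cuMulticastCreate creates a multicast object, which handle points to in the figure. Each GPU then calls cuMulticastAddDevice to associate its device with the multicast object, allocates device memory, and finally calls cuMulticastBindAddr or cuMulticastBindMem to bind the allocated memory to the handle.

Figure 4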

ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
  ...
  size_t buffSize = comm->buffSizes[NCCL_PROTO_SIMPLE];
  size_t memSize = NVLS_MEM_ALIGN_SIZE;
  size_t nvlsPerRankSize = nChannels * 2 * (buffSize + memSize);
  size_t nvlsTotalSize = nvlsPerRankSize * nHeads;
  char* shareableHandle = resources->shareableHandle;
  NCCLCHECKGOTO(nvlsGetProperties(comm, resources, dev, comm->localRanks, nvlsTotalSize), res, cleanup);
  ...
}

buffSize is the buffer size of the SIMPLE protocol, and memSize is used to store head and tail. The code then computes how much memory to allocate in total. nHeads is the number of channels found by the search, i.e. nRanks; why the size has this form will become clear later. Finally the total size, localRanks and other information are saved into resources.

ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
  ...
  if (comm->localRank == 0) {
    NCCLCHECKGOTO(nvlsGroupCreate(comm, &resources->properties, comm->localRank, comm->localRanks, &resources->mcHandle, shareableHandle), res, cleanup);
    NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shareableHandle, NVLS_HANDLE_SIZE), res, cleanup);
  } else {
    NCCLCHECKGOTO(bootstrapIntraNodeBroadcast(comm->bootstrap, comm->localRankToRank, comm->localRank, comm->localRanks, 0, shareableHandle, NVLS_HANDLE_SIZE), res, cleanup);
    NCCLCHECKGOTO(nvlsGroupConnect(comm, shareableHandle, comm->localRankToRank[0], &resources->mcHandle), res, cleanup);
  }
  ...
}

Rank 0 runs nvlsGroupCreate, which creates a multicast object via cuMulticastCreate and stores it in resources->mcHandle. Since it must be shared across processes it has to be converted into a shareable handle, which here is done with a plain memcpy.

ncclResult_t nvlsGroupCreate(struct ncclComm *comm, CUmulticastObjectProp *prop, int rank, unsigned int nranks, CUmemGenericAllocationHandle *mcHandle, char *shareableHandle) {
  size_t size = prop->size;
  CUCHECK(cuMulticastCreate(mcHandle, prop));
  memcpy(shareableHandle, mcHandle, sizeof(CUmemGenericAllocationHandle));
  return ncclSuccess;
}

All ranks then run bootstrapIntraNodeBroadcast: rank 0 broadcasts the shareable handle of the multicast object into every rank's shareableHandle, and the other ranks convert it to mcHandle via nvlsGroupConnect. Each rank then calls cuMulticastAddDevice to bind its GPU to mcHandle, so that every rank holds the multicast object behind mcHandle.

ncclResult_t nvlsGroupBindMem(struct ncclComm *comm, struct ncclNvlsSharedRes* resources) {
  size_t size = resources->size;
  size_t granularity;
  CUdeviceptr ptr = 0;
  CUmemAllocationProp prop;
  memset(&prop, 0, sizeof(prop));
  prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
  prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
  prop.location.id = resources->dev;
  prop.requestedHandleTypes = NVLS_CU_MEM_HANDLE_TYPE;
  CUCHECK(cuMemGetAllocationGranularity(&granularity, &prop, CU_MEM_ALLOC_GRANULARITY_RECOMMENDED));
  resources->ucGran = granularity;
  // Map a VA for UC memory
  CUCHECK(cuMemAddressReserve(&ptr, size, granularity, 0U, 0));
  // Alloc local physical mem for this NVLS group
  CUCHECK(cuMemCreate(&resources->ucHandle, size, &prop, 0));
  CUCHECK(cuMemMap(ptr, size, 0, resources->ucHandle, 0));
  CUCHECK(cuMemSetAccess(ptr, size, &resources->accessDesc, 1));
  CUDACHECK(cudaMemset((void*)ptr, 0, size));
  resources->ucBuff = (char*)ptr;
  CUCHECK(cuMulticastBindMem(resources->mcHandle, 0/*mcOffset*/, resources->ucHandle, 0/*memOffset*/, size, 0/*flags*/));
  return ncclSuccess;
}

Next, physical memory is allocated and mapped into the virtual address space. A range of virtual addresses is first reserved at ptr, physical memory is allocated into ucHandle, the physical memory behind ucHandle is mapped to ptr, and ptr is assigned to ucBuff, as shown in Figure 5.

Figure 5

Finally, cuMulticastBindMem binds the physical memory behind ucHandle to mcHandle.

nvlsGroupMapMem then maps mcHandle into the virtual address space.

ncclResult_t nvlsGroupMapMem(struct ncclComm *comm, struct ncclNvlsSharedRes* resources) {
  size_t size = resources->size;
  CUdeviceptr ptr = 0;
  // Create a VA for the NVLS
  CUCHECK(cuMemAddressReserve(&ptr, size, resources->granularity, 0U, 0));
  // Map the VA locally
  CUCHECK(cuMemMap(ptr, size, 0, resources->mcHandle, 0));
  resources->mcBuff = (char*)ptr;
  INFO(NCCL_NVLS, "NVLS Mapped MC buffer at %p size %zi", resources->mcBuff, size);
  // Having completed the BindMem we can now call SetAccess
  // NB: It will block until all ranks have bound to the Group
  CUCHECK(cuMemSetAccess((CUdeviceptr)resources->mcBuff, size, &resources->accessDesc, 1));
  return ncclSuccess;
}

Likewise, a virtual address range is reserved at ptr, mcHandle is mapped to ptr, and the result is saved in mcBuff. The state is now as shown in Figure 6.

Figure 6

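The physical memory is now mapped at both ucBuff and mcBuff. ucBuff is the unicast buffer: accesses to it affect only the current device's memory. mcBuff is the multicast buffer: accesses to it are broadcast by the NVSwitch to every device added to mcHandle.

Next, the memory is recorded in the buffs of each peer's connections.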

ncclResult_t ncclNvlsSetup(struct ncclComm* comm, struct ncclComm* parent) {
  ...
  for (int h = 0; h < nHeads; h++) {
    int nvlsPeer = comm->nRanks + 1 + h;
    for (int c = 0; c < nChannels; c++) {
      struct ncclChannel* channel = comm->channels + c;
      char* mem = NULL;
      struct ncclChannelPeer* peer = channel->peers[nvlsPeer];

      // Reduce UC -> MC
      mem = resources->ucBuff + (h * 2 * nChannels + c) * (buffSize + memSize);
      peer->send[1].transportComm = &nvlsTransport.send;
      peer->send[1].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
      peer->send[1].conn.head = (uint64_t*)(mem + buffSize);
      peer->send[1].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
      mem = resources->mcBuff + (h * 2 * nChannels + c) * (buffSize + memSize);
      peer->recv[0].transportComm = &nvlsTransport.recv;
      peer->recv[0].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
      peer->recv[0].conn.head = (uint64_t*)(mem + buffSize);
      peer->recv[0].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
      peer->recv[0].conn.flags |= NCCL_NVLS_MIN_POLL;

      // Broadcast MC -> UC
      mem = resources->ucBuff + ((h * 2 + 1) * nChannels + c) * (buffSize + memSize);
      peer->recv[1].transportComm = &nvlsTransport.recv;
      peer->recv[1].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
      peer->recv[1].conn.head = (uint64_t*)(mem + buffSize);
      peer->recv[1].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
      mem = resources->mcBuff + ((h * 2 + 1) * nChannels + c) * (buffSize + memSize);
      peer->send[0].transportComm = &nvlsTransport.send;
      peer->send[0].conn.buffs[NCCL_PROTO_SIMPLE] = mem;
      peer->send[0].conn.head = (uint64_t*)(mem + buffSize);
      peer->send[0].conn.tail = (uint64_t*)(mem + buffSize + memSize / 2);
      peer->send[0].conn.flags |= NCCL_NVLS_MIN_POLL;

      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[0], &peer->send[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[0], &peer->recv[0].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->send[1], &peer->send[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
      CUDACHECKGOTO(cudaMemcpyAsync(&comm->channels[c].devPeersHostPtr[nvlsPeer]->recv[1], &peer->recv[1].conn, sizeof(struct ncclConnInfo), cudaMemcpyHostToDevice, comm->sharedRes->hostStream.cudaStream), res, cleanup);
    }
  }
  ...
}

When this completes, the layout is as shown in Figure 7.

Figure 7

In the figure, yellow is ucBuff and blue is mcBuff. mcBuff corresponds to multimem in PTX, whose operations are described as follows:

The multimem.* operations operate on multimem addresses and accesses all of the multiple memory locations which the multimem address points to.

Taking ld_reduce as an example:

multimem.ld_reduce{.ldsem}{.scope}{.ss}.op.type d, [a];

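Suppose GPU0 executes multimem.ld_reduce on peer[0]->send[1].buff. This loads the data at the corresponding location on GPU0 and GPU1, reduces it, and stores the result in d.

Kernel flow, using ReduceScatter as an example

Version 2.19 changed a lot, so before looking at the nvls kernel we first describe how kernels execute in the new version.

Kernel dispatch

First, let's look at how a kernel is launched and how execution reaches the device function for the chosen proto, algo and so on.
There are many APIs, reduction types, data types, algorithms and protocols, and the set of kernels is the Cartesian product of these variables, so NCCL uses generate.py to generate the kernel definitions. It mainly generates two arrays, ncclDevKernelForFunc and ncclDevFuncTable, which hold the global functions and the device functions respectively.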

static ncclResult_t scheduleCollTasksToPlan(...) {
  ...
  NCCLCHECK(computeColl(&info, &workFuncIndex, &workElem, &proxyOp));
  ...
  if (!plan->kernelSpecialized) {
    plan->kernelFn = ncclDevKernelForFunc[workFuncIndex];
    plan->kernelSpecialized = ncclDevKernelForFuncIsSpecialized[workFuncIndex];
  }
  ...
}

During enqueue, computeColl computes workFuncIndex, and kernelFn is recorded as ncclDevKernelForFunc[workFuncIndex]. For ReduceScatter sum, for example, workFuncIndex is 485, and ncclDevKernelForFunc[485] is ncclDevKernel_ReduceScatter_Sum_f32_RING_LL, so launching the kernel runs this kernel. Note that we are actually using the SIMPLE protocol while this kernel is LL; let's see how the call is forwarded further.

DEFINE_ncclDevKernel(ReduceScatter_Sum_f32_RING_LL, ncclFuncReduceScatter, FuncSum, float, NCCL_ALGO_RING, NCCL_PROTO_LL, 483)

#define DEFINE_ncclDevKernel(suffix, coll, redop, ty, algo, proto, specializedFnId) \
  __global__ void ncclDevKernel_##suffix(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) { \
    ncclKernelMain<specializedFnId, RunWork<coll, ty, redop<ty>, algo, proto>>(comm, channelMask, workHead); \
  }

ncclDevKernel_ReduceScatter_Sum_f32_RING_LL is defined as above. specializedFnId is 483, and the kernel calls ncclKernelMain directly.

Before looking at ncclKernelMain, let's see where the kernel's parameters are stored.

__shared__ ncclShmemData ncclShmem;

struct ncclShmemGroup {
  ncclConnInfo *recvConns[NCCL_MAX_NVLS_ARITY];
  ncclConnInfo *sendConns[NCCL_MAX_NVLS_ARITY];
  void* srcs[NCCL_MAX_NVLS_ARITY+1];
  void* dsts[NCCL_MAX_NVLS_ARITY+1];
  union {
    unpackGroupShmem unpack;
  } devicePlugin;
};

struct ncclShmemData {
  struct ncclShmemGroup groups[NCCL_MAX_GROUPS];
  uint64_t redOpArgs[NCCL_MAX_NVLS_ARITY+1];
  int channelId;
  int aborted;
  alignas(16) struct ncclDevComm comm;
  alignas(16) struct ncclDevChannel channel;
  alignas(16) struct ncclWork work;
  alignas(16) union {
    unpackShmem unpack;
  } devicePlugin;
};

ncclShmem lives in shared memory and stores the parameters the kernel needs, such as channelId, comm and channel; all threads in a block use this information to send and receive data. In earlier versions all threads of a block had the same peers, whereas in the new version different threads may correspond to different peers. With send/recv, for example, one block can send to and receive from 8 peers; with nvls, covered in this post, different warps in a block complete the overall flow as a pipeline. The ncclShmemGroup groups structure was introduced for this: a group is a set of threads executing the same logic, and the conn, srcs, dsts and other information a group needs is stored in groups.

template<int SpecializedFnId, typename SpecializedRunWork>
__device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
  int tid = threadIdx.x;
  if (tid < WARP_SIZE) {
    int x = tid;
    if (channelMask & (1ull<<x)) {
      int y = __popcll(channelMask & ((1ull<<x)-1));
      if (blockIdx.x == y) ncclShmem.channelId = x;
    }
    ...
  }
  __syncthreads(); // publish ncclShmem.channelId
  int channelId = ncclShmem.channelId;
  ...
}

This selects the mapping between blocks and channels, i.e. computes which channel the current block should handle.

template<int SpecializedFnId, typename SpecializedRunWork>
__device__ void ncclKernelMain(struct ncclDevComm* comm, uint64_t channelMask, struct ncclWork* workHead) {
  ...
  while (true) {
    // Notify host that all fifo reads are complete.
    ...
    if (0 <= SpecializedFnId && ncclShmem.work.header.funcIndex == (unsigned)SpecializedFnId) {
      SpecializedRunWork().run(&ncclShmem.work);
    } else {
      ncclDevFuncTable[ncclShmem.work.header.funcIndex]();
    }
    int workIxNext = ncclShmem.work.header.workNext;
    __syncthreads();
    ...
  }
  ...
}

funcIndex is 485 while SpecializedFnId is 483, so the function for funcIndex is looked up in ncclDevFuncTable. It is ncclDevFunc_ReduceScatter_Sum_f32_RING_SIMPLE, which is the function we need to execute.

DEFINE_ncclDevFunc(ReduceScatter_Sum_f32_RING_SIMPLE, ncclFuncReduceScatter, FuncSum, float, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE)

#define DEFINE_ncclDevFunc(suffix, coll, redop, ty, algo, proto) \
  __device__ void ncclDevFunc_##suffix() { \
    RunWork<coll, ty, redop<ty>, algo, proto>().run(&ncclShmem.work); \
  }

template<ncclFunc_t Fn, typename T, typename RedOp, int Algo, int Proto>
struct RunWork {
  // This __forceinline__ is necessary. The compiler was inserting a function call
  // here from the LL ncclKernel.
  __device__ __forceinline__ void run(ncclWork *w) {
    int wid = threadIdx.x / WARP_SIZE;
    ncclWorkElem* we = w->header.type == ncclWorkTypeRegColl ? &w->regElems[0].elem : &w->elems[0];
    int stride = w->header.type == ncclWorkTypeRegColl ? sizeof(ncclWorkElemReg) : sizeof(ncclWorkElem);
    #pragma unroll 1
    while ((char*)we + stride <= (char*)(w+1) && we->isUsed) {
      if (wid < we->nWarps) {
        RunWorkElement<Fn, T, RedOp, Algo, Proto>().run(we);
      }
      we = (ncclWorkElem*)((char*)we + stride);
    }
  }
};

Looking at the definition of this function: Fn is ncclFuncReduceScatter, T is float, RedOp is FuncSum<float>, algo is NCCL_ALGO_RING and the protocol is NCCL_PROTO_SIMPLE. Execution then enters runRing.

  template<typename T, typename RedOp, typename Proto>
  __device__ __forceinline__ void runRing(ncclWorkElem *args) {
    ...
    const ssize_t loopSize = nChannels*chunkSize;
    const ssize_t size = args->count;
    Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>
      prims(tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg);
    for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
      ssize_t realChunkSize;
      ...
      realChunkSize = int(realChunkSize);
      ssize_t chunkOffset = gridOffset + bid*int(realChunkSize);

      /// begin ReduceScatter steps ///
      ssize_t offset;
      int nelem = min(realChunkSize, size-chunkOffset);
      int rankDest;

      // step 0: push data to next GPU
      rankDest = ringRanks[nranks-1];
      offset = chunkOffset + rankDest * size;
      prims.send(offset, nelem);

In the first step, each rank sends the block of its input selected by rankDest = ringRanks[nranks-1]; send writes the data into the next rank's buffer.

      // k-2 steps: reduce and copy to next GPU
      for (int j=2; j<nranks; ++j) {
        rankDest = ringRanks[nranks-j];
        offset = chunkOffset + rankDest * size;
        prims.recvReduceSend(offset, nelem);
      }

The next nranks - 2 steps call recvReduceSend: the data the previous rank sent into this rank's buffer is reduced with the corresponding position of this rank's user input and then sent to the next rank.

      // step k-1: reduce this buffer and data, which will produce the final result
      rankDest = ringRanks[0];
      offset = chunkOffset + rankDest * size;
      prims.recvReduceCopy(offset, chunkOffset, nelem, /*postOp=*/true);
    }
  }
}

The last step calls recvReduceCopy: the data sent by the previous rank is reduced with the corresponding position of this rank's user input and copied to the user output.

Primitive initialization

Recall how primitives is constructed in ReduceScatter: recvPeers is the previous rank in the ring, sendPeers is the next rank in the ring, inputBuf and outputBuf are the input and output buffers the user passed to the API, and group takes its default value 0. redOpArg is used for operations such as mean, where it is set to nranks and reduceCopy divides by nranks; this example uses sum, so redOpArg can be ignored.

  Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>
    prims(tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg);

  __device__ Primitives(
      int tid, int nthreads, int const *recvPeers, int const *sendPeers,
      void const *inputBuf, void *outputBuf, uint64_t redOpArg, uint8_t group=0,
      uint8_t connIndexRecv = 0, uint8_t connIndexSend = 0, struct ncclWorkElem* e = nullptr, int stepSize_=0
    ):
    tid(tid), nthreads(nthreads), tidInBlock(threadIdx.x), group(group),
    stepSize(stepSize_ == 0 ? ncclShmem.comm.buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/sizeof(T) : stepSize_) {
  }

Among the template parameters, Direct and P2p are 0, and Fan is FanSymmetric<1>, which records how many recvs and sends there are; here MaxArity is 1, so MaxRecv and MaxSend are both 1.

template<int MaxArity>
struct FanSymmetric {
  static constexpr int MaxRecv = MaxArity, MaxSend = MaxArity;
  int n;
  FanSymmetric() = default;
  __device__ FanSymmetric(int nrecv, int nsend): n(nrecv) {
    // assert(nrecv == nsend && nrecv <= MaxArity);
  }
  __device__ int nrecv() const { return n; }
  __device__ int nsend() const { return n; }
};

Now let's continue with the initialization.

  __device__ Primitives(...) {
    // For send operations, we need an extra warp to overlap the threadfence and the copy
    this->nworkers = nthreads - (MaxSend > 0 && nthreads-WARP_SIZE >= 64 ? WARP_SIZE : 0);

    int nrecv=0, nsend=0;
    while (nrecv < MaxRecv && recvPeers[nrecv] != -1) nrecv++;
    while (nsend < MaxSend && sendPeers[nsend] != -1) nsend++;
    this->fan = Fan(nrecv, nsend);

    constexpr int ThreadPerSync = 8;
    static_assert(MaxSend <= ThreadPerSync && MaxRecv <= ThreadPerSync, "Not enough threads to cover all peers");

    int g = tid / ThreadPerSync;
    int ng = nthreads / ThreadPerSync;
    index = tid % ThreadPerSync;
    flags = 0;
    if (g == 0) {
      if (index < nrecv) flags |= RoleWaitRecv;
      if (index == nrecv) flags |= RoleInput;
    } else if (g == 1) {
      if (index < nsend) flags |= RoleWaitSend;
      if (index == nsend) flags |= RoleOutput;
    } else if (g == ng - 2) {
      if (index < nrecv) flags |= RolePostRecv;
    } else if (g == ng - 1) {
      if (index < nsend) flags |= RolePostSend;
    }
    ...
  }

nthreads is the total number of threads executing, which in this example equals the number of threads in the block. nworkers is the number of threads that actually do the work: because sending needs one warp to execute the threadfence, the worker count is nthreads minus one warp, although when the total number of warps is small no separate synchronization warp is used. nsend and nrecv are then recorded; both are 1 here.

Next, each thread's role is set. nthreads is divided into groups of 8 threads. Suppose there are n groups in total and that there are two recvPeers and two sendPeers; the roles are then assigned as in Figure 8. WaitRecv means the thread waits until the fifo has data to receive: in this example thr[0] of g[0] waits on recvPeer 0 and thr[1] on recvPeer 1. The Input thread writes the address of the user buffer. PostRecv notifies the recvPeer after data has been received: thr[0] of g[n-2] notifies recvPeer 0 and thr[1] notifies recvPeer 1. The same applies to send.

Figure 8

  __device__ __forceinline__ void loadRecvConn(ncclDevChannelPeer *peer, int connIndex, struct ncclWorkElem* e) {
    if (flags & (RoleWaitRecv|RolePostRecv)) {
      auto *conn = &peer->recv[connIndex];
      step = conn->step;
      step = roundUp(step, SlicePerChunk*StepPerSlice);
      if (flags & RolePostRecv) {
        connStepPtr = conn->head;
        *connStepPtr = step; // Return credits in case we rounded up.
      }
      if (flags & RoleWaitRecv) {
        ncclShmem.groups[group].recvConns[index] = conn; // WaitRecv role saves since that's who needs it in setDataPtrs()
        flags |= (conn->flags & NCCL_NVLS_MIN_POLL) ? NvlsMinPolling : 0;
        connStepPtr = conn->tail;
        connStepCache = loadStepValue(connStepPtr);
        flags |= (conn->offsFifo != nullptr) ? OffsFifoEnabled : 0;
        if (Direct) {
          ...
        }
        if (flags & OffsFifoEnabled)
          connOffsFifoPtr = conn->offsFifo;
        connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
      }
    }
  }

Only RoleWaitRecv and RolePostRecv threads execute loadRecvConn. They read step which, as covered in earlier posts, is the position in the fifo. The RolePostRecv thread notifies the recvPeer, so it saves the conn's head pointer in connStepPtr. The RoleWaitRecv thread waits until the fifo has new data, so it saves the conn's tail pointer in connStepPtr and caches its value in connStepCache to avoid frequent global memory reads. Finally conn->buffs, i.e. the fifo, is recorded in connEltsFifo.

  __device__ __forceinline__ void loadSendConn(ncclDevChannelPeer *peer, int connIndex, struct ncclWorkElem* e) {
    if (flags & (RoleWaitSend|RolePostSend)) {
      auto *conn = &peer->send[connIndex];
      step = conn->step;
      step = roundUp(step, SlicePerChunk*StepPerSlice);
      if (flags & RolePostSend) {
        connStepPtr = conn->tail;
        connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
      }
      if (flags & RoleWaitSend) {
        ncclShmem.groups[group].sendConns[index] = conn; // WaitSend role saves since that's who needs it in setDataPtrs()
        flags |= (conn->flags & NCCL_NVLS_MIN_POLL) ? NvlsMinPolling : 0;
        connStepPtr = conn->head;
        connStepCache = loadStepValue(connStepPtr);
        flags |= (conn->offsFifo != nullptr) ? OffsFifoEnabled : 0;
        if (flags & OffsFifoEnabled)
          connOffsFifoPtr = conn->offsFifo;
        connEltsFifo = (T*)conn->buffs[NCCL_PROTO_SIMPLE];
        ...
      }
    }
  }

loadSendConn follows the same logic: the RolePostSend thread notifies the send peer and therefore holds the tail pointer, the RoleWaitSend thread waits on the send peer and therefore holds the head pointer, and the fifo is then recorded.
Finally setDataPtrs sets userBuff to the user's input or output.

  __device__ void setDataPtrs(void const *inputBuf, void *outputBuf, uint64_t redOpArg, struct ncclWorkElemReg* e) {
    if (flags & RoleInput) {
      userBuff = (T*)inputBuf;
      ncclShmem.redOpArgs[0] = redOpArg;  // scaler for local input
    }
    if (flags & RoleOutput) userBuff = (T*)outputBuf;
    ...
  }

This completes initialization.

recvReduceSend

  __device__ __forceinline__ void recvReduceSend(intptr_t inpIx, int eltN, bool postOp=false) {
    genericOp<0, 0, 1, 1, Input, -1>(inpIx, -1, eltN, postOp);
  }

  template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
  __device__ __forceinline__ void genericOp(intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp) {
    constexpr int DirectRecv = 1 && Direct && DirectRecv1;
    constexpr int DirectSend = 1 && Direct && DirectSend1;
    constexpr int Src = SrcBuf != -1;
    constexpr int Dst = DstBuf != -1;

    nelem = nelem < 0 ? 0 : nelem;
    int sliceSize = stepSize*StepPerSlice;
    sliceSize = max(divUp(nelem, 16*SlicePerChunk)*16, sliceSize/32);
    int slice = 0;
    int offset = 0;
    ...
  }

Among the template parameters, Recv says whether a recv is needed, Send whether a send is needed, SrcBuf whether the inputs include the user's src buffer, and DstBuf whether the outputs include the user's dst buffer. It follows that DirectRecv and DirectSend are 0, Src is 1 and Dst is 0.

  template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
  __device__ __forceinline__ void genericOp(intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp) {
    ...
    if (tid < nworkers && offset < nelem) {
      do {
        sliceSize = sliceSize < nelem-offset ? sliceSize : nelem-offset;
        if (Src && (flags & (SrcBuf==Input ? RoleInput : RoleOutput)))
          ncclShmem.groups[group].srcs[0] = userBuff + srcIx + offset;
        if (Dst && (flags & (DstBuf==Input ? RoleInput : RoleOutput)))
          ncclShmem.groups[group].dsts[0] = userBuff + dstIx + offset;
        waitPeer<DirectRecv, DirectSend, Recv, Send, Src, Dst>(srcIx, dstIx, offset, sliceSize);
        ...
      } while (slice < SlicePerChunk && offset < nelem);
    }
    ...
  }

In 2.7.8 the worker threads that move data and the synchronization threads shared one loop, which introduced many branch instructions and hurt performance. The new version splits the logic into two loops to improve performance: worker threads run the first loop and synchronization threads run the second.

The RoleInput thread writes the user buffer into srcs[0] and then waitPeer is called. waitPeer replaces the earlier waitSend and waitRecv: it waits until data can be sent and received, and writes the data addresses into srcs and dsts.

  template <int DirectRecv, int DirectSend, int Recv, int Send, int Src, int Dst>
  __device__ __forceinline__ void waitPeer(intptr_t srcIx, intptr_t dstIx, int offset, int nelts) {
    const bool isSendNotRecv = (Send && Recv) ? (flags & RoleWaitSend) : Send;
    const bool noRecvWait = DirectRecv && Src && (flags & DirectRead);        // no wait when directly reading from remote input
    const bool noSendWait = DirectSend && (flags & (DirectRead|DirectWrite)); // no wait in empty send (e.g. directScatter) or direct remote write
    if (((flags & (Recv*RoleWaitRecv)) && !noRecvWait) ||
        ((flags & (Send*RoleWaitSend)) && !noSendWait)) {
      int spins = 0;
      while (connStepCache + (isSendNotRecv ? NCCL_STEPS : 0) < step + StepPerSlice) {
        connStepCache = loadStepValue(connStepPtr);
        if (checkAbort(spins)) break;
      }
    }
    ...
  }

noRecvWait and noSendWait are both 0. For the RoleWaitSend thread isSendNotRecv is 1. Since the connStepPtr it holds is the head pointer, its wait condition is: if head plus the queue capacity is less than step + StepPerSlice, sending would exceed the queue capacity, so it spins. For the RoleWaitRecv thread isSendNotRecv is 0. Since the connStepPtr it holds is the tail pointer, its wait condition is: if step + StepPerSlice exceeds the tail pointer, the queue has no more data, so it must wait.

  template <int DirectRecv, int DirectSend, int Recv, int Send, int Src, int Dst>
  __device__ __forceinline__ void waitPeer(intptr_t srcIx, intptr_t dstIx, int offset, int nelts) {
    ...
    if (flags & (Recv*RoleWaitRecv | Send*RoleWaitSend)) {
      if (isSendNotRecv && (flags & SizesFifoEnabled))
        connSizesFifoPtr[step%NCCL_STEPS] = nelts*sizeof(T);

      void **ptrs = isSendNotRecv ? (ncclShmem.groups[group].dsts + Dst)
                                  : (ncclShmem.groups[group].srcs + Src);
      if (flags & OffsFifoEnabled) {
      } else if (isSendNotRecv && DirectSend) {
      } else if (!isSendNotRecv && DirectRecv) {
      } else {
        ptrs[index] = connEltsFifo + (step%NCCL_STEPS)*stepSize;
      }
      step += StepPerSlice;
    }
  }

The srcs and dsts arrays are then filled in: each thread writes the slot of the fifo it holds and updates step. For recvReduceSend, therefore, srcs[0] is the user buffer, srcs[1] is the previous rank's fifo and dsts[0] is the next rank's fifo, which gives the behaviour described earlier: receive the previous rank's data, reduce it with the user's input buffer, and send it to the next rank.

  template <int DirectRecv1, int DirectSend1, int Recv, int Send, int SrcBuf, int DstBuf>
  __device__ __forceinline__ void genericOp(intptr_t srcIx, intptr_t dstIx, int nelem, bool postOp) {
    ...
    if (tid < nworkers && offset < nelem) {
      do {
        ...
        subBarrier();
        int workSize = ncclShmem.aborted ? 0 : sliceSize;
        if (DirectRecv && ncclShmem.groups[group].srcs[0] == ncclShmem.groups[group].dsts[0] ...) {
        } else if (DirectSend && !DirectRecv && SrcBuf != Input && ncclShmem.groups[group].dsts[Dst] == nullptr) {
        } else {
          constexpr int PreOpSrcs = SrcBuf != Input ? 0 :
                                    DirectRecv*MaxRecv == NCCL_MAX_DIRECT_ARITY ? (1+NCCL_MAX_DIRECT_ARITY) : 1;
          reduceCopy<Unroll, RedOp, T,
            MultimemSrcs, Recv+Src, Recv*MaxRecv+Src,
            MultimemDsts, Send+Dst, Send*MaxSend+Dst, PreOpSrcs>
            (tid, nworkers, ncclShmem.redOpArgs[0], ncclShmem.redOpArgs, postOp,
             Recv*fan.nrecv()+Src, ncclShmem.groups[group].srcs,
             Send*fan.nsend()+Dst, ncclShmem.groups[group].dsts,
             workSize);
        }
        barrier(); // This barrier has a counterpart in following loop
        postPeer<Recv, Send>(0 < sliceSize);
        offset += sliceSize;
        slice += 1;
      } while (slice < SlicePerChunk && offset < nelem);
    }

    while (slice < SlicePerChunk) {
      sliceSize = sliceSize < nelem-offset ? sliceSize : nelem-offset;
      barrier(); // Has couterpart in preceding worker-only loop.
      postPeer<Recv, Send>(0 < sliceSize);
      offset += sliceSize;
      slice += 1;
    }
  }

Once waitPeer completes, data can be transferred. subBarrier() is executed first to synchronize all worker threads, ensuring the transfer logic is entered only after waitPeer has finished. reduceCopy then reduces the data from srcs and copies it to dsts. After that barrier() synchronizes all threads, i.e. worker threads plus synchronization threads, because the synchronization threads may only post once the data transfer has finished. Now let's look at postPeer, which the synchronization threads execute.

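  template<int Recv, int Send>
  inline __device__ void postPeer(bool dataStored) {
    if (flags & (Recv*RolePostRecv | Send*RolePostSend)) {
      step += StepPerSlice;
      if (Send && (flags & RolePostSend) && dataStored) fence_acq_rel_sys();
      st_relaxed_sys_global(connStepPtr, step);
    }
  }

RolePost threads update step and then write it to connStepPtr. A RolePostRecv thread holds the head pointer and can write it directly. A RolePostSend thread holds the tail pointer and, to make sure the data writes complete before the post, needs a fence. An acq_rel fence is used here; release semantics alone would be enough for this case, but as far as I could tell from the PTX documentation there is no standalone release fence instruction. The read side would also need a matching read barrier, but NCCL's implementation uses volatile, which bypasses the L1 cache, so no barrier is needed.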

nvls

ReduceScatter kernel

      if (tid < tidEndScatter) {
        // Scatter
        using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
        Primitives<T, RedOp, FanAsymmetric<0, NCCL_MAX_NVLS_ARITY>, /*Direct=*/0, Proto, 0>
          prims(tid, nThreadsScatter, NULL, nvls->up, args->sendbuff, NULL,
            args->redOpArg, 0 * Proto::MaxGroupWidth, 1, 1);
        for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
          ssize_t offset = gridOffset + bid * chunkSize;
          int nelem = min(chunkSize, size - offset);
          prims.scatter(offset, nvls->nHeads * size, nelem, size, -1, 0);
        }
      }

The scatter threads perform the scatter through prims. sendPeers is up, i.e. all ranks, inputBuf is the user's input args->sendbuff, and connIndexSend is 1, so send conn 1 is loaded.

  __device__ __forceinline__ voidscatter(intptr_t inpIx, ssize_t totalElem, int peerElem, ssize_t peerOffset, int skip, int shift) {ScatterGatherOp<0, 0, 0, 1>(inpIx, -1, totalElem, peerElem, peerOffset, skip, shift, /*postOp=*/false);}template <int DirectRecv1, int DirectSend1, int Recv, int Send>__device__ __forceinline__ voidScatterGatherOp(intptr_t inpIx, intptr_t outIx, ssize_t totalElem, int peerElem, ssize_t peerOffset, int skip, int shift, bool postOp) {constexpr int DirectRecv = 1 && Direct && DirectRecv1;constexpr int DirectSend = 1 && Direct && DirectSend1;int offset = 0; // slice offsetint sliceSize = stepSize*StepPerSlice;int dataSize = max(DIVUP(peerElem, 16*SlicePerChunk)*16, sliceSize/32);  // per-peer slice size#pragma unrollfor (int slice=0; slice<SlicePerChunk; ++slice) {ssize_t realSize = max(0, min(dataSize, peerElem-offset));bool fenceNeeded = false;if (tid < nworkers) {if (Send) {// Scatter pre-scales data of input buffer only in non-Direct caseconstexpr int PreOpSrcs = DirectSend ? 
0 : 1;if (flags & RoleInput) ncclShmem.groups[group].srcs[0] = userBuff + inpIx + offset;// realSize is not accurate here; but intra-node does not rely on sizes FIFOwaitPeer<0, DirectSend, 0, 1, 1, 0>(0, inpIx, offset, realSize);subBarrier();#pragma unroll// Loop over peersfor (int j=0; j<fan.nsend(); j++) {int i = (j+shift)%fan.nsend();ssize_t pOffset = i*peerOffset;// Skip the data I am responsible of reducing myselfif (skip >= 0 && i >= skip) pOffset += peerElem;void* src0 = (T*)ncclShmem.groups[group].srcs[0] + pOffset;ssize_t realPeerSize = min(realSize, totalElem-pOffset);if (realPeerSize > 0 && ncclShmem.groups[group].dsts[i] != nullptr) {reduceCopy<Unroll, RedOp, T, 0,1,1, 0,1,1, PreOpSrcs>(tid, nworkers, ncclShmem.redOpArgs[0], ncclShmem.redOpArgs, false, 1, &src0, 1, ncclShmem.groups[group].dsts+i, realPeerSize);// Mark for threadfence at the endfenceNeeded |= true;}}} else if (Recv) {}}fenceNeeded = barrierAny(fenceNeeded);postPeer<Recv, Send>(fenceNeeded);offset += realSize;}}

As Figure 9 shows, scatter copies the data in userBuff, at intervals of peerOffset, into the buffers of all send peers, i.e. peer[0]->send[1].buff and peer[1]->send[1].buff in the figure.

Figure 9
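The per-peer index arithmetic in the Send branch is easier to follow when isolated from the synchronization. The following is a minimal host-side sketch of that arithmetic for a single slice; `PeerSpan` and `scatterSpans` are hypothetical names that do not exist in NCCL, and `realSize` is passed in directly rather than derived from `dataSize`.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper type: the span of the source buffer sent to one peer.
struct PeerSpan {
  std::ptrdiff_t offset;  // element offset inside the source buffer
  std::ptrdiff_t size;    // number of elements copied to this peer (0 if none)
};

// Mirrors the offset computation of the Send branch of ScatterGatherOp
// for one slice. Peers are visited starting at `shift`, as in the kernel.
std::vector<PeerSpan> scatterSpans(int nSend, std::ptrdiff_t totalElem,
                                   std::ptrdiff_t realSize,
                                   std::ptrdiff_t peerOffset,
                                   std::ptrdiff_t peerElem,
                                   int skip, int shift) {
  assert(nSend > 0);
  std::vector<PeerSpan> spans(nSend);
  for (int j = 0; j < nSend; j++) {
    int i = (j + shift) % nSend;
    std::ptrdiff_t pOffset = i * peerOffset;
    // Same rule as the kernel: jump over the block this rank reduces itself.
    if (skip >= 0 && i >= skip) pOffset += peerElem;
    std::ptrdiff_t realPeerSize = std::min(realSize, totalElem - pOffset);
    // The kernel skips the copy when realPeerSize <= 0; record that as size 0.
    spans[i] = {pOffset, std::max<std::ptrdiff_t>(realPeerSize, 0)};
  }
  return spans;
}
```

With two send peers, `peerOffset = 4` and `skip = -1`, peer 0 receives the elements starting at offset 0 and peer 1 those starting at offset 4, which matches the layout described for Figure 9.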

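For the reduce threads, sendPeers is NULL and recvPeers is nvls->down. connIndexRecv is 0 (the last two constructor arguments in the excerpt below are both 0), so recv conn 0 is loaded, and then recv is executed.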

    else if (tid < tidEndReduce) {
      // Reduce through NVLS
      using Proto = ProtoSimple<1, 1, COLL_UNROLL, 1, 0>;
      Primitives<T, RedOp, FanAsymmetric<1, 0>, /*Direct=*/0, Proto, 0>
        prims(tid - tidEndScatter, nThreadsReduce, &nvls->down, NULL, NULL, args->recvbuff,
              args->redOpArg, 3 * Proto::MaxGroupWidth, 0, 0);
      for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
        ssize_t offset = gridOffset + bid * chunkSize;
        int nelem = min(chunkSize, size - offset);
        prims.recv(offset, nelem);
      }
    }

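Inside recv, dst is set to args->recvbuff and src is the multicast buffer that corresponds to this rank. As Figure 10 shows, once it completes, GPU 0's recvbuff holds the reduction of the corresponding values from all GPUs.

Figure 10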

reduceCopy kernel

Taking ReduceScatter as the example, let's see how the reduceCopy kernel supports unicast buffers and multicast buffers at the same time.

template<int Unroll, typename RedFn, typename T,
         int MultimemSrcs, int MinSrcs, int MaxSrcs,
         int MultimemDsts, int MinDsts, int MaxDsts, int PreOpSrcs,
         typename IntBytes>
__device__ __forceinline__ void reduceCopy(
    int thread, int nThreads,
    uint64_t redArg, uint64_t *preOpArgs, bool postOp,
    int nSrcs, void **srcPtrs, int nDsts, void **dstPtrs,
    IntBytes nElts) {
  int lane = thread%WARP_SIZE;
  // If a multimem src is present then our biggest pack size is limited to what
  // is supported for this redfn/type.
  constexpr int BigPackSize = (MultimemSrcs == 0) ? 16 : LoadMultimem_BigPackSize<RedFn>::BigPackSize;

  IntBytes nBytesBehind = 0;
  IntBytes nBytesAhead = nElts*sizeof(T);

  #if __cpp_if_constexpr
  if constexpr (BigPackSize > sizeof(T)) {
  #else
  if (BigPackSize > sizeof(T)) {
  #endif
    // Check that all pointers are BigPackSize aligned.
    bool aligned = true;
    if (lane < nSrcs) aligned &= 0 == cvta_to_global(srcPtrs[lane]) % (BigPackSize + !BigPackSize);
    if (lane < nDsts) aligned &= 0 == cvta_to_global(dstPtrs[lane]) % (BigPackSize + !BigPackSize);
    aligned = __all_sync(~0u, aligned);
    if (aligned) {
      reduceCopyPacks<RedFn, T, Unroll, BigPackSize,
        MultimemSrcs, MinSrcs, MaxSrcs, MultimemDsts, MinDsts, MaxDsts, PreOpSrcs>
        (nThreads, /*&*/thread, redArg, preOpArgs, postOp,
         nSrcs, srcPtrs, nDsts, dstPtrs, /*&*/nBytesBehind, /*&*/nBytesAhead);
      if (nBytesAhead == 0) return;

      reduceCopyPacks<RedFn, T, /*Unroll=*/1, BigPackSize,
        MultimemSrcs, MinSrcs, MaxSrcs, MultimemDsts, MinDsts, MaxDsts, PreOpSrcs>
        (nThreads, /*&*/thread, redArg, preOpArgs, postOp,
         nSrcs, srcPtrs, nDsts, dstPtrs, /*&*/nBytesBehind, /*&*/nBytesAhead);
      if (nBytesAhead == 0) return;
    }
  }
  ...
}

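Among the template parameters, MultimemSrcs is the number of inputs that are multimem and MultimemDsts is the number of outputs that are multimem. Among the arguments, thread is tid and nThreads is the total number of threads. There are nSrcs inputs whose addresses are in srcPtrs and nDsts outputs whose addresses are in dstPtrs, and nElts is the element count. The function reduces all srcs and stores the result to all dsts.
nBytesBehind is the number of bytes already processed and nBytesAhead is the number of bytes still to be processed.
The function then checks whether vectorized instructions can be used. BigPackSize is the granularity of the load/store instructions. If no input is multimem it tries 16 bytes, i.e. 128 bits. If an input is multimem, the value depends on the reduction function and the data type. In this example the function is FuncSum on float, so it is also 16 bytes.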

  template<typename Fn>
  struct LoadMultimem_BigPackSize {
    using T = typename Fn::EltType;
    static constexpr bool IsSum = std::is_same<Fn, FuncSum<T>>::value ||
                                  std::is_same<Fn, FuncPreMulSum<T>>::value ||
                                  std::is_same<Fn, FuncSumPostDiv<T>>::value;
    static constexpr bool IsMinMax = std::is_same<Fn, FuncMinMax<T>>::value;
    static constexpr bool IsFloat = IsFloatingPoint<T>::value;
    static constexpr int BigPackSize =
      IsFloat && IsSum && sizeof(T) < 8 ? 16 :
      IsFloat && IsSum ? 8 :
      IsFloat && IsMinMax && sizeof(T)==2 ? 16 :
      !IsFloat && (IsSum||IsMinMax) && sizeof(T)>=4 ? sizeof(T) :
      /*multimem.ld_reduce not supported:*/ 0;
  };
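To make the selection table concrete, here is a small host-side restatement of the same conditional chain. It is only a sketch: the `Op` enum and both function names are hypothetical stand-ins for the functor types, and `Op::Sum` is assumed to cover FuncSum, FuncPreMulSum and FuncSumPostDiv alike.

```cpp
// Hypothetical stand-in for the reduction functor family.
enum class Op { Sum, MinMax, Other };

// Same decision chain as LoadMultimem_BigPackSize, with the type traits
// replaced by plain arguments. Returns 0 where multimem.ld_reduce is not
// supported for the combination.
constexpr int multimemBigPackSize(bool isFloat, Op op, int sizeofT) {
  return (isFloat && op == Op::Sum && sizeofT < 8) ? 16
       : (isFloat && op == Op::Sum) ? 8
       : (isFloat && op == Op::MinMax && sizeofT == 2) ? 16
       : (!isFloat && (op == Op::Sum || op == Op::MinMax) && sizeofT >= 4) ? sizeofT
       : 0;
}

// Pack size chosen by reduceCopy: 16 bytes unless a multimem source
// restricts it.
constexpr int bigPackSize(int multimemSrcs, bool isFloat, Op op, int sizeofT) {
  return multimemSrcs == 0 ? 16 : multimemBigPackSize(isFloat, op, sizeofT);
}

static_assert(bigPackSize(1, true, Op::Sum, 4) == 16, "float sum uses 16-byte packs");
static_assert(bigPackSize(1, true, Op::Sum, 8) == 8, "double sum uses 8-byte packs");
```

A result of 0 means `BigPackSize > sizeof(T)` is false in reduceCopy, so the vectorized path shown above is skipped for that combination.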

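Vectorized access requires every input and output pointer to be aligned to BigPackSize. Taking the aligned case, let's look at the logic of reduceCopyPacks.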

template<typename RedFn, typename T, int Unroll, int BytePerPack,
         int MultimemSrcs, int MinSrcs, int MaxSrcs,
         int MultimemDsts, int MinDsts, int MaxDsts, int PreOpSrcs,
         typename IntBytes>
__device__ __forceinline__ void reduceCopyPacks(
    int nThreads, int &thread,
    uint64_t redArg, uint64_t *preOpArgs, bool postOp,
    int nSrcs, void **srcPtrs, int nDsts, void **dstPtrs,
    IntBytes &nBytesBehind, IntBytes &nBytesAhead) {
  // A hunk is the amount of contiguous data a warp consumes per loop iteration
  // assuming all threads partake.
  constexpr int BytePerHunk = Unroll*WARP_SIZE*BytePerPack;
  int nWarps = nThreads/WARP_SIZE;
  int warp = thread/WARP_SIZE;
  int lane = thread%WARP_SIZE;
  ...
}

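BytePerPack is BigPackSize, i.e. 16 bytes. Assuming Unroll is 4, the memory access pattern of one warp is shown in Figure 11. Each blue box spans 32 packs of 16 bytes (512 bytes), and BytePerHunk is the length of contiguous data one warp processes in one go, i.e. 4 blue boxes (2048 bytes). The first thread of the warp accesses the first 16 bytes of each of the 4 blue boxes, as the arrows indicate.

Figure 11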
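The access pattern can also be written down as a formula. The sketch below computes, for a given thread and unroll step, the byte offset of the pack it touches relative to the start of the current round of hunks. `packOffset` and `kWarpSize` are hypothetical names, and the formula is derived from the `threadBytesBehind` initialization and the `WARP_SIZE*BytePerPack` stride that appear in the excerpts of this section.

```cpp
#include <cstdint>

constexpr int kWarpSize = 32;  // stands in for WARP_SIZE

// Byte offset of the u-th pack loaded by `thread`:
//   warp * BytePerHunk      selects the hunk owned by the thread's warp,
//   lane * BytePerPack      selects the thread's pack inside a blue box,
//   u * WARP_SIZE * BytePerPack steps from one blue box to the next.
constexpr std::int64_t packOffset(int thread, int u, int unroll, int bytePerPack) {
  return std::int64_t(thread / kWarpSize) * (std::int64_t(unroll) * kWarpSize * bytePerPack)
       + std::int64_t(thread % kWarpSize) * bytePerPack
       + std::int64_t(u) * kWarpSize * bytePerPack;
}

static_assert(packOffset(0, 1, 4, 16) == 512, "second blue box starts 512 bytes in");
static_assert(packOffset(32, 0, 4, 16) == 2048, "warp 1 starts one hunk later");
```

Because consecutive lanes read consecutive 16-byte packs, each unroll step of a warp covers one contiguous 512-byte box.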

Next, each thread's starting position is set up.

__device__ __forceinline__ void reduceCopyPacks(...) {
  // This thread's initial position.
  IntBytes threadBytesBehind = nBytesBehind + (warp*BytePerHunk + lane*BytePerPack);
  IntBytes threadBytesAhead = nBytesAhead - (warp*BytePerHunk + lane*BytePerPack);
  // Number of hunks to be consumed over all warps.
  IntBytes nHunksAhead = nBytesAhead/(BytePerHunk + !BytePerHunk);
  // Advance collective position.
  nBytesBehind += nHunksAhead*BytePerHunk;
  nBytesAhead -= nHunksAhead*BytePerHunk;
  if (Unroll==1 && BytePerPack <= nBytesAhead) {
    // Only Unroll=1 can do partial hunks (where not all threads partake).
    nHunksAhead += 1;
    nBytesBehind += nBytesAhead - (nBytesAhead%(BytePerPack + !BytePerPack));
    nBytesAhead = nBytesAhead%(BytePerPack + !BytePerPack);
  }
  nHunksAhead -= warp;

  RedFn redFn(redArg);
  uintptr_t minSrcs[MinSrcs + !MinSrcs];
  uintptr_t minDsts[MinDsts + !MinDsts];
  #pragma unroll
  for (int s=0; s < MinSrcs; s++)
    minSrcs[s] = cvta_to_global(srcPtrs[s]) + threadBytesBehind;
  #pragma unroll
  for (int d=0; d < MinDsts; d++)
    minDsts[d] = cvta_to_global(dstPtrs[d]) + threadBytesBehind;
  ...
}

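threadBytesBehind is the current thread's starting position, and threadBytesAhead is the number of bytes from that position to the end of the data. The first MinSrcs src pointers and MinDsts dst pointers, each advanced by threadBytesBehind, are then recorded in minSrcs and minDsts.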
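The collective bookkeeping (nBytesBehind and nBytesAhead) explains why reduceCopy calls reduceCopyPacks twice, first with the full Unroll and then with Unroll=1. The sketch below replays only that bookkeeping on the host. `Progress` and `advance` are hypothetical names, and the per-thread loop control (`nHunksAhead -= warp` and so on) is deliberately left out.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical pair mirroring nBytesBehind / nBytesAhead.
struct Progress {
  std::int64_t behind;  // bytes already consumed
  std::int64_t ahead;   // bytes still to be consumed
};

// Collective position after one reduceCopyPacks call with the given Unroll
// and pack size, following the arithmetic in the excerpt above.
Progress advance(Progress p, int unroll, int bytePerPack) {
  assert(unroll > 0 && bytePerPack > 0);
  const std::int64_t bytePerHunk = std::int64_t(unroll) * 32 * bytePerPack;
  const std::int64_t nHunksAhead = p.ahead / bytePerHunk;
  // Whole hunks are consumed by all warps together.
  p.behind += nHunksAhead * bytePerHunk;
  p.ahead -= nHunksAhead * bytePerHunk;
  if (unroll == 1 && bytePerPack <= p.ahead) {
    // Only Unroll=1 may take a partial hunk: consume every whole pack left.
    p.behind += p.ahead - (p.ahead % bytePerPack);
    p.ahead = p.ahead % bytePerPack;
  }
  return p;
}
```

For 5000 bytes with 16-byte packs, the Unroll=4 pass consumes two 2048-byte hunks and leaves 904 bytes. The Unroll=1 pass then consumes one 512-byte hunk plus 24 whole packs and leaves 8 bytes, which are too small for a 16-byte pack and fall through to the code elided after the aligned path.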

__device__ __forceinline__ void reduceCopyPacks(...) {
  ...
  while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
    BytePack<BytePerPack> acc[Unroll];

    { RedFn preFn(0 < PreOpSrcs ? preOpArgs[0] : 0);
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (0 < MultimemSrcs) {
          // applyLoadMultimem uses relaxed semantics for same reason we use volatile below.
          acc[u] = applyLoadMultimem<RedFn, BytePerPack>(redFn, minSrcs[0]);
        } else {
          // Use volatile loads in case credits are polled for with volatile (instead of acquire).
          acc[u] = ld_volatile_global<BytePerPack>(minSrcs[0]);
          if (0 < PreOpSrcs) acc[u] = applyPreOp(preFn, acc[u]);
        }
        minSrcs[0] += WARP_SIZE*BytePerPack;
      }
    }
    ...
  }

In this case BytePack is 16 bytes and is described by a union. acc holds the reduction result.

template<>
union alignas(16) BytePack<16> {
  BytePack<8> half[2];
  uint8_t u8[16];
  uint16_t u16[8];
  uint32_t u32[4];
  uint64_t u64[2];
  ulong2 ul2, native;
};

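acc is then initialized. If no input is multimem, ld_volatile_global loads the thread's 128 bits of the first blue box into acc[0], and the loop over Unroll loads the corresponding data of all blue boxes into acc. Next, look at ld_volatile_global.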

#define DEFINE_ld_st_16__space(space, addr_cxx_ty, addr_reg_ty) \
  template<> \
  __device__ __forceinline__ BytePack<16> ld_##space<16>(addr_cxx_ty addr) { \
    BytePack<16> ans; \
    asm("ld." #space ".v2.b64 {%0,%1}, [%2];" : "=l"(ans.u64[0]), "=l"(ans.u64[1]) : #addr_reg_ty(addr)); \
    return ans; \
  } \
  template<> \
  __device__ __forceinline__ BytePack<16> ld_volatile_##space<16>(addr_cxx_ty addr) { \
    BytePack<16> ans; \
    asm("ld.volatile." #space ".v2.b64 {%0,%1}, [%2];" : "=l"(ans.u64[0]), "=l"(ans.u64[1]) : #addr_reg_ty(addr)); \
    return ans; \
  } \
  template<> \
  __device__ __forceinline__ void st_##space<16>(addr_cxx_ty addr, BytePack<16> value) { \
    asm("st." #space ".v2.b64 [%0], {%1,%2};" :: #addr_reg_ty(addr), "l"(value.u64[0]), "l"(value.u64[1]) : "memory"); \
  }
DEFINE_ld_st_16__space(global, uintptr_t, l)

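BytePerPack is 16, so ld_volatile_global<16> is executed, which is simply ld.volatile.global.v2.b64 loading 128 bits into u64[0] and u64[1] of ans. Likewise, st_global<16> stores u64[0] and u64[1] of the BytePack to addr with st.global.v2.b64.
When the inputs do include a multimem source, applyLoadMultimem loads and reduces the data and the result is stored in acc.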

#define SIZEOF_BytePack_field_u32 4
#define PTX_REG_BytePack_field_u32 "r"

#define DEFINE_Apply_LoadMultimem_sum_v4(T, ptx_ty, pack_field) \
  template<> \
  struct Apply_LoadMultimem<FuncSum<T>, 4*(SIZEOF_BytePack_field_##pack_field)> { \
    static constexpr int PackSize = 4*(SIZEOF_BytePack_field_##pack_field); \
    __device__ static BytePack<PackSize> load(FuncSum<T> fn, uintptr_t addr) { \
      BytePack<PackSize> ans; \
      asm("multimem.ld_reduce.relaxed.sys.global.add.v4." #ptx_ty " {%0,%1,%2,%3}, [%4];" \
        : "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[0]), \
          "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[1]), \
          "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[2]), \
          "=" PTX_REG_BytePack_field_##pack_field(ans.pack_field[3]) \
        : "l"(addr)); \
      return ans; \
    } \
  };

DEFINE_Apply_LoadMultimem_sum_v4(float, f32, u32)

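As you can see, the instruction used is multimem.ld_reduce.relaxed.sys.global.add.v4.f32, which reduces all 4 floats across the peers. The result is stored in acc.

After the first src has been read, the remaining srcs are read into tmp and reduced via Apply_Reduce, which here amounts to an elementwise sum of the 4 floats.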

__device__ __forceinline__ void reduceCopyPacks(...) {
  ...
  while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
    ...
    #pragma unroll (MinSrcs-1 + !(MinSrcs-1))
    for (int s=1; s < MinSrcs; s++) {
      BytePack<BytePerPack> tmp[Unroll];
      RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (s < MultimemSrcs) {
          // applyLoadMultimem uses relaxed semantics for same reason we use volatile below.
          acc[u] = applyLoadMultimem<RedFn, BytePerPack>(redFn, minSrcs[s]);
        } else {
          // Use volatile loads in case credits are polled for with volatile (instead of acquire).
          tmp[u] = ld_volatile_global<BytePerPack>(minSrcs[s]);
        }
        minSrcs[s] += WARP_SIZE*BytePerPack;
      }
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (s < PreOpSrcs) tmp[u] = applyPreOp(preFn, tmp[u]);
        acc[u] = applyReduce(redFn, acc[u], tmp[u]);
      }
    }

    for (int s=MinSrcs; (MinSrcs < MaxSrcs) && (s < MaxSrcs) && (s < nSrcs); s++) {
      uintptr_t src = cvta_to_global(srcPtrs[s]) + threadBytesBehind;
      BytePack<BytePerPack> tmp[Unroll];
      RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        // Use volatile loads in case credits are polled for with volatile (instead of acquire).
        tmp[u] = ld_volatile_global<BytePerPack>(src);
        src += WARP_SIZE*BytePerPack;
      }
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (s < PreOpSrcs) tmp[u] = applyPreOp(preFn, tmp[u]);
        acc[u] = applyReduce(redFn, acc[u], tmp[u]);
      }
    }
    ...
  }
  ...
}

template<typename Fn, int EltPerPack>
struct Apply_Reduce {
  template<int Size>
  __device__ static BytePack<Size> reduce(Fn fn, BytePack<Size> a, BytePack<Size> b) {
    a.half[0] = Apply_Reduce<Fn, EltPerPack/2>::reduce(fn, a.half[0], b.half[0]);
    a.half[1] = Apply_Reduce<Fn, EltPerPack/2>::reduce(fn, a.half[1], b.half[1]);
    return a;
  }
};
template<typename T>
struct Apply_Reduce<FuncSum<T>, /*EltPerPack=*/1> {
  __device__ static BytePack<sizeof(T)> reduce(FuncSum<T> fn, BytePack<sizeof(T)> a, BytePack<sizeof(T)> b) {
    return toPack<T>(fromPack<T>(a) + fromPack<T>(b));
  }
};
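The recursive halving in Apply_Reduce can be reproduced on the host with ordinary arrays. This is a simplified analogue and not NCCL code: `PackSum` and `packSum` are hypothetical names, and a `std::array` replaces the BytePack union so that no type punning is needed.

```cpp
#include <array>
#include <cstddef>

// Primary template: split the pack in two halves and recurse, the same
// shape as Apply_Reduce<Fn, EltPerPack>.
template <typename T, std::size_t N>
struct PackSum {
  static void reduce(T* a, const T* b) {
    PackSum<T, N / 2>::reduce(a, b);
    PackSum<T, N / 2>::reduce(a + N / 2, b + N / 2);
  }
};

// Base case: a single element, the analogue of the EltPerPack=1
// specialization for FuncSum.
template <typename T>
struct PackSum<T, 1> {
  static void reduce(T* a, const T* b) { a[0] = a[0] + b[0]; }
};

// Elementwise sum of two packs; returns the accumulated pack.
template <typename T, std::size_t N>
std::array<T, N> packSum(std::array<T, N> a, const std::array<T, N>& b) {
  static_assert(N > 0 && (N & (N - 1)) == 0,
                "halving only works for a power-of-two element count");
  PackSum<T, N>::reduce(a.data(), b.data());
  return a;
}
```

For a 16-byte pack of floats the recursion goes 4 elements, 2 elements, 1 element, so the compiler ends up emitting four scalar additions.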

At this point the reduction over all inputs is available, and it is then stored to every output dst.

__device__ __forceinline__ void reduceCopyPacks(...) {
  ...
  while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
    ...
    #pragma unroll (MinDsts + !MinDsts)
    for (int d=0; d < MinDsts; d++) {
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        if (d < MultimemDsts) {
          multimem_st_global(minDsts[d], acc[u]);
        } else {
          st_global<BytePerPack>(minDsts[d], acc[u]);
        }
        minDsts[d] += WARP_SIZE*BytePerPack;
      }
    }
    for (int d=MinDsts; (MinDsts < MaxDsts) && (d < MaxDsts) && (d < nDsts); d++) {
      uintptr_t dst = cvta_to_global(dstPtrs[d]) + threadBytesBehind;
      #pragma unroll Unroll
      for (int u=0; u < Unroll; u++) {
        st_global<BytePerPack>(dst, acc[u]);
        dst += WARP_SIZE*BytePerPack;
      }
    }
  }
  ...
}

That completes reduceCopy. One more point: in the NVLS case, flags such as head and tail also live in mcBuff. Let's look at how waitPeer decides whether it has to wait.

  inline __device__ uint64_t loadStepValue(uint64_t* ptr) {
    #if __CUDA_ARCH__ >= 900 && CUDART_VERSION >= 12010
    if (flags & NvlsMinPolling) {
      uint64_t ans;
      asm("multimem.ld_reduce.acquire.sys.global.min.u64 %0, [%1];" : "=l"(ans) : "l"(cvta_to_global(ptr)));
      return ans;
    }
    #endif
    // volatile is faster than acquire but not as correct. Make sure reduceCopy
    // loads data using volatile so it doesn't see stale data in L1.
    return ld_volatile_global(ptr);
  }

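In the NVLS case, flags contains NvlsMinPolling. The step is then read with multimem.ld_reduce, which loads the step of every peer and takes the minimum, so data is received only once all peers have their data ready. The load uses acquire semantics and pairs with the release side in postPeer (fence_acq_rel_sys followed by the step store) to guarantee memory ordering.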
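The effect of min-polling can be illustrated without any GPU. In the sketch below a vector holds each peer's copy of the step counter and `loadStepValueMin` plays the role of the `multimem.ld_reduce ... min.u64` load. Both function names are hypothetical, and the readiness condition in `allPeersReady` is an assumption made for illustration, since the waiting loop of waitPeer is not shown in this article.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Host stand-in for the multimem min load: the smallest step value among
// all peers' copies of the counter.
std::uint64_t loadStepValueMin(const std::vector<std::uint64_t>& peerSteps) {
  assert(!peerSteps.empty());
  return *std::min_element(peerSteps.begin(), peerSteps.end());
}

// Assumed readiness rule: the receiver may proceed only when even the
// slowest peer has reached `wantedStep`.
bool allPeersReady(const std::vector<std::uint64_t>& peerSteps,
                   std::uint64_t wantedStep) {
  return loadStepValueMin(peerSteps) >= wantedStep;
}
```

With peer steps {4, 2, 4} a receiver waiting for step 3 keeps polling, because one peer is still at step 2. Once that peer posts step 3, the minimum reaches 3 and the receiver continues.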

Now look at postPeer.

  template<int Recv, int Send>
  inline __device__ void postPeer(bool dataStored) {
    if (flags & (Recv*RolePostRecv | Send*RolePostSend)) {
      step += StepPerSlice;
      if (Send && (flags & RolePostSend) && dataStored) fence_acq_rel_sys();
      st_relaxed_sys_global(connStepPtr, step);
    }
  }

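Here a non-multimem instruction is used to write to mcBuff. The PTX manual describes this as undefined behavior, but according to NVIDIA it is acceptable for writes.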

AllReduce

The allreduce kernel has three main thread groups. The scatter threads work as follows.

      using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
      Primitives<T, RedOp, FanAsymmetric<0, NCCL_MAX_NVLS_ARITY>, /*Direct=*/0, Proto, 0>
        prims(tid, nThreadsScatter, NULL, nvls->up, args->sendbuff, NULL,
              args->redOpArg, 0 * Proto::MaxGroupWidth, 1, 1);
      for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
        ssize_t offset = gridOffset + bid * nvls->nHeads * chunkSize;
        int nelem = args->regUsed ? 0 : min(nvls->nHeads * chunkSize, size - offset);
        prims.scatter(offset, nelem, chunkSize, chunkSize, -1, 0);
      }

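As with ring allreduce, each block handles nHeads * chunkSize elements per loop iteration (the variable nelem), where nHeads equals nranks in this example, and gridOffset advances by loopSize on each iteration. The scatter threads send the data in sendbuff to nvls->up. connIndexSend is 1, so send conn 1 is used. The state after this step is shown in Figure 12.

Figure 12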
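The loop structure of the scatter threads determines which slice of sendbuff each block touches. The sketch below lists those slices for one block. `Chunk` and `scatterChunks` are hypothetical names, `loopSize` is assumed to be nBlocks * nHeads * chunkSize (it is not defined in the excerpt), the `regUsed` case is ignored, and a negative `nelem` is clamped to 0 for readability.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical record of one loop iteration of a block.
struct Chunk {
  std::int64_t offset;  // first element handled in this iteration
  std::int64_t nelem;   // number of elements handled (0 past the end)
};

// Slices visited by block `bid`, following the loop in the excerpt above.
std::vector<Chunk> scatterChunks(std::int64_t size, int bid, int nBlocks,
                                 int nHeads, std::int64_t chunkSize) {
  assert(nBlocks > 0 && nHeads > 0 && chunkSize > 0);
  // Assumption: one iteration of all blocks together covers loopSize elements.
  const std::int64_t loopSize = std::int64_t(nBlocks) * nHeads * chunkSize;
  std::vector<Chunk> out;
  for (std::int64_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
    std::int64_t offset = gridOffset + std::int64_t(bid) * nHeads * chunkSize;
    std::int64_t nelem = std::min(nHeads * chunkSize, size - offset);
    out.push_back({offset, std::max<std::int64_t>(nelem, 0)});
  }
  return out;
}
```

With 20 elements, 2 blocks, 2 heads and a chunk size of 4, block 0 handles elements 0 to 7 and then 16 to 19, while block 1 handles elements 8 to 15 and has nothing left in the second iteration.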

The reduce thread group works as follows.

    else if (tid < tidEndReduce && nvls->headRank != -1) {
      if (!hasOut) {
        // Reduce, broadcast through NVLS
        using Proto = ProtoSimple<1, 1, COLL_UNROLL, 1, 1>;
        Primitives<T, RedOp, FanSymmetric<1>, /*Direct=*/1, Proto, 0>
          prims(tid - tidEndGather, nThreadsReduce, &nvls->down, &nvls->down, NULL, NULL,
                args->redOpArg, 2 * Proto::MaxGroupWidth, 0, 0, args);
        for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
          ssize_t offset = gridOffset + (bid * nvls->nHeads + nvls->headRank) * chunkSize;
          int nelem = min(chunkSize, size - offset);
          prims.directRecvDirectSend(offset, offset, nelem);
        }
      }
      ...
    }

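Here MultimemSrcs and MultimemDsts are both 1, and conn 0 is used. The effect of directRecvDirectSend is shown in Figure 13. GPU 0 follows the yellow arrows: it reduces the light-yellow blocks of both GPUs into the dark-yellow block and then broadcasts it to both GPUs with multimem.st. Likewise, GPU 1 follows the green arrows: it reduces the light-blue blocks of both GPUs into the dark-blue block. At this point both GPUs hold the complete reduced data.

Figure 13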

Finally, the gather threads copy the fully reduced data into recvbuff.

    } else if (tid < tidEndGather) {
      // Gather
      using Proto = ProtoSimple<1, 1, COLL_UNROLL>;
      Primitives<T, RedOp, FanAsymmetric<NCCL_MAX_NVLS_ARITY, 0>, /*Direct=*/0, Proto, 0>
        prims(tid - tidEndScatter, nThreadsGather, nvls->up, NULL, NULL, args->recvbuff,
              args->redOpArg, 1 * Proto::MaxGroupWidth, 1, 1);
      for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
        ssize_t offset = gridOffset + bid * nvls->nHeads * chunkSize;
        int nelem = args->regUsed ? 0 :min(nvls->nHeads * chunkSize, size - offset);
        prims.gather(offset, nelem, chunkSize, chunkSize, -1, 0);
      }

The gather flow is ScatterGatherOp taking the Recv branch, so it is not repeated here. The state after it runs is shown in Figure 14, and this completes the allreduce.

Figure 14
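The three phases can be put together in a small host-side model. This is a sketch of the data movement only, under the assumption that every rank is the head for exactly one chunk. The staging vectors stand in for the per-rank buffers behind the multicast address, the inner sum stands in for multimem.ld_reduce, and the inner store loop stands in for multimem.st. `Buf` and `nvlsAllReduceModel` are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

using Buf = std::vector<float>;

// Model of scatter -> reduce + broadcast -> gather for n ranks, where rank h
// is the head for chunk h. Returns the recvbuff of every rank.
std::vector<Buf> nvlsAllReduceModel(const std::vector<Buf>& sendbuff,
                                    std::size_t chunk) {
  const std::size_t n = sendbuff.size();
  assert(n > 0);
  std::vector<Buf> reduceStage(n);
  std::vector<Buf> bcastStage(n, Buf(n * chunk, 0.f));

  // Phase 1 (scatter threads): each rank copies its input into its own
  // staging buffer, one chunk-sized slot per head.
  for (std::size_t r = 0; r < n; r++) {
    assert(sendbuff[r].size() == n * chunk);
    reduceStage[r] = sendbuff[r];
  }

  // Phase 2 (reduce threads): head h reduces slot h over all ranks and
  // writes the result to slot h of every rank.
  for (std::size_t h = 0; h < n; h++) {
    for (std::size_t i = 0; i < chunk; i++) {
      const std::size_t idx = h * chunk + i;
      float acc = 0.f;
      for (std::size_t r = 0; r < n; r++) acc += reduceStage[r][idx];  // ld_reduce
      for (std::size_t r = 0; r < n; r++) bcastStage[r][idx] = acc;    // multimem.st
    }
  }

  // Phase 3 (gather threads): each rank copies the broadcast slots into
  // its recvbuff.
  return bcastStage;
}
```

For two ranks with inputs {1, 2, 3, 4} and {10, 20, 30, 40} and a chunk size of 2, both ranks end up with {11, 22, 33, 44}: rank 0 produced the first two elements and rank 1 the last two.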

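Bandwidth during search

Figure 15

As mentioned earlier, when channels are searched the bandwidth required at the head has to be multiplied by 2. Take Figure 15 as an example and recall the AllReduce flow just described: GPU 0 executes ld_reduce, and the bandwidth consumed at that point is shown in Figure 16.

Figure 16

It then executes multimem.st, which produces the traffic shown in Figure 17. This is why the head needs twice the bandwidth.

Figure 17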
