NCCL集合通信算子DEMO及性能测试

2024-04-13 05:44

本文主要是介绍NCCL集合通信算子DEMO及性能测试,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

NCCL集合通信算子DEMO及性能测试

  • 一.复现代码

以下代码用于测试NCCL算子的性能及正确性

一.复现代码

tee ccl_benchmark.py <<-'EOF'
import os
import torch
import argparse
import torch.distributed as dist
from torch.distributed import ReduceOp
from datetime import datetime
import time
import argparse
import numpy as np
dev_type="cuda"class Timer:def __init__(self,duration):        self.duration=durationdef __enter__(self):dist.barrier()self.beg= datetime.now().timestamp() * 1e6def __exit__(self, exc_type, exc_val, exc_tb):dist.barrier()self.end=datetime.now().timestamp() * 1e6self.duration.append(self.end-self.beg)op_mapping={}
class ccl_benchmark:def __init__(self,func):global op_mapping  op_mapping[func.__name__]=funcself.func=funcdef __call__(self,*args,**kwargs):return self.func(*args,**kwargs)@ccl_benchmark
def all_gather(shape,device,rank,world_size,iters=5):'''将每个rank input_tensor的数据在dim 0维度拼接在一起'''duration=[]input_tensor=(torch.ones((shape[0]//world_size,shape[1]),dtype=torch.int64)*(100+rank)).to(device)gather_list=[torch.zeros((shape[0]//world_size,shape[1]),dtype=torch.int64).to(device) for _ in range(world_size)]for _ in range(iters):with Timer(duration):dist.all_gather(gather_list,input_tensor)   output=torch.cat(gather_list,dim=0)gt=[torch.ones((shape[0]//world_size,shape[1]),dtype=torch.int64)*(100+i) for i in range(world_size)]gt=torch.cat(gt,dim=0)return duration,(output.cpu()==gt).all()@ccl_benchmark
def scatter(shape,device,rank,world_size,iters=5):'''将每个rank从scatter_list[rank]取数据到output_tensor'''duration=[]output_tensor=torch.zeros((shape[0]//world_size,shape[1]),dtype=torch.int64).to(device)scatter_list=[(torch.ones((shape[0]//world_size,shape[1]),dtype=torch.int64)*i).to(device) for i in range(world_size)]for _ in range(iters):with Timer(duration):if rank == 0:dist.scatter(output_tensor,scatter_list=scatter_list,src =0)else:dist.scatter(output_tensor,src  = 0)gt=torch.ones((shape[0]//world_size,shape[1]),dtype=torch.int64)*rankreturn duration,(output_tensor.cpu()==gt).all()@ccl_benchmark
def gather(shape,device,rank,world_size,iters=5):'''将每个rank input_tensor的数据在dim 0维度拼接在一起 只在批定的rank做'''duration=[]input_tensor=(torch.ones((shape[0]//world_size,shape[1]),dtype=torch.int64)*(100+rank)).to(device)gather_list=[torch.zeros((shape[0]//world_size,shape[1]),dtype=torch.int64).to(device) for _ in range(world_size)]for _ in range(iters):with Timer(duration):if rank == 0:dist.gather(input_tensor,gather_list=gather_list,dst=0)else:dist.gather(input_tensor,dst=0)ret=Trueif rank==0:output=torch.cat(gather_list,dim=0)gt=[torch.ones((shape[0]//world_size,shape[1]),dtype=torch.int64)*(100+i) for i in range(world_size)]gt=torch.cat(gt,dim=0)ret=(output.cpu()==gt).all()return duration,ret@ccl_benchmark
def reduce(shape,device,rank,world_size,iters=5):'''将每个rank input_tensor的数据在dim 0维度拼接在一起 只在批定的rank做'''duration=[]   for _ in range(iters):input_tensor=(torch.ones((shape[0],shape[1]),dtype=torch.int64)*(100+rank)).to(device)# input_tensor的内容会被修改,所以放在循环里with Timer(duration):dist.reduce(input_tensor,dst=0,op=dist.ReduceOp.SUM)ret=Trueif rank==0:gt=[torch.ones((shape[0],shape[1]),dtype=torch.int64)*(100+i) for i in range(world_size)]gt_=gt[0]       for i in range(1,world_size):gt_=gt_+gt[i]ret=(input_tensor.cpu()==gt_).all()return duration,ret@ccl_benchmark
def broadcast(shape,device,rank,world_size,iters=5):'''将src的rank的数据广播到其它rank'''duration=[]   for _ in range(iters):input_tensor=(torch.ones((shape[0],shape[1]),dtype=torch.int64)*(100+rank)).to(device)with Timer(duration):dist.broadcast(input_tensor,src=0)gt=(torch.ones((shape[0],shape[1]),dtype=torch.int64)*(100+0)).to('cpu')ret=(input_tensor.cpu()==gt).all()return duration,ret@ccl_benchmark
def p2p(shape,device,rank,world_size,iters=5):'''将src的rank的数据广播到其它rank'''duration=[]   for _ in range(iters):input_tensor=(torch.ones((shape[0],shape[1]),dtype=torch.int64)*(100+rank)).to(device)with Timer(duration):if rank!=0:dist.recv(input_tensor,rank-1)               if rank!=world_size-1:               dist.send(input_tensor,dst=rank+1)   gt=(torch.ones((shape[0],shape[1]),dtype=torch.int64)*(100+0)).to('cpu')ret=(input_tensor.cpu()==gt).all()return duration,ret@ccl_benchmark
def all_reduce(shape,device,rank,world_size,iters=5):'''将每个rank input_tensor的数据在dim 0维度拼接在一起'''duration=[]   for _ in range(iters):input_tensor=(torch.ones((shape[0],shape[1]),dtype=torch.int64)*(100+rank)).to(device)# input_tensor的内容会被修改,所以放在循环里with Timer(duration):dist.all_reduce(input_tensor,op=dist.ReduceOp.SUM)gt=[torch.ones((shape[0],shape[1]),dtype=torch.int64)*(100+i) for i in range(world_size)]gt_=gt[0]       for i in range(1,world_size):gt_=gt_+gt[i]ret=(input_tensor.cpu()==gt_).all()return duration,ret@ccl_benchmark
def reduce_scatter(shape,device,rank,world_size,iters=5):''''''duration=[]output_tensor=torch.zeros((shape[0]//world_size,shape[1]),dtype=torch.int64).to(device)input_list=[(torch.ones((shape[0]//world_size,shape[1]),dtype=torch.int64)*(100*rank)+chunk_id).to(device) for chunk_id in range(world_size)]for _ in range(iters):with Timer(duration):dist.reduce_scatter(output_tensor,input_list=input_list,op=dist.ReduceOp.SUM)gt_list=[(torch.ones((shape[0]//world_size,shape[1]),dtype=torch.int64)*(100*rk)+rank).to('cpu') for rk in range(world_size)]gt_=gt_list[0]       for i in range(1,world_size):gt_=gt_+gt_list[i]    return duration,(output_tensor.cpu()==gt_).all()def main():dist.init_process_group(backend='nccl')if not torch.distributed.is_initialized():returnparser = argparse.ArgumentParser(description='test')parser.add_argument('--shape', type=str, default="(1024,8192)", help='Number of epochs to train.')parser.add_argument('--iters', type=int, default=5, help='Number of epochs to train.')parser.add_argument('--op', type=str, default="", help='Number of epochs to train.')args = parser.parse_args()global op_mappingif args.op in op_mapping:torch.manual_seed(1)world_size = torch.distributed.get_world_size()rank = torch.distributed.get_rank()local_rank=int(os.environ['LOCAL_RANK'])torch.cuda.set_device(local_rank)device = torch.device(dev_type,local_rank)shape=eval(args.shape)duration,passed=op_mapping[args.op](shape,device,rank,world_size,args.iters)time.sleep(0.1*rank)print("rank:{} op:{} shape:{} iters:{} mean(us):{:.3f} passed:{}".format(rank,args.op,shape,args.iters,np.mean(duration[len(duration)//2:]),passed))dist.destroy_process_group()if __name__=='__main__':main()EOFexport NCCL_DEBUG=error
export NCCL_SOCKET_IFNAME=ens8
export NCCL_IB_DISABLE=1  
torchrun -m --nnodes=1 --nproc_per_node=4 ccl_benchmark --op=all_gather --shape="(1024,4096)" --iters=5
torchrun -m --nnodes=1 --nproc_per_node=4 ccl_benchmark --op=scatter --shape="(1024,4096)" --iters=5
torchrun -m --nnodes=1 --nproc_per_node=4 ccl_benchmark --op=gather --shape="(1024,4096)" --iters=5
torchrun -m --nnodes=1 --nproc_per_node=4 ccl_benchmark --op=reduce --shape="(1024,4096)" --iters=5
torchrun -m --nnodes=1 --nproc_per_node=4 ccl_benchmark --op=broadcast --shape="(1024,4096)" --iters=5
torchrun -m --nnodes=1 --nproc_per_node=4 ccl_benchmark --op=p2p --shape="(1024,4096)" --iters=5
torchrun -m --nnodes=1 --nproc_per_node=4 ccl_benchmark --op=all_reduce --shape="(1024,4096)" --iters=5
torchrun -m --nnodes=1 --nproc_per_node=4 ccl_benchmark --op=reduce_scatter --shape="(1024,4096)" --iters=5

这篇关于NCCL集合通信算子DEMO及性能测试的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/899274

相关文章

C#使用yield关键字实现提升迭代性能与效率

《C#使用yield关键字实现提升迭代性能与效率》yield关键字在C#中简化了数据迭代的方式,实现了按需生成数据,自动维护迭代状态,本文主要来聊聊如何使用yield关键字实现提升迭代性能与效率,感兴... 目录前言传统迭代和yield迭代方式对比yield延迟加载按需获取数据yield break显式示迭

Java实现任务管理器性能网络监控数据的方法详解

《Java实现任务管理器性能网络监控数据的方法详解》在现代操作系统中,任务管理器是一个非常重要的工具,用于监控和管理计算机的运行状态,包括CPU使用率、内存占用等,对于开发者和系统管理员来说,了解这些... 目录引言一、背景知识二、准备工作1. Maven依赖2. Gradle依赖三、代码实现四、代码详解五

如何测试计算机的内存是否存在问题? 判断电脑内存故障的多种方法

《如何测试计算机的内存是否存在问题?判断电脑内存故障的多种方法》内存是电脑中非常重要的组件之一,如果内存出现故障,可能会导致电脑出现各种问题,如蓝屏、死机、程序崩溃等,如何判断内存是否出现故障呢?下... 如果你的电脑是崩溃、冻结还是不稳定,那么它的内存可能有问题。要进行检查,你可以使用Windows 11

正则表达式高级应用与性能优化记录

《正则表达式高级应用与性能优化记录》本文介绍了正则表达式的高级应用和性能优化技巧,包括文本拆分、合并、XML/HTML解析、数据分析、以及性能优化方法,通过这些技巧,可以更高效地利用正则表达式进行复杂... 目录第6章:正则表达式的高级应用6.1 模式匹配与文本处理6.1.1 文本拆分6.1.2 文本合并6

基于Redis有序集合实现滑动窗口限流的步骤

《基于Redis有序集合实现滑动窗口限流的步骤》滑动窗口算法是一种基于时间窗口的限流算法,通过动态地滑动窗口,可以动态调整限流的速率,Redis有序集合可以用来实现滑动窗口限流,本文介绍基于Redis... 滑动窗口算法是一种基于时间窗口的限流算法,它将时间划分为若干个固定大小的窗口,每个窗口内记录了该时间

Vue3 的 shallowRef 和 shallowReactive:优化性能

大家对 Vue3 的 ref 和 reactive 都很熟悉,那么对 shallowRef 和 shallowReactive 是否了解呢? 在编程和数据结构中,“shallow”(浅层)通常指对数据结构的最外层进行操作,而不递归地处理其内部或嵌套的数据。这种处理方式关注的是数据结构的第一层属性或元素,而忽略更深层次的嵌套内容。 1. 浅层与深层的对比 1.1 浅层(Shallow) 定义

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

【测试】输入正确用户名和密码,点击登录没有响应的可能性原因

目录 一、前端问题 1. 界面交互问题 2. 输入数据校验问题 二、网络问题 1. 网络连接中断 2. 代理设置问题 三、后端问题 1. 服务器故障 2. 数据库问题 3. 权限问题: 四、其他问题 1. 缓存问题 2. 第三方服务问题 3. 配置问题 一、前端问题 1. 界面交互问题 登录按钮的点击事件未正确绑定,导致点击后无法触发登录操作。 页面可能存在