分布式执行引擎ray入门--(3)Ray Train

2024-03-11 03:20

本文主要是介绍分布式执行引擎ray入门--(3)Ray Train,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Ray Train中包含4个部分

  1. Training function: 包含训练模型逻辑的函数

  2. Worker: 用来跑训练的

  3. Scaling configuration: 配置

  4. Trainer: 协调以上三个部分

Ray Train+PyTorch

这一块比较建议直接去官网看diff,官网色块标注的比较清晰,非常直观。

import os
import tempfileimport torch
from torch.nn import CrossEntropyLoss
from torch.optim import Adam
from torch.utils.data import DataLoader
from torchvision.models import resnet18
from torchvision.datasets import FashionMNIST
from torchvision.transforms import ToTensor, Normalize, Composeimport ray.train.torchdef train_func(config):# Model, Loss, Optimizermodel = resnet18(num_classes=10)model.conv1 = torch.nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)# model.to("cuda")  # This is done by `prepare_model`# [1] Prepare model.model = ray.train.torch.prepare_model(model)criterion = CrossEntropyLoss()optimizer = Adam(model.parameters(), lr=0.001)# Datatransform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])data_dir = os.path.join(tempfile.gettempdir(), "data")train_data = FashionMNIST(root=data_dir, train=True, download=True, transform=transform)train_loader = DataLoader(train_data, batch_size=128, shuffle=True)# [2] Prepare dataloader.train_loader = ray.train.torch.prepare_data_loader(train_loader)# Trainingfor epoch in range(10):for images, labels in train_loader:# This is done by `prepare_data_loader`!# images, labels = images.to("cuda"), labels.to("cuda")outputs = model(images)loss = criterion(outputs, labels)optimizer.zero_grad()loss.backward()optimizer.step()# [3] Report metrics and checkpoint.metrics = {"loss": loss.item(), "epoch": epoch}with tempfile.TemporaryDirectory() as temp_checkpoint_dir:torch.save(model.module.state_dict(),os.path.join(temp_checkpoint_dir, "model.pt"))ray.train.report(metrics,checkpoint=ray.train.Checkpoint.from_directory(temp_checkpoint_dir),)if ray.train.get_context().get_world_rank() == 0:print(metrics)# [4] Configure scaling and resource requirements.
scaling_config = ray.train.ScalingConfig(num_workers=2, use_gpu=True)# [5] Launch distributed training job.
trainer = ray.train.torch.TorchTrainer(train_func,scaling_config=scaling_config,# [5a] If running in a multi-node cluster, this is where you# should configure the run's persistent storage that is accessible# across all worker nodes.# run_config=ray.train.RunConfig(storage_path="s3://..."),
)
result = trainer.fit()# [6] Load the trained model.
with result.checkpoint.as_directory() as checkpoint_dir:model_state_dict = torch.load(os.path.join(checkpoint_dir, "model.pt"))model = resnet18(num_classes=10)model.conv1 = torch.nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)model.load_state_dict(model_state_dict)

模型 

  ray.train.torch.prepare_model() 

model = ray.train.torch.prepare_model(model)
相当于model.to(device_id or "cpu") +  DistributedDataParallel(model, device_ids=[device_id])

将model移动到合适的device上,同时实现分布式

数据

ray.train.torch.prepare_data_loader() 

报告 checkpoints 和 metrics

+import ray.train
+from ray.train import Checkpointdef train_func(config):...torch.save(model.state_dict(), f"{checkpoint_dir}/model.pth"))
+    metrics = {"loss": loss.item()} # Training/validation metrics.
+    checkpoint = Checkpoint.from_directory(checkpoint_dir) # Build a Ray Train checkpoint from a directory
+    ray.train.report(metrics=metrics, checkpoint=checkpoint)...
data_loader = ray.train.torch.prepare_data_loader(data_loader)

将batches移动到合适的device上,同时实现分布式sampler

配置 scale 和 GPUs

from ray.train import ScalingConfig
scaling_config = ScalingConfig(num_workers=2, use_gpu=True)

配置持久化存储

多节点分布式训练时必须指定,本地路径会有问题。

from ray.train import RunConfig# Local path (/some/local/path/unique_run_name)
run_config = RunConfig(storage_path="/some/local/path", name="unique_run_name")# Shared cloud storage URI (s3://bucket/unique_run_name)
run_config = RunConfig(storage_path="s3://bucket", name="unique_run_name")# Shared NFS path (/mnt/nfs/unique_run_name)
run_config = RunConfig(storage_path="/mnt/nfs", name="unique_run_name")

启动训练任务

from ray.train.torch import TorchTrainertrainer = TorchTrainer(train_func, scaling_config=scaling_config, run_config=run_config
)
result = trainer.fit()

这篇关于分布式执行引擎ray入门--(3)Ray Train的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/796446

相关文章

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

redis+lua实现分布式限流的示例

《redis+lua实现分布式限流的示例》本文主要介绍了redis+lua实现分布式限流的示例,可以实现复杂的限流逻辑,如滑动窗口限流,并且避免了多步操作导致的并发问题,具有一定的参考价值,感兴趣的可... 目录为什么使用Redis+Lua实现分布式限流使用ZSET也可以实现限流,为什么选择lua的方式实现

Seata之分布式事务问题及解决方案

《Seata之分布式事务问题及解决方案》:本文主要介绍Seata之分布式事务问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Seata–分布式事务解决方案简介同类产品对比环境搭建1.微服务2.SQL3.seata-server4.微服务配置事务模式1

Python FastAPI入门安装使用

《PythonFastAPI入门安装使用》FastAPI是一个现代、快速的PythonWeb框架,用于构建API,它基于Python3.6+的类型提示特性,使得代码更加简洁且易于绶护,这篇文章主要介... 目录第一节:FastAPI入门一、FastAPI框架介绍什么是ASGI服务(WSGI)二、FastAP

MySQL常见的存储引擎和区别说明

《MySQL常见的存储引擎和区别说明》MySQL支持多种存储引擎,如InnoDB、MyISAM、MEMORY、Archive、CSV和Blackhole,每种引擎有其特点和适用场景,选择存储引擎时需根... 目录mysql常见的存储引擎和区别说明1. InnoDB2. MyISAM3. MEMORY4. A

grom设置全局日志实现执行并打印sql语句

《grom设置全局日志实现执行并打印sql语句》本文主要介绍了grom设置全局日志实现执行并打印sql语句,包括设置日志级别、实现自定义Logger接口以及如何使用GORM的默认logger,通过这些... 目录gorm中的自定义日志gorm中日志的其他操作日志级别Debug自定义 Loggergorm中的

MySQL InnoDB引擎ibdata文件损坏/删除后使用frm和ibd文件恢复数据

《MySQLInnoDB引擎ibdata文件损坏/删除后使用frm和ibd文件恢复数据》mysql的ibdata文件被误删、被恶意修改,没有从库和备份数据的情况下的数据恢复,不能保证数据库所有表数据... 参考:mysql Innodb表空间卸载、迁移、装载的使用方法注意!此方法只适用于innodb_fi

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程