一个Job在OneFlow中的执行过程—上篇

2023-10-17 18:59
文章标签 过程 执行 job oneflow

本文主要是介绍一个Job在OneFlow中的执行过程—上篇,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

前言

之前,同事成诚创作了三篇文章:

  • 仅此一文让您掌握OneFlow框架的系统设计(上篇)

  • 仅此一文让您掌握OneFlow框架的系统设计(中篇)

  • 仅此一文让您掌握OneFlow框架的系统设计(下篇)

以Top down的视角,介绍了分布式深度学习框架OneFlow的Actor、SBP、流式调度等先进的分布式系统架构设计及理念,本文将建立在前面的基础上,将基于上述文章,以Bottom up的角度,从一个简单的预测任务的demo入手,讲解一个Job(用户定义的训练/预测任务)在Oneflow中的调用入口、数据流转过程、从python端到c++端的代码执行流程,记录学习和梳理的过程。

下一篇文章,将详细梳理Job编译构图、Plan构建融合的过程。

需要特别说明的是,此系列文章仅在于描述流程、重点模块的代码,oneflow正处于快速版本更新和迭代过程,所以很多api在未来可能会有较大幅度的变动。目前在推进的工作有interface 1.0工作,包含了对齐pytorch api、multi client设计、构图编译期优化等,敬请期待!


Job执行流程

下面通过一个简单的静态图Job,描述一下在这个Job执行过程中的整体流程。 这个名为pad_Job的任务很简单,完整代码就10多行,目的是为输入的矩阵x通过reflection pad,得到输出矩阵y(可看做是一个非常简单的预测网络,输入x输出y)。

其主要方法即调用flow.reflection_pad2d()的API实现:

import oneflow as flow
import oneflow.typing as tp
import numpy as np@flow.global_function()
def pad_Job(x: tp.Numpy.Placeholder((2, 1, 3, 3))) -> tp.Numpy:with flow.scope.placement("cpu", "0:0"):loss = flow.reflection_pad2d(x, padding=1)return lossx = np.arange(18).reshape((2, 1, 3, 3)).astype(np.float32)
y = pad_Job(x)
print("in:\n", x, "y:\n", y)

输出:

in:[[[[ 0.  1.  2.][ 3.  4.  5.][ 6.  7.  8.]]][[[ 9. 10. 11.][12. 13. 14.][15. 16. 17.]]]] 
y:[[[[ 4.  3.  4.  5.  4.][ 1.  0.  1.  2.  1.][ 4.  3.  4.  5.  4.][ 7.  6.  7.  8.  7.][ 4.  3.  4.  5.  4.]]][[[13. 12. 13. 14. 13.][10.  9. 10. 11. 10.][13. 12. 13. 14. 13.][16. 15. 16. 17. 16.][13. 12. 13. 14. 13.]]]]

首先,代码进入到global_function()中,由于是一个静态类型的job,故会触发执行_RunLazyJob()方法:

def _RunLazyJob(session, job_func, *args, **kwargs):return session.TryInit().LazyRun(job_func, *args, **kwargs)

在其中,会初始化一个session对象,用于负责所有被global_function()修饰的job的执行(此处只定义了一个user job);初始化机器的环境;根据input op、中间op、output op的信息推导出所有op的shape、sbp属性等信息,从而编译job,构建出物理图(graph);根据物理图和集群环境等信息生成执行计划plan,并最终运行整个plan,返回输出数据,销毁session,完成整个job的执行过程。

代码调用过程

下面,还是通过这个简单的pad_Job,来讲解运行时从python到c++层面的代码调用流程,简单梳理下oneflow框架中的各个系统模块及代码调用流程。

import oneflow as flow
import oneflow.typing as tp
import numpy as np@flow.global_function()
def pad_Job(x: tp.Numpy.Placeholder((2, 1, 3, 3))) -> tp.Numpy:with flow.scope.placement("cpu", "0:0"):loss = flow.reflection_pad2d(x, padding=1)return lossx = np.arange(18).reshape((2, 1, 3, 3)).astype(np.float32)
y = pad_Job(x)
print("in:\n", x, "y:\n", y)

调用入口

首先,@flow.global_function()作为job调用的入口,会触发lazy_oneflow_function()[1]方法:

@enable_if.condition(hob.in_normal_mode & ~hob.eager_execution_enabled & ~hob.session_initialized
)
def lazy_oneflow_function(function_config=FunctionConfig()):assert isinstance(function_config, FunctionConfig)def Decorator(job_func):if not hasattr(job_func, "__oneflow_function_signature__"):job_func.__oneflow_function_signature__ = inspect.signature(job_func)oft_util.CheckGlobalFunctionAnnotation(job_func.__oneflow_function_signature__)sess = session_ctx.GetDefaultSession()@functools.wraps(job_func)def Func(*args, **kwargs):return _RunLazyJob(sess, job_func, *args, **kwargs)sess.AddJob(_CloneFunctionDesc(function_config.function_desc, job_func))for x in dir(job_func):if x.startswith("__oneflow_"):setattr(Func, x, getattr(job_func, x))return Funcreturn Decorator

其中,lazy_oneflow_function()定义了一个闭包函数Decorator(),Decorator()中将job添加至session,并返回一个Func函数,当Func()被调用时,实际执行的是_RunLazyJob()[2]

def _RunLazyJob(session, job_func, *args, **kwargs):return session.TryInit().LazyRun(job_func, *args, **kwargs)

_RunLazyJob()将通过TryInit()方法将Session类对象初始化,并将用户在global function中定义的user job、系统的push/pull job、model io job等所有job编译成物理图task graph,然后将物理图编译成整体的运行时计划—plan,最后调用LazyRun()触发运行。

注:user job即用户定义的job,通常为训练或者预测任务;push/pull job则是在用户的user job编译为可执行plan时,系统自动添加的用于处理输入输出的系统级job;model io job则是用于保存/加载模型的系统级job。见:CompileAndMergePlanOnMaster()[3]

数据流转

下面,我们跳出代码api层面,从数据层面看一下User Job的运行过程:首先,User Job可能有多个输入、多个输出,oneflow会遍历所有User Job中的Input Op和Return Op,针对每个Input Op,分别构建一个对应的Push Job;针对每个Return Op,分别构建一个对应的Pull Job。 User Job中,数据流转的示意图如下(看流程即可,不用细看数字):系统自动添加的Push Job用于接收输入数据,其ForeignInput Op[4] 内部维护一个buffer,该buffer等待Python端喂数据;Push Job处理完输入数据X1后,由于X1在Push Job和User Job间是内存共享的,可以直接被User Job所消费,从而继续被Op_a、Op_b处理,最后得到输出数据Y1;同样,系统添加的Pull Job专门用于处理输出数据,Pull Job中有一个ForeignOutput Op[5],其内部同样维护一个buffer,当往该buffer内填完数据以后,python端对应的of blob对象中的numpy就拷贝了对应的数据。从而完整整个从输入到输出的数据流转过程。

具体代码调用流程: python端:LaunchUserJob[6] >> push_util.AsyncPush() >> _AsyncPushArg() >> arg_blob_def.CheckAndAsyncPush(session, arg_ndarray) >> _MakePushNdarrayCallback() >> ofblob.CopyFromNdarray() >> oneflow_api.Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName()[7] >> c++端:Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName()[8]

TryInit()

TryInit()主要通过Init()[9]做了一些环境初始化、和静态图编译相关的事情,比如:

  • 1.初始化集群环境

  • 2.初始化静态图session对象

  • 3.编译Job逻辑图

  • 4.启动session,编译Job物理图&Plan生成 

  • 5.初始化model IO

代码如下:

def Init(self):assert self.status_ is SessionStatus.OPENself.status_ = SessionStatus.RUNNINGif not oneflow_api.IsEnvInited():oneflow.env.init()_TryCompleteConfigProto(self.config_proto)self.resource_ = self.config_proto.resourceif not oneflow_api.EagerExecutionEnabled():c_api_util.InitLazyGlobalSession(self.config_proto)for job_name, func_desc in self.job_name2function_desc_.items():compiler.Compile(self, func_desc, self.config_proto)self.existed_module_names_ = set()self.job_name2var_name2var_blob_ = dict()assert len(self.job_name2function_desc_.items()) > 0oneflow_api.StartLazyGlobalSession()self.inter_user_job_info_ = c_api_util.GetInterUserJobInfo()# Get latest op_attr and job_name after compiler.Compileself.UpdateInfo4InterfaceOp()if not config_util.api_legacy_model_io_enabled():check_point_v2.Init()else:self.eager_config_proto_ctx_ = oneflow_api.LogicalConfigProtoContext(str(self.config_proto))return self

1.初始化集群环境

主要通过oneflow.env.init() 初始化EnvProto对象,包含machine、ctrl_port、data_port、cpp_logging_conf这几个对象,用于记录集群环境中机器的环境信息(控制端口、数据端口)、log配置等信息。

2.初始化静态图session对象

python代码中的c_api_util.InitLazyGlobalSession()会调用session.h[10]中定义的InitLazyGlobalSession()[11]实际地创建一个c++的session对象。

inline Maybe<void> InitLazyGlobalSession(const std::string& config_proto_str) {CHECK_NOTNULL_OR_RETURN(Global<EnvDesc>::Get()) << "env not found";CHECK_OR_RETURN(Global<MachineCtx>::Get()->IsThisMachineMaster());ClusterInstruction::MasterSendSessionStart();ConfigProto config_proto;CHECK_OR_RETURN(TxtString2PbMessage(config_proto_str, &config_proto))<< "failed to parse config_proto: " << config_proto_str;FixCpuDeviceNum(&config_proto);Global<CtrlClient>::Get()->PushKV("config_proto", config_proto);CHECK_ISNULL_OR_RETURN(Global<SessionGlobalObjectsScope>::Get());Global<SessionGlobalObjectsScope>::SetAllocated(new SessionGlobalObjectsScope());JUST(Global<SessionGlobalObjectsScope>::Get()->Init(config_proto));LOG(INFO) << "NewGlobal " << typeid(SessionGlobalObjectsScope).name();return Maybe<void>::Ok();
}

首先,会通过Global::Get()->IsThisMachineMaster()来检查当前节点是否为master节点,是则继续否则return跳出。然后将string类型的config转换为protobuf的配置对象—config_proto,再通过:Global<CtrlClient>::Get()->PushKV("config_proto", config_proto);将config_proto和"config_proto"这个key绑定,用于后续的传递。最后初始化一个全局的session对象(SessionGlobalObjectsScope),拥有唯一的id。

3.编译Job逻辑图

主要通过compiler.Compile(self, func_desc, self.config_proto) 中的 _CompileJob[12]()来编译user_job、推导输入输出的op shape、sbp信息、构建Job基于op的逻辑图等,代码如下:

def Compile(session, function_desc, config_proto):with InterpretScope(session, function_desc, config_proto):_CompileJob(function_desc)oneflow_api.CurJobBuildAndInferCtx_Complete()def _CompileJob(function_desc):func = function_desc.job_funcparameters = func.__oneflow_function_signature__.parametersif len(parameters) == 0:func.__oneflow_input_blob_defs__ = ()elif all(p.annotation is inspect._empty for _, p in parameters.items()):func.__oneflow_input_blob_defs__ = _GetArgDefault(func)elif all(p.annotation is not inspect._empty for _, p in parameters.items()):func.__oneflow_input_blob_defs__ = _MakeInputBlobDefFromParameterSignature(parameters)else:raise NotImplementedError("All parameters of global function should be annotated")inputs = _RecursiveMakeInputBlobs(func.__oneflow_input_blob_defs__)ret = func(*inputs)return_annotation = func.__oneflow_function_signature__.return_annotationoft_util.CheckReturnByAnnotation(func.__name__, ret, return_annotation)func.__oneflow_output_remote_blobs__ = _RecursiveMakeRetRemoteBlobs(ret, allow_cpu_return_op=function_desc.function_attribute.allow_cpu_return_op)

其中,JobBuildAndInferCtx会保存已经加入的Op及其OpConf(SBP、shape等),并通过_RecursiveMakeInputBlobs()和_RecursiveMakeRetRemoteBlobs()  为userjob的输入输出自动插入相应的blob op,其中_RecursiveMakeInputBlobs会自动插入Input op用于数据输入;_RecursiveMakeRetRemoteBlobs将自动插入Return op用于数据输出。

_RecursiveMakeInputBlobs >> InputOpByArgBlobDef >>>>>>>>>>>>>>>>>>>>
op_conf >>>>>>>>>>>>>>>>>>>> name: "Input_0"
device_tag: "gpu"
scope_symbol_id: 4611686018427420670
input_conf {out: "out"blob_conf {shape {dim: 2dim: 1dim: 3dim: 3}data_type: kFloatsplit_axis {value: 0}batch_axis {value: 0}is_dynamic: falseis_tensor_list: false}
}_RecursiveMakeRetRemoteBlobs >> LazyReturnRemoteBlob >>>>>>>>>>>>>>>>>>>>
op_conf >>>>>>>>>>>>>>>>>>>> name: "Return_2"
device_tag: "cpu"
return_conf {in: "Reflection_Pad2d1/y_0"out: "out"
}

在经过上述操作后,Job推导时需要用到的所有op都已经齐全,将在后续的CurJobBuildAndInferCtx_Complete()[13]中调用c++的LazyJobBuildAndInferCtx::Complete()[14],开始Job逻辑图编译的主过程:

Maybe<void> LazyJobBuildAndInferCtx::Complete() {CHECK_NOTNULL(Global<JobDesc>::Get());Global<JobDesc>::Delete();if (job().job_conf().has_train_conf()) {CHECK_OR_RETURN(job().job_conf().train_conf().has_model_update_conf());CHECK_OR_RETURN(job().job_conf().train_conf().has_primary_lr());}auto scope = std::make_unique<GlobalJobDescScope>(mut_job()->job_conf(), job_id());JobPassCtx job_pass_ctx(GlobalJobDesc());auto DoPass = [&](const std::string& pass_name "&") -> Maybe<void> {return JobPass4Name(pass_name)(mut_job(), &job_pass_ctx);};if (GlobalJobDesc().Bool("__is_user_function__")) {JUST(DoPass("CompleteOfrecordDecoder"));JUST(DoPass("SetDefaultVariableConf"));
#ifdef WITH_CUDAJUST(DoPass("AutoMixedPrecision"));
#endifJUST(DoPass("OptimizerPlacementOptimizationPass"));JUST(DoPass("DynamicLossScaleSchedulePass"));JUST(DoPass("AutoTrainStep"));JUST(DoPass("AutoLearningRate"));JUST(DoPass("GenerateBackwardAndOptimizerOpConfs"));JUST(DoPass("AddSspVariableProxy"));JUST(DoPass("CheckpointingPass"));JUST(DoPass("CudnnFusedNormalizationAddReluPass"));JUST(DoPass("PruneCastToStaticShapeOpsPass"));JUST(DoPass("FuseAddToOutputPass"));JUST(DoPass("IndexedSlicesOptimizerRewritePass"));JUST(DoPass("SplitSparseSoftmaxCrossEntropyOpPass"));JUST(DoPass("DoParallelCastBeforeWideningTypeCast"));JUST(DoPass("AddLbiDiffWatcherOpConfs"));JUST(DoPass("FuseCastScalePass"));JUST(DoPass("PruneParallelCastOpsPass"));JUST(DoPass("FuseUpdateOpsPass"));JUST(DoPass("DumpVariableInfoPass"));}JUST(DoPass("DumpTimeShapeAndBlobParallelConfPass"));return Maybe<void>::Ok();
}

此过程主要基于由op节点(OpNode)构成的Job逻辑图(OpGragh),进行了一系列pass的系统优化过程,每个pass对逻辑图进行了一次图修改/重写,最终完成User Job的第一个阶段的编译过程。编译的具体过程,将在下一篇文章中详细说明。

4.启动session,编译Job物理图&Plan生成

这一步将会对之前编译完成的、由各种op节点构成的User Job(jobset)进一步处理,触发Oneflow::Init()过程,该过程主要内容为生成最终Jobs,编译Jobs物理图生成一个最终的运行时执行计划—Plan。

首先,通过oneflow_api.StartLazyGlobalSession()[15],启动session:

inline Maybe<void> StartLazyGlobalSession() {CHECK_NOTNULL_OR_RETURN(Global<SessionGlobalObjectsScope>::Get()) << "session not found";CHECK_OR_RETURN(Global<MachineCtx>::Get()->IsThisMachineMaster());const JobSet& job_set = Global<LazyJobBuildAndInferCtxMgr>::Get()->job_set();if (Global<ResourceDesc, ForSession>::Get()->enable_debug_mode()) {TeePersistentLogStream::Create("job_set.prototxt")->Write(job_set);}if (job_set.job().empty()) { return Error::JobSetEmptyError() << "no function defined"; }CHECK_ISNULL_OR_RETURN(Global<Oneflow>::Get());Global<CtrlClient>::Get()->PushKV("session_job_set", job_set);Global<const InterJobReuseMemStrategy>::New(job_set.inter_job_reuse_mem_strategy());Global<Oneflow>::New();JUST(Global<Oneflow>::Get()->Init(job_set));return Maybe<void>::Ok();
}

通过SessionGlobalObjectsScope的Get()方法检查之前的SessionGlobalObjectsScope对象是否成功Init,如果成功Init且本机为master节点,则进行后续操作:

  • 通过LazyJobBuildAndInferCtxMgr获取本session的 job_set;

  • 将job_set绑定到protobuf的key-value中,key为session_job_set;

  • 创建一个job间内存复用管理策略的对象InterJobReuseMemStrategy;

  • 创建一个Oneflow对象,并通过Oneflow::Init()[16]方法,触发job编译成plan的主过程并生成最终的可执行计划—Plan;

5.初始化model IO

主要通过check_point_v2.Init()[17]初始化model IO,用于加载/保存模型文件。至此基本完成了整个job从编译至生成运行时plan的完整流程。

编译Job物理图&Plan融合

104751691-b0ea8a00-5790-11eb-94f3-8dff215b13eb.png

此过程主要发生在Oneflow::Init()[18]方法中,Init过程的代码如下:

Maybe<void> Oneflow::Init(const oneflow::JobSet& job_set) {OF_PROFILER_RANGE_GUARD("Oneflow::Init");// RuntimeOF_PROFILER_RANGE_PUSH("CompileAndMergePlanOnMaster");JUST(CompileAndMergePlanOnMaster(job_set.job(), &plan_));OF_PROFILER_RANGE_POP();  // CompileAndMergePlanOnMasterif (Global<MachineCtx>::Get()->IsThisMachineMaster()) {runtime_buffers_scope_.reset(new RuntimeBuffersScope(plan_));}OF_PROFILER_RANGE_PUSH("new Runtime");runtime_.reset(new Runtime(plan_, GetMaxVal<size_t>(), false));OF_PROFILER_RANGE_POP();  // new Runtimereturn Maybe<void>::Ok();
}

主要过程为:

  • CompileAndMergePlanOnMaster() 将jobs编译成物理图并生成运行时计划—plan

  • new RuntimeBuffersScope() 新建一个运行时buffer scope对象用于内存管理

  • new Runtime() 根据生成的最终可执行plan,新建运行时—runtime对象

CompileAndMergePlanOnMaster()[19]

在CompileAndMergePlanOnMaster()[20]的过程中,系统将动态为每个user job添加一个push job用于处理数据输入;添加一个pull job用于处理数据输出,如果开启enable_legacy_model_io则添加model io相关的job至jobs中; 最后通过

  • CompileCurJobOnMaster()

  • MergeSubPlanWithoutGenNetTopo()

  • MakeMainJob()

  • CompileMainJob()

  • LinkMainPlan()

等一系列方法将jobs物理图编译融合至Plan中,最后new Runtime()会创建一个运行时的runtimeruntime``_.reset(new Runtime(plan_``, GetMaxVal<size_t>(), false));不过Runtime[21]并没有立即执行,而是会等待一个tick信号触发。通常,系统自动添加的Push Job中的ForeignInput Op[22] 内部维护一个buffer,该buffer等待Python端喂数据,一旦有数据输入此op,将触发tick信号,开启整个Plan的运行过程。

总结来说,编译时将每个job编译成运行时计划plan,并将所有plan融合的过程,分为几步:1) 将每个Job生成的Plan(SubPlan)合并到一个大的MergedPlan中 2) Job之间的内存复用和内存共享 3) 计算CriticalSection[23]4) 生成MainJob[24]5) 编译MainJob得到MainPlan 6) 将MainPlan和MergedPlan中每个Job生成的SubPlan进行link,得到最终的MergedPlan 关于CriticalSection[25]的设计和描述,请参考:仅此一文让您掌握OneFlow框架的系统设计(上篇)

附:Plan流程图(png/svg)制作
  1. 开启debug模式export ONEFLOW_DEBUG_MODE=1开启后,会保存log,log/dot目录下会有merged_plan.dot

  2. 安装graphviz ubuntu下可通过sudo apt-get install graphviz命令

  3. 根据merged_plan.dot创建svg/png图dot -Tsvg merged_plan.dot > merged_plan.svgdot -Tpng merged_plan.dot > merged_plan.png

LazyRun()

TryInit()之后,整个执行计划plan构建完成,通过session.TryInit().LazyRun(job_func, *args, **kwargs)[26]发送tick信号,触发整个plan的运行。LazyRun()方法如下:[27]

def LazyRun(self, job_func, *arg):assert self.status_ is SessionStatus.RUNNINGremote_blobs = self.LaunchUserJob(job_func, *arg)if remote_blobs is None:returnfuture_blob = LazyFutureRemoteBlobs(self).SetResult(remote_blobs).Inited()annotation = inspect.signature(job_func).return_annotationreturn oft_util.TransformGlobalFunctionResult(future_blob, annotation)

整个过程即传入输入数据至用户定义的user job中,计算得到最终的remote_blobs(remote_blobs = self.LaunchUserJob(job_func, *arg)),然后通过转换得到输出数据。

def LaunchUserJob(self, job_func, *arg):assert self.status_ is SessionStatus.RUNNINGjob_name = job_func.__name__push_util.AsyncPush(self, job_func, *arg)self.LaunchJob(job_instance_util.MakeUserJobInstance(job_name))return job_func.__oneflow_output_remote_blobs__

其中,运行时Plan的触发是通过LaunchUserJob触发的,具体请看数据流转部分。

总结

总体来说,在oneflow中一个用户自定义的job执行过程,主要分为编译时运行时。编译时将每个job编译成运行时计划plan;运行时,则是plan的实际执行过程。

以上,只是基于文章:仅此一文让您掌握OneFlow框架的系统设计(上篇)仅此一文让您掌握OneFlow框架的系统设计(上篇)")和代码,梳理出的粗浅总结,Oneflow的系统设计远不止上述这些,譬如:流控机制、Actor、Register、SBP抽象、支持epoll/RDMA通信的网络模块CommNet、内存管理模块、ID编址系统等、Op/Kernel的设计...

更多介绍,请参考:

  • 仅此一文让您掌握OneFlow框架的系统设计(上篇)

  • 仅此一文让您掌握OneFlow框架的系统设计(中篇)

  • 仅此一文让您掌握OneFlow框架的系统设计(下篇)

参考资料

[1]

lazy_oneflow_function(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L150

[2]

_RunLazyJob(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L220

[3]

CompileAndMergePlanOnMaster(): https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L897

[4]

ForeignInput Op: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/kernel/foreign_input_kernel.cpp%23L27

[5]

ForeignOutput Op: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/kernel/foreign_output_kernel.cpp%23L27

[6]

LaunchUserJob: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L298

[7]

oneflow_api.Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/ofblob.py#L113

[8]

Dtype_GetOfBlobStaticTensorCopyFromBufferFuncName(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/api/python/ofblob/ofblob.e.h#L80

[9]

Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L207

[10]

session.h: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/api/python/session/session.h#L41

[11]

InitLazyGlobalSession(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/api/python/session/session.h#L41

[12]

_CompileJob: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/compiler.py#L86

[13]

CurJobBuildAndInferCtx_Complete(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/compiler.py#L48

[14]

LazyJobBuildAndInferCtx::Complete(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/job_build_and_infer_ctx.cpp#L939

[15]

oneflow_api.StartLazyGlobalSession(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L221

[16]

Oneflow::Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/oneflow.cpp#L991

[17]

check_point_v2.Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L226

[18]

Oneflow::Init(): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/core/job/oneflow.cpp#L991

[19]

CompileAndMergePlanOnMaster(): https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L983

[20]

CompileAndMergePlanOnMaster(): https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L983

[21]

Runtime: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/runtime.cpp#L63

[22]

ForeignInput Op: https://link.zhihu.com/?target=https%3A//github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/kernel/foreign_input_kernel.cpp%23L27

[23]

CriticalSection: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L737

[24]

MainJob: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L573

[25]

CriticalSection: https://github.com/Oneflow-Inc/oneflow/blob/v0.2.0/oneflow/core/job/oneflow.cpp#L737

[26]

session.TryInit().LazyRun(job_func, *args, **kwargs): https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/function_util.py#L221

[27]

LazyRun()方法如下:: https://github.com/Oneflow-Inc/oneflow/blob/f733ab4cb43699cb4e3783032ac00b181166e8d9/oneflow/python/framework/session_util.py#L277

网页端点击阅读原文,获得更好的阅读体验

这篇关于一个Job在OneFlow中的执行过程—上篇的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/227334

相关文章

将Mybatis升级为Mybatis-Plus的详细过程

《将Mybatis升级为Mybatis-Plus的详细过程》本文详细介绍了在若依管理系统(v3.8.8)中将MyBatis升级为MyBatis-Plus的过程,旨在提升开发效率,通过本文,开发者可实现... 目录说明流程增加依赖修改配置文件注释掉MyBATisConfig里面的Bean代码生成使用IDEA生

C# WinForms存储过程操作数据库的实例讲解

《C#WinForms存储过程操作数据库的实例讲解》:本文主要介绍C#WinForms存储过程操作数据库的实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、存储过程基础二、C# 调用流程1. 数据库连接配置2. 执行存储过程(增删改)3. 查询数据三、事务处

JSON Web Token在登陆中的使用过程

《JSONWebToken在登陆中的使用过程》:本文主要介绍JSONWebToken在登陆中的使用过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录JWT 介绍微服务架构中的 JWT 使用结合微服务网关的 JWT 验证1. 用户登录,生成 JWT2. 自定义过滤

java中使用POI生成Excel并导出过程

《java中使用POI生成Excel并导出过程》:本文主要介绍java中使用POI生成Excel并导出过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录需求说明及实现方式需求完成通用代码版本1版本2结果展示type参数为atype参数为b总结注:本文章中代码均为

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

SpringCloud之LoadBalancer负载均衡服务调用过程

《SpringCloud之LoadBalancer负载均衡服务调用过程》:本文主要介绍SpringCloud之LoadBalancer负载均衡服务调用过程,具有很好的参考价值,希望对大家有所帮助,... 目录前言一、LoadBalancer是什么?二、使用步骤1、启动consul2、客户端加入依赖3、以服务

Oracle存储过程里操作BLOB的字节数据的办法

《Oracle存储过程里操作BLOB的字节数据的办法》该篇文章介绍了如何在Oracle存储过程中操作BLOB的字节数据,作者研究了如何获取BLOB的字节长度、如何使用DBMS_LOB包进行BLOB操作... 目录一、缘由二、办法2.1 基本操作2.2 DBMS_LOB包2.3 字节级操作与RAW数据类型2.

C#原型模式之如何通过克隆对象来优化创建过程

《C#原型模式之如何通过克隆对象来优化创建过程》原型模式是一种创建型设计模式,通过克隆现有对象来创建新对象,避免重复的创建成本和复杂的初始化过程,它适用于对象创建过程复杂、需要大量相似对象或避免重复初... 目录什么是原型模式?原型模式的工作原理C#中如何实现原型模式?1. 定义原型接口2. 实现原型接口3

Spring Security注解方式权限控制过程

《SpringSecurity注解方式权限控制过程》:本文主要介绍SpringSecurity注解方式权限控制过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、摘要二、实现步骤2.1 在配置类中添加权限注解的支持2.2 创建Controller类2.3 Us

Spring AI集成DeepSeek三步搞定Java智能应用的详细过程

《SpringAI集成DeepSeek三步搞定Java智能应用的详细过程》本文介绍了如何使用SpringAI集成DeepSeek,一个国内顶尖的多模态大模型,SpringAI提供了一套统一的接口,简... 目录DeepSeek 介绍Spring AI 是什么?Spring AI 的主要功能包括1、环境准备2