Spark源码分析之Excutor资源分配流程

2024-01-10 10:32

本文主要是介绍Spark源码分析之Excutor资源分配流程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一.前言

在用户提交应用程序时,SparkContext会向Master发送注册消息,并由Master给该应用分配Executor。

这里的SparkContext主要用于负责和ClusterManager通信,进行资源的管理,任务分配和监控,负责作业执行的生命周期管理,ClusterManager提供了资源的分配和管理。

在不同模式下ClusterManager的角色不同,Standalone中由Master担任,在Yarn模式下由ResourceManager担任。SparkContext对运行的作业划分并分配资源后,会把任务发送到Executor去运行。

本文主要着重对Excutor资源分配的过程进行梳理。

二.Excutor资源分配原理分析

运行一个作业都会一定会有SparkContext,这里我们要明确,在SparkContext启动过程中,会首先创建DAGScheduler和TaskSechduler两个调度器,还有schedulerBackend用于分配当前可用的资源。

1.DAGScheduler
DAGScheduler主要负责将用户的应用的DAG划分为不同的Stage,其中每个Stage由可以并发执行的一组Task构成, 这些Task的执行逻辑完全相同,只是作用于不同的数据。

2.TaskSechduler
负责具体任务的调度执行,从DAGScheduler接收不同Stage的任务,按照调度算法,分配给应用程序的资源Executor上执行相关任务,并为执行特别慢的任务启动备份任务。 TaskSchedulerImpl创建时创建SchedulableBuilder,SchedulableBuilder根据类型分为FIFOSchedulableBuilder和FairSchedulableBuilder两类,。调度器的区别我会在以后的文章中说明,这里先记住有这个概念就行。

3.SchedulerBackend
分配当前可用的资源, 具体就是向当前等待分配计算资源的Task分配计算资源(即Executor) , 并且在分配的Executor上启动Task, 完成计算的调度过程。 它使用reviveOffers完成上述的任务调度。

SparkContext新建时,内部创建一个SparkEnv,SparkEnv内部创建一个RpcEnv

  • SparkEnv:用户执行的环境信息,包括通信相关的端点。
  • RpcEnv:SparkContext中远程通信环境

Excutor资源分配首先会通过TaskSchedulerImpl的start方法,调用StandaloneSchedulerBackend的start方法,在向RPCEnv注册DriverEndpoint和ClientEndpoint端点。

//TaskSchedulerImpl的start方法
override def start() {//StandaloneSchedulerBackend 启动backend.start()if (!isLocal && conf.getBoolean("spark.speculation", false)) {logInfo("Starting speculative execution thread")speculationScheduler.scheduleWithFixedDelay(new Runnable {override def run(): Unit = Utils.tryOrStopSparkContext(sc) {checkSpeculatableTasks()}}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)}}//StandaloneSchedulerBackend的start方法
override def start() {/*** super.start()中有创建Driver的通信邮箱也就是Driver的引用* 未来Executor就是向 StandaloneSchedulerBackend中父类 CoarseGrainedSchedulerBackend 中反向注册信息的.*/super.start()// SPARK-21159. The scheduler backend should only try to connect to the launcher when in client// mode. In cluster mode, the code that submits the application to the Master needs to connect// to the launcher instead.if (sc.deployMode == "client") {launcherBackend.connect()}// The endpoint for executors to talk to usval driverUrl = RpcEndpointAddress(sc.conf.get("spark.driver.host"),sc.conf.get("spark.driver.port").toInt,CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toStringval args = Seq("--driver-url", driverUrl,"--executor-id", "{{EXECUTOR_ID}}","--hostname", "{{HOSTNAME}}","--cores", "{{CORES}}","--app-id", "{{APP_ID}}","--worker-url", "{{WORKER_URL}}")val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").map(Utils.splitCommandString).getOrElse(Seq.empty)val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath").map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)// When testing, expose the parent class path to the child. This is processed by// compute-classpath.{cmd,sh} and makes all needed jars available to child processes// when the assembly is built with the "*-provided" profiles enabled.val testingClassPath =if (sys.props.contains("spark.testing")) {sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq} else {Nil}// Start executors with a few necessary configs for registering with the schedulerval sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)val javaOpts = sparkJavaOpts ++ extraJavaOptsval command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)val webUrl = sc.ui.map(_.webUrl).getOrElse("")val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)// If we're using dynamic allocation, set our initial executor limit to 0 for now.// ExecutorAllocationManager will send the real initial limit to the Master later.val initialExecutorLimit =if (Utils.isDynamicAllocationEnabled(conf)) {Some(0)} else {None}val appDesc: ApplicationDescription = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)//提交应用程序的描述信息//封装 appDesc,这里已经传入了StandaloneAppClient 中client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)//启动StandaloneAppClient,之后会向Driver注册application的信息client.start()launcherBackend.setState(SparkAppHandle.State.SUBMITTED)waitForRegistration()launcherBackend.setState(SparkAppHandle.State.RUNNING)}

上面源码首先会调用父类的start方法,准备DriverEndpoint的引用信息,这里的DriverEndpoint是CoarseGrainedSchedulerBackend内部的一个对象,主要用于和其他端点交互,传输信息。

未来Executor就是向 StandaloneSchedulerBackend中父类 CoarseGrainedSchedulerBackend 中反向注册信息的.

之后通过StandaloneAppClient实例调用start方法,StandaloneAppClient的start方法如下。

 def start() {// Just launch an rpcEndpoint; it will call back into the listener./***  这里就是给空的 endpoint[AtomicReference] 设置下 信息,*  主要是rpcEnv.setupEndpoint 中创建了 ClientEndpoint 只要设置Endpoint 肯定会调用 ClientEndpoint的onStart方法*/endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))}

这里会调用ClientEndpoint端点的onStart方法,onStart中会调用registerWithMaster向Master注册应用程序的基本信息。

//onStart 方法override def onStart(): Unit = {try {//向Master 注册当前application的信息registerWithMaster(1)} catch {case e: Exception =>logWarning("Failed to connect to master", e)markDisconnected()stop()}}

之后看一下Master中的receive方法如何处理RegisterApplication的消息的。

override def receive: PartialFunction[Any, Unit] = {....//Driver 端提交过来的要注册Applicationcase RegisterApplication(description, driver) =>// TODO Prevent repeated registrations from some driver//如果Master状态是standby 忽略不提交任务if (state == RecoveryState.STANDBY) {// ignore, don't send response} else {logInfo("Registering app " + description.name)//这里封装application信息,注意,在这里可以跟进去看到默认一个application使用的core的个数就是 Int.MaxValueval app = createApplication(description, driver)//注册app ,这里面有向 waitingApps中加入当前applicationregisterApplication(app)logInfo("Registered app " + description.name + " with ID " + app.id)persistenceEngine.addApplication(app)driver.send(RegisteredApplication(app.id, self))//最终又会执行通用方法schedule()schedule()}....
}

可以看到Matser收到注册消息后把该应用加入到等待运行应用列表,调用执行通用方法schedule(),计算需要分配的资源信息,这里指的的是Executor资源。schedule方法中会执行startExecutorsOnWorkers方法。

private def startExecutorsOnWorkers(): Unit = {// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app// in the queue, then the second app, etc.//从waitingApps中获取提交的appfor (app <- waitingApps) {//coresPerExecutor 在application中获取启动一个Executor使用几个core 。参数--executor-core可以指定,下面指明不指定就是1val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1)// If the cores left is less than the coresPerExecutor,the cores left will not be allocated//判断是否给application分配够了core,因为后面每次给application 分配core后 app.coresLeft 都会相应的减去分配的core数if (app.coresLeft >= coresPerExecutor) {// Filter out workers that don't have enough resources to launch an executor//过滤出可用的workerval usableWorkers : Array[WorkerInfo]= workers.toArray.filter(_.state == WorkerState.ALIVE).filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&worker.coresFree >= coresPerExecutor).sortBy(_.coresFree).reverse/*** 下面就是去worker中划分每个worker提供多少core和启动多少Executor,注意:spreadOutApps 是true* 返回的 assignedCores 就是每个worker节点中应该给当前的application分配多少core*/val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)// Now that we've decided how many cores to allocate on each worker, let's allocate themfor (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {//在worker中给Executor划分资源allocateWorkerResourceToExecutors(app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos))}}}}

上述方法首先会从waitingApps中获取提交的app,然后会对每个worker提供多少core和启动多少Executor做一个计算。主要调用了scheduleExecutorsOnWorkers方法。

 private def scheduleExecutorsOnWorkers(app: ApplicationInfo,usableWorkers: Array[WorkerInfo],spreadOutApps: Boolean): Array[Int] = {//启动一个Executor使用多少core,这里如果提交任务没有指定 --executor-core这个值就是Noneval coresPerExecutor : Option[Int]= app.desc.coresPerExecutor//这里指定如果提交任务没有指定启动一个Executor使用几个core,默认就是1val minCoresPerExecutor = coresPerExecutor.getOrElse(1)//oneExecutorPerWorker 当前为trueval oneExecutorPerWorker :Boolean= coresPerExecutor.isEmpty//默认启动一个Executor使用的内存就是1024M,这个设置在SparkContext中464行//若提价命令中有 --executor-memory 5*1024 就是指定的参数val memoryPerExecutor = app.desc.memoryPerExecutorMB//可用worker的个数val numUsable = usableWorkers.length//创建两个重要对象val assignedCores = new Array[Int](numUsable) // Number of cores to give to each workerval assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker/*** coresToAssign 指的是当前要给Application分配的core是多少? app.coresLeft 与集群所有worker剩余的全部core 取个最小值* 这里如果提交application时指定了 --total-executor-core 那么app.coresLeft  就是指定的值*/var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)def canLaunchExecutor(pos: Int): Boolean = {..}var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)while (freeWorkers.nonEmpty) {freeWorkers.foreach { pos =>var keepScheduling = truewhile (keepScheduling && canLaunchExecutor(pos)) {coresToAssign -= minCoresPerExecutorassignedCores(pos) += minCoresPerExecutor// If we are launching one executor per worker, then every iteration assigns 1 core// to the executor. Otherwise, every iteration assigns cores to a new executor.if (oneExecutorPerWorker) {assignedExecutors(pos) = 1} else {assignedExecutors(pos) += 1}// Spreading out an application means spreading out its executors across as// many workers as possible. If we are not spreading out, then we should keep// scheduling executors on this worker until we use all of its resources.// Otherwise, just move on to the next worker.if (spreadOutApps) {keepScheduling = false}}}freeWorkers = freeWorkers.filter(canLaunchExecutor)}//最后返回每个Worker上分配多少coreassignedCores}

scheduleExecutorsOnWorkers的主要过程其实就是worker提供多少core,每个Executor需要多少core(默认为1),一个worker启动多少Executor,同时判断每个Executor使用多少内存,默认启动一个executor使用的内存就是1024M。

在默认情况下,Standalone模式中每个应用程序所能分配到的CPU核数可以由spark.deploy.defaultCores进行设置,但是该配置项默认情况下为Int.Max,也就是不限制,当然用户可以设置total-executor-core 配置项,约束每个application最多使用多少core,还有可以设置--executor-cores设置一个executor上最少几个core。

另外在分配应用程序资源时,会根据Worker的分配策略进行,分配算法有两种:
1.应用程序运行在尽可能多的Worker上,这种分配算法不仅能充分使用集群资源,而且还有利于数据处理的本地性。
2.应用程序运行在尽可能少的Worker上,该情况适合CPU密集型而内存使用较少的场景。

该策略可以通过spark.deploy.spreadOut参数进行配置,默认情况下为true。也就是说默认就是在Woker上均分,每一个worker都会分配executor。

如果设置了false,是在一个worker中尽可能多分配excutor,然后此worker不满足分配条件之后再向其他worker中分配executor,会根据--total-executor-core 和 --executor-core来判断需要多少个executor和每个executor多少core,但是注意这个core是最小值,也就是说如果oneExecutorPerWorker为true,一个worker一个excutor则,如果worker数不够,也就会导致executor数不足预期的executor数,那就会导致为了满足total-executor-core的要求,会在executor上多分配core,也就是比你配置的executor-core值要大。

之后会将这些Executor的启动信息封装发送到woker上去。

private def allocateWorkerResourceToExecutors(app: ApplicationInfo,assignedCores: Int,coresPerExecutor: Option[Int],worker: WorkerInfo): Unit = {// If the number of cores per executor is specified, we divide the cores assigned// to this worker evenly among the executors with no remainder.// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)//每个Executor要分配多少个coreval coresToAssign = coresPerExecutor.getOrElse(assignedCores)for (i <- 1 to numExecutors) {val exec: ExecutorDesc = app.addExecutor(worker, coresToAssign)//去worker中启动ExecutorlaunchExecutor(worker, exec)app.state = ApplicationState.RUNNING}}

woker中的receive方法,会匹配LaunchExecutor

     //启动Executorcase LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>//创建ExecutorRunnerval manager = new ExecutorRunner(appId,execId,/*** appDesc 中有 Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",.......) 中* 第一个参数就是Executor类*/appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),cores_,memory_,self,workerId,host,webUi.boundPort,publicAddress,sparkHome,executorDir,workerUri,conf,appLocalDirs, ExecutorState.RUNNING)executors(appId + "/" + execId) = manager/*** 启动ExecutorRunner* 启动的就是 CoarseGrainedExecutorBackend 类,* 下面看 CoarseGrainedExecutorBackend 类中的main 方法有反向注册给Driver*/manager.start()coresUsed += cores_memoryUsed += memory_sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))} catch {...}....

可以看到上面源码中,此时会在woker中创建ExecutorRunner,ExecutorRunner构建时传入的参数appDesc中有 command,而这个command就是CommandCoarseGrainedExecutorBackend的全限定名。

之后的manager.start(),会在woker中开辟线程异步的启动CoarseGrainedExecutorBackend,然后调用CoarseGrainedExecutorBackend的main方法。

 private[worker] def start() {workerThread = new Thread("ExecutorRunner for " + fullId) {override def run() { fetchAndRunExecutor() }}workerThread.start()
....}

CommandCoarseGrainedExecutorBackend的main方法中会调用run方法,初始化sparkEnv,rpcEnv等相关基础设施。

之后是创建执行容器executor的步骤,具体是从RPCEnv中拿到DriverEndpoint(CoarseGrainedSecheduleBackend类中)的引用,给DriverEndpoint类反向注册Executor并等待返回。

源码过程如下

 private def run(driverUrl: String,executorId: String,hostname: String,cores: Int,appId: String,workerUrl: Option[String],userClassPath: Seq[URL]) {....val env = SparkEnv.createExecutorEnv(driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)//注册Executor的通信邮箱,会调用CoarseGrainedExecutorBackend的onstart方法env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))workerUrl.foreach { url =>env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))}env.rpcEnv.awaitTermination()....
}

CoarseGrainedExecutorBackend的onStart方法

override def onStart() {logInfo("Connecting to driver: " + driverUrl)//从RPC中拿到Driver的引用,给Driver反向注册ExecutorrpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>// This is a very fast action so we can use "ThreadUtils.sameThread"//拿到Driver的引用driver = Some(ref)/*** 给Driver反向注册Executor信息,这里就是注册给之前看到的 CoarseGrainedSchedulerBackend 类中的DriverEndpoint* DriverEndpoint类中会有receiveAndReply 方法来匹配RegisterExecutor*/ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))}(ThreadUtils.sameThread).onComplete {// This is a very fast action so we can use "ThreadUtils.sameThread"case Success(msg) =>// Always receive `true`. Just ignore itcase Failure(e) =>exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)}(ThreadUtils.sameThread)}

DriverEndpoint处理RegisterExecutor请求,对excutor信息封装加入到相关容器中去,发送消息给ExecutorRef 告诉Executor已经被注册。

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
....//反向注册的Executorcase RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>/*** 拿到Execuotr的通信邮箱,发送消息给ExecutorRef 告诉 Executor已经被注册。* 在 CoarseGrainedExecutorBackend 类中 receive方法一直监听有没有被注册,匹配上就会启动Executor**/executorRef.send(RegisteredExecutor)// Note: some tests expect the reply to come after we put the executor in the mapcontext.reply(true)listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))makeOffers()
....}

此时上面源码中的ExecutorRef为CoarseGrainedExecutorBackend的引用,然后会在receive函数中匹配上Driver端发过来的消息,表示已经接受注册Executor了,下面要启动Executor,Executor内部维护一个线程池,Executor中有线程池用于task运行。

override def receive: PartialFunction[Any, Unit] = {//匹配上Driver端发过来的消息,已经接受注册Executor了,下面要启动Executorcase RegisteredExecutor =>logInfo("Successfully registered with driver")try {//下面创建Executor,Executor真正的创建Executor,Executor中有线程池用于task运行【Executor中89行】executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)} catch {case NonFatal(e) =>exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)}
}

到此Excutor资源分配就完成了。之后该Excutor会定时向Driver发送心跳信息,等待Driver下发任务。

private def startDriverHeartbeater(): Unit = {val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")// Wait a random interval so the heartbeats don't end up in syncval initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]val heartbeatTask = new Runnable() {override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())}heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)}

之后Excutor会接受DriverEndPoint端点发送的LaunchTask的执行任务的消息,任务执行是在Executor的launchTask方法实现的。在执行是会创建TaskRunner进程,该进程主要负责任务的执行,处理完毕后发送状态变更statusUpdate消息返回给DriverEndPoint。DriverEndPoint接受到消息后会调用TaskScheduleImpl的statusUpdate方法,根据任务执行不同的结果进行处理,处理完毕后再给该Executor分配执行任务。

其中再DriverEndPoint端点处理状态变更的代码如下。

override def receive: PartialFunction[Any, Unit] = {case StatusUpdate(executorId, taskId, state, data) =>scheduler.statusUpdate(taskId, state, data.value)if (TaskState.isFinished(state)) {executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.freeCores += scheduler.CPUS_PER_TASKmakeOffers(executorId)case None =>// Ignoring the update since we don't know about the executor.logWarning(s"Ignored task status update ($taskId state $state) " +s"from unknown executor with ID $executorId")}}

三.总结

用一张图总结上述的调度流程

14534869-f47d33e5079dbcb4.png
Executor调度流程

至此Executor资源分配的整个流程的源码就分析完毕了。

下一篇文章会对具体任务的调度流程进行分析,敬请关注。

这篇关于Spark源码分析之Excutor资源分配流程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/590485

相关文章

Security OAuth2 单点登录流程

单点登录(英语:Single sign-on,缩写为 SSO),又译为单一签入,一种对于许多相互关连,但是又是各自独立的软件系统,提供访问控制的属性。当拥有这项属性时,当用户登录时,就可以获取所有系统的访问权限,不用对每个单一系统都逐一登录。这项功能通常是以轻型目录访问协议(LDAP)来实现,在服务器上会将用户信息存储到LDAP数据库中。相同的,单一注销(single sign-off)就是指

Spring Security基于数据库验证流程详解

Spring Security 校验流程图 相关解释说明(认真看哦) AbstractAuthenticationProcessingFilter 抽象类 /*** 调用 #requiresAuthentication(HttpServletRequest, HttpServletResponse) 决定是否需要进行验证操作。* 如果需要验证,则会调用 #attemptAuthentica

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud