Spark Master Resource Scheduling: SparkContext Registers with All Masters

2024-03-31 13:58

This article walks through Spark Master resource scheduling in standalone mode: how the SparkContext registers with every master, and how the master then schedules drivers and executors on the workers.


More resources

  • github: https://github.com/opensourceteams/spark-scala-maven
  • CSDN (video round-up, watch online): https://blog.csdn.net/thinktothings/article/details/84726769

YouTube video

  • Spark Master resource scheduling (SparkContext registers with all masters): https://youtu.be/AXxCnCc5Mh0

Bilibili video

  • Spark Master resource scheduling (SparkContext registers with all masters): https://www.bilibili.com/video/av37442295/
width="800" height="500" src="//player.bilibili.com/player.html?aid=37442295&page=1" scrolling="no" border="0" allowfullscreen="true">

On startup, SparkContext sends a message to the master

  • ClientEndpoint sends the RegisterApplication message to each master (source below; a minimal driver sketch follows it)
    /**
     * Register with all masters asynchronously and returns an array `Future`s for cancellation.
     */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }
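
For orientation, here is a minimal driver sketch (host names and job logic are made up) showing what triggers this code path: in standalone HA deployments the master URL may list several masters separated by commas, and the ClientEndpoint above then registers with each of them asynchronously; only the active master answers.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical driver: master1/master2 are placeholder host names.
    object MultiMasterApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("register-demo")
          // comma-separated standalone HA master URL
          .setMaster("spark://master1:7077,master2:7077")
        val sc = new SparkContext(conf)
        println(sc.parallelize(1 to 10).count())
        sc.stop()
      }
    }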

The master handles the RegisterApplication message

  • Creates an Application and registers it with the master
  • Persists the Application in the master's persistence engine
  • Sends the RegisteredApplication message back to the driver to confirm registration
    case RegisterApplication(description, driver) => {
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        driver.send(RegisteredApplication(app.id, self))
        schedule()
      }
    }
  • Filters the registered workers, keeping only those in the ALIVE state
  • Iterates over waitingDrivers; for each driver still waiting, sends the LaunchDriver message to a worker with enough free memory and cores (a toy walk-through follows the code below)
  • Calls the method that starts executors on the workers
    /**
     * Schedule the currently available resources among waiting apps. This method will be called
     * every time a new app joins or resource availability changes.
     */
    private def schedule(): Unit = {
      if (state != RecoveryState.ALIVE) {
        return
      }
      // Drivers take strict precedence over executors
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
        // start from the last worker that was assigned a driver, and continue onwards until we have
        // explored all alive workers.
        var launched = false
        var numWorkersVisited = 0
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver)
            waitingDrivers -= driver
            launched = true
          }
          curPos = (curPos + 1) % numWorkersAlive
        }
      }
      startExecutorsOnWorkers()
    }
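
The round-robin driver placement can be replayed on toy data. A stripped-down model (not Spark code; worker sizes and driver requests are invented):

    // Toy round-robin driver placement, mirroring the loop in schedule().
    object DriverPlacementDemo {
      case class W(var coresFree: Int, var memFree: Int)   // stand-in for WorkerInfo
      case class D(cores: Int, mem: Int)                   // stand-in for a driver request

      def main(args: Array[String]): Unit = {
        val workers = Array(W(4, 4096), W(8, 8192))
        val waiting = List(D(2, 2048), D(8, 8192))
        var curPos = 0
        for (d <- waiting) {
          var visited = 0
          var launched = false
          while (visited < workers.length && !launched) {
            val w = workers(curPos)
            visited += 1
            if (w.memFree >= d.mem && w.coresFree >= d.cores) {
              w.coresFree -= d.cores
              w.memFree -= d.mem
              println(s"driver $d -> worker $curPos")
              launched = true
            }
            curPos = (curPos + 1) % workers.length
          }
          if (!launched) println(s"driver $d stays in waitingDrivers")
        }
      }
    }

Running it places the first driver on worker 0 and the second on worker 1; a driver that fits nowhere simply stays in waitingDrivers until resources free up.
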
  • Filters waitingApps; the application just registered is already in this ArrayBuffer
  • Filters the registered workers
  • Keeps only workers whose state is ALIVE, whose free CPU cores are at least the per-executor core count, and whose free memory is at least the application's per-executor memory requirement
  • Sorts the usable workers by free core count in descending order (the filter and sort are illustrated with toy data after the code below)
  • Calls scheduleExecutorsOnWorkers to decide how many CPU cores each worker assigns to executors
    /**
     * Schedule and launch executors on workers
     */
    private def startExecutorsOnWorkers(): Unit = {
      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      for (app <- waitingApps if app.coresLeft > 0) {
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
        // Filter out workers that don't have enough resources to launch an executor
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
      }
    }
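
The filter-and-sort step can be seen in isolation with toy data (WorkerLike stands in for Spark's WorkerInfo; all numbers are invented):

    // Toy illustration of the usable-worker filter and sort.
    object UsableWorkersDemo {
      case class WorkerLike(state: String, coresFree: Int, memoryFree: Int)

      def main(args: Array[String]): Unit = {
        val memoryPerExecutorMB = 2048                 // app's per-executor memory
        val coresPerExecutor: Option[Int] = Some(2)    // spark.executor.cores

        val workers = Seq(
          WorkerLike("ALIVE", 8, 16384),
          WorkerLike("DEAD", 16, 32768),               // dropped: not ALIVE
          WorkerLike("ALIVE", 2, 1024),                // dropped: too little memory
          WorkerLike("ALIVE", 16, 32768))

        val usable = workers
          .filter(_.state == "ALIVE")
          .filter(w => w.memoryFree >= memoryPerExecutorMB &&
            w.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse

        usable.foreach(println)
        // WorkerLike(ALIVE,16,32768)
        // WorkerLike(ALIVE,8,16384)
      }
    }
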
  • Decides, worker by worker, how many CPU cores the current application's executors receive (the SPARK-8881 example from the doc comment is replayed in a toy model after the code below)
    /**
     * Schedule executors to be launched on the workers.
     * Returns an array containing number of cores assigned to each worker.
     *
     * There are two modes of launching executors. The first attempts to spread out an application's
     * executors on as many workers as possible, while the second does the opposite (i.e. launch them
     * on as few workers as possible). The former is usually better for data locality purposes and is
     * the default.
     *
     * The number of cores assigned to each executor is configurable. When this is explicitly set,
     * multiple executors from the same application may be launched on the same worker if the worker
     * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
     * worker by default, in which case only one executor may be launched on each worker.
     *
     * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
     * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
     * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
     * allocated at a time, 12 cores from each worker would be assigned to each executor.
     * Since 12 < 16, no executors would launch [SPARK-8881].
     */
    private def scheduleExecutorsOnWorkers(
        app: ApplicationInfo,
        usableWorkers: Array[WorkerInfo],
        spreadOutApps: Boolean): Array[Int] = {
      val coresPerExecutor = app.desc.coresPerExecutor
      val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
      val oneExecutorPerWorker = coresPerExecutor.isEmpty
      val memoryPerExecutor = app.desc.memoryPerExecutorMB
      val numUsable = usableWorkers.length
      val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
      val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
      var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

      /** Return whether the specified worker can launch an executor for this app. */
      def canLaunchExecutor(pos: Int): Boolean = {
        val keepScheduling = coresToAssign >= minCoresPerExecutor
        val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor

        // If we allow multiple executors per worker, then we can always launch new executors.
        // Otherwise, if there is already an executor on this worker, just give it more cores.
        val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
        if (launchingNewExecutor) {
          val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
          val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
          val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
          keepScheduling && enoughCores && enoughMemory && underLimit
        } else {
          // We're adding cores to an existing executor, so no need
          // to check memory and executor limits
          keepScheduling && enoughCores
        }
      }

      // Keep launching executors until no more workers can accommodate any
      // more executors, or if we have reached this application's limits
      var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
      while (freeWorkers.nonEmpty) {
        freeWorkers.foreach { pos =>
          var keepScheduling = true
          while (keepScheduling && canLaunchExecutor(pos)) {
            coresToAssign -= minCoresPerExecutor
            assignedCores(pos) += minCoresPerExecutor

            // If we are launching one executor per worker, then every iteration assigns 1 core
            // to the executor. Otherwise, every iteration assigns cores to a new executor.
            if (oneExecutorPerWorker) {
              assignedExecutors(pos) = 1
            } else {
              assignedExecutors(pos) += 1
            }

            // Spreading out an application means spreading out its executors across as
            // many workers as possible. If we are not spreading out, then we should keep
            // scheduling executors on this worker until we use all of its resources.
            // Otherwise, just move on to the next worker.
            if (spreadOutApps) {
              keepScheduling = false
            }
          }
        }
        freeWorkers = freeWorkers.filter(canLaunchExecutor)
      }
      assignedCores
    }
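
The SPARK-8881 example from the doc comment can be checked against a stripped-down model of this loop that tracks cores only (no memory or executor-limit checks; purely illustrative, not Spark code):

    // Simplified core-assignment loop from scheduleExecutorsOnWorkers.
    object ScheduleDemo {
      def assign(coresFree: Array[Int], coresNeeded: Int,
          coresPerExecutor: Int, spreadOut: Boolean): Array[Int] = {
        val assigned = new Array[Int](coresFree.length)
        var toAssign = math.min(coresNeeded, coresFree.sum)
        def canLaunch(pos: Int): Boolean =
          toAssign >= coresPerExecutor &&
            coresFree(pos) - assigned(pos) >= coresPerExecutor
        var free = coresFree.indices.filter(canLaunch)
        while (free.nonEmpty) {
          free.foreach { pos =>
            var keepScheduling = true
            while (keepScheduling && canLaunch(pos)) {
              toAssign -= coresPerExecutor
              assigned(pos) += coresPerExecutor
              if (spreadOut) keepScheduling = false  // one executor, then next worker
            }
          }
          free = free.filter(canLaunch)
        }
        assigned
      }

      def main(args: Array[String]): Unit = {
        val workers = Array(16, 16, 16, 16)
        // SPARK-8881 case: 48 cores wanted, 16 per executor -> 16,16,16,0
        println(assign(workers, 48, 16, spreadOut = true).mkString(","))
        // Consolidate mode: 12 cores wanted, 4 per executor -> 12,0,0,0
        println(assign(workers, 12, 4, spreadOut = false).mkString(","))
      }
    }

Because a whole executor's worth of cores is allocated at a time, the first case launches three full 16-core executors (16,16,16,0) instead of the four unusable 12-core slices that one-core-at-a-time allocation would produce.
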
  • Allocates the worker's resources to one or more executors (the executor-count arithmetic is checked with toy values after the code below)
  • Sends the LaunchExecutor message to the worker
  • Sends the ExecutorAdded message to the driver
    /**
     * Allocate a worker's resources to one or more executors.
     * @param app the info of the application which the executors belong to
     * @param assignedCores number of cores on this worker for this application
     * @param coresPerExecutor number of cores per executor
     * @param worker the worker info
     */
    private def allocateWorkerResourceToExecutors(
        app: ApplicationInfo,
        assignedCores: Int,
        coresPerExecutor: Option[Int],
        worker: WorkerInfo): Unit = {
      // If the number of cores per executor is specified, we divide the cores assigned
      // to this worker evenly among the executors with no remainder.
      // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
      val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
      val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
      for (i <- 1 to numExecutors) {
        val exec = app.addExecutor(worker, coresToAssign)
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
      }
    }
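
A quick sanity check of the numExecutors / coresToAssign arithmetic with toy values (illustrative numbers only):

    object ExecutorCountDemo {
      def main(args: Array[String]): Unit = {
        val assignedCores = 12
        val explicit: Option[Int] = Some(4)  // spark.executor.cores = 4
        val unset: Option[Int] = None        // no per-executor core count set

        // With an explicit count: 12 / 4 = 3 executors, 4 cores each.
        println(explicit.map(assignedCores / _).getOrElse(1)) // 3
        println(explicit.getOrElse(assignedCores))            // 4
        // Without one: a single executor grabs all 12 assigned cores.
        println(unset.map(assignedCores / _).getOrElse(1))    // 1
        println(unset.getOrElse(assignedCores))               // 12
      }
    }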
