Spark具有用于在计算之间调度资源的若干设施。首先,回想一下,如集群模式概述中所述,每个Spark应用程序(SparkContext的实例)运行一组独立的执行器进程。Spark运行的集群管理器提供用于跨应用程序调度的工具。其次, 在每个Spark应用程序中,如果多个“作业”(Spark操作)由不同的线程提交,它们可能会同时运行。如果您的应用程序通过网络提供请求,这是很常见的。Spark包括公平的调度程序来调度每个SparkContext中的资源。
当在集群上运行时,每个Spark应用程序都会获得一组独立的执行器JVM,这些JVM仅运行任务并存储该应用程序的数据。如果多个用户需要共享您的集群,则有不同的选项来管理分配,具体取决于集群管理器。
在所有集群管理器上可用的最简单的选项是资源的静态分区。使用这种方法,每个应用程序被给予它可以使用的最大量的资源,并且在其整个持续时间内保持它们。这是在Spark的独立 和YARN模式中使用的方法,以及 粗粒度Mesos模式。基于集群类型,可以按如下所示配置资源分配:
独立模式:默认情况下,提交到独立模式集群的应用程序将以FIFO(先进先出)顺序运行,每个应用程序将尝试使用所有可用节点。您可以通过设置应用程序使用的配置spark.cores.max属性来限制应用程序使用的节点数,或者更改未设置此设置的应用程序的默认值spark.deploy.defaultCores。最后,除了控制核心,每个应用程序的spark.executor.memory设置控制其内存使用。Mesos:要在Mesos上使用静态分区,请将spark.mesos.coarse配置属性设置为true,并且可选地设置spark.cores.max为在独立模式下限制每个应用程序的资源共享。您还应设置spark.executor.memory为控制执行程序内存。YARN:--num-executors Spark YARN客户端的选项控制它将在集群上分配多少个执行器(spark.executor.instances作为配置属性),而--executor-memory (spark.executor.memory配置属性)和--executor-cores(spark.executor.cores配置属性)控制每个执行器的资源。有关更多信息,请参阅 YARN Spark属性。在Mesos上可用的第二个选项是CPU核心的动态共享。在此模式下,每个Spark应用程序仍然具有固定和独立的内存分配(设置spark.executor.memory),但是当应用程序不在机器上运行任务时,其他应用程序可能会在这些内核上运行任务。当您需要大量未过度活动的应用程序(如来自单独用户的shell会话)时,此模式非常有用。然而,它具有较少可预测的延迟的风险,因为它可能需要一段时间,一个应用程序获得一个节点上的核心,当它有工作要做。要使用此模式,只需使用 mesos://URL并设置spark.mesos.coarse为false。
请注意,目前没有一种模式提供跨应用程序的内存共享。如果您想要以这种方式共享数据,我们建议运行单个服务器应用程序,通过查询相同的RDD来提供多个请求。
Spark提供了一种机制,根据工作负载动态调整应用程序占用的资源。这意味着,如果不再使用资源,您的应用程序可能会将资源返回给群集,并在稍后需要时再次请求它们。如果多个应用程序在Spark集群中共享资源,此功能特别有用。
默认情况下禁用此功能,并且在所有粗粒度集群管理器上可用,即 独立模式,YARN模式和 Mesos粗粒度模式。
使用此功能有两个要求。首先,您的应用程序必须设置 spark.dynamicAllocation.enabled为true。其次,您必须 在同一集群中的每个工作节点上设置外部shuffle服务,并在应用程序中设置spark.shuffle.service.enabled为true。外部shuffle服务的目的是允许删除执行器而不删除它们编写的随机文件(更多细节将 在下面描述)。设置此服务的方式因群集管理器而异:
在独立模式下,只需启动你的工作者,spark.shuffle.service.enabled设置为true。
在Mesos粗粒度模式下,$SPARK_HOME/sbin/start-mesos-shuffle-service.sh在spark.shuffle.service.enabled设置为的所有从节点上运行true。例如,你可以通过马拉松来这样做。
在YARN模式下,按照此处的说明进行操作。
所有其他相关配置都是可选的,位于spark.dynamicAllocation.*和 spark.shuffle.service.*名称空间下。有关详细信息,请参阅 配置页。
在高级别,Spark应该在不再使用执行程序时放弃执行程序,并在需要时获得执行程序。由于没有确定的方法来预测即将被删除的执行器在不久的将来是否将运行任务,或者是否要添加的新执行器实际上将是空闲的,我们需要一组启发式来确定何时删除和请求执行者。
启用了动态分配的Spark应用程序在具有等待调度的挂起任务时请求其他执行程序。该条件必然意味着现有的执行器集合不足以同时饱和已经提交但尚未完成的所有任务。
Spark会轮询请求执行器。当等待任务持续spark.dynamicAllocation.schedulerBacklogTimeout数秒时,触发实际请求,然后spark.dynamicAllocation.sustainedSchedulerBacklogTimeout如果等待任务队列仍然存在,则每隔一秒再次触发实际请求。此外,每轮中请求的执行器数量与上一轮相比呈指数增长。例如,应用程序将在第一轮中添加1个执行程序,然后在后续轮中添加2,4,8等等执行程序。
指数增长政策的动机是双重的。首先,应用程序应该在开始时谨慎地请求执行器,以防事实证明只有少数额外的执行器是足够的。这回应了TCP慢启动的理由。第二,应用程序应该能够及时提高其资源使用率,以防万一确实需要许多执行程序。
删除执行程序的策略要简单得多。Spark应用程序会在执行程序空闲超过spark.dynamicAllocation.executorIdleTimeout几秒钟时删除它。注意,在大多数情况下,该条件与请求条件互斥,因为如果仍有待调度的待决任务,则执行器不应该是空闲的。
在动态分配之前,Spark执行器会在失败或关联的应用程序退出时退出。在这两种情况下,不再需要与执行器相关联的所有状态,并且可以安全地丢弃它们。然而,使用动态分配,当显式删除执行器时,应用程序仍在运行。如果应用程序尝试访问存储在执行程序中或由执行程序写入的状态,则它将必须执行重新计算状态。因此,Spark需要一种机制,通过在删除执行器之前保持其状态来正常停止执行器。
这个要求对于shuffle特别重要。在随机播放期间,Spark执行程序首先将其自己的映射输出本地写入磁盘,然后在其他执行程序尝试读取这些文件时充当这些文件的服务器。如果stragglers(其任务运行时间比它们的同伴长得多),动态分配可以在shuffle完成之前移除执行器,在这种情况下,必须重新计算由该执行器写入的shuffle文件。
保存shuffle文件的解决方案是使用外部shuffle服务,也在Spark 1.2中引入。此服务指的是一个长时间运行的进程,在您的集群的每个节点上运行,与Spark应用程序及其执行程序无关。如果服务已启用,Spark执行程序将从服务获取随机播放文件,而不是彼此。这意味着执行者写入的任何洗牌状态可以继续在执行者的生命周期之外被服务。
除了编写随机播放文件之外,执行器还可以在磁盘或内存中缓存数据。但是,当删除执行程序时,所有缓存的数据将无法再访问。为了减轻这种情况,默认情况下,包含缓存数据的执行器不会被删除。您可以使用配置此行为 spark.dynamicAllocation.cachedExecutorIdleTimeout。在将来的版本中,缓存的数据可以通过堆外存储来保留,类似于如何通过外部shuffle服务保留随机文件。
在给定的Spark应用程序(SparkContext实例)中,多个并行作业可以同时运行,如果它们是从单独的线程提交的。通过“工作”,在本节中,我们的意思是一个Spark操作(例如save, collect),并在需要运行,以评估行动的任何任务。Spark的调度程序是完全线程安全的,并支持这种用例,以启用服务多个请求(例如多个用户的查询)的应用程序。
默认情况下,Spark的调度程序以FIFO方式运行作业。每个作业分为“阶段”(例如,映射和缩减阶段),并且第一个作业在所有可用资源上获得优先级,而其阶段具有要启动的任务,然后第二个作业获得优先级等。如果作业在头部队列不需要使用整个集群,稍后的作业可以立即开始运行,但是如果队列头部的作业较大,则稍后的作业可能显着延迟。
从Spark 0.8开始,也可以配置作业之间的公平共享。在公平共享下,Spark以“循环”方式在作业之间分配任务,以便所有作业获得大致相等的群集资源份额。这意味着,在长作业正在运行时提交的短作业可以立即开始接收资源,并且仍然获得良好的响应时间,而无需等待长作业完成。此模式最适合多用户设置。
要启用公平调度程序,只需设置spark.scheduler.mode属性为FAIR配置SparkContext时:
val conf = new SparkConf().setMaster(...).setAppName(...) conf.set("spark.scheduler.mode", "FAIR") val sc = new SparkContext(conf)公平调度器还支持将作业分组为池,并且为每个池设置不同的调度选项(例如权重)。这对于为更重要的作业创建“高优先级”池或者将每个用户的作业分组在一起并给予用户平等的份额是有用的,而不管他们具有多少并发作业,而不是给予作业相等份额。这种方法是在Hadoop Fair Scheduler之后建模的 。
没有任何干预,新提交的作业将进入默认池,但可以通过将spark.scheduler.pool“本地属性”添加到提交它们的线程中的SparkContext 来设置作业池。这是这样做的:
// Assuming sc is your SparkContext variable sc.setLocalProperty("spark.scheduler.pool", "pool1")设置此本地属性后,所有作业(在此线程通过调用这个线程内提交RDD.save,count,collect等),将使用此池名称。该设置是根据线程,以便于线程代表同一用户运行多个作业。如果你想清除线程关联的池,只需调用:
sc.setLocalProperty("spark.scheduler.pool", null)默认情况下,每个池获得集群的相等份额(也等于默认池中的每个作业的份额),但在每个池中,作业按FIFO顺序运行。例如,如果您为每个用户创建一个池,这意味着每个用户将获得集群的相等份额,并且每个用户的查询将按顺序运行,而不是以后的查询从该用户的较早查询获取资源。
特定池的属性也可以通过配置文件进行修改。每个池支持三个属性:
schedulingMode:这可以是FIFO或FAIR,以控制池中的作业是否在彼此之后排队(默认)或公平地共享池的资源。weight:这将控制池相对于其他池的池的共享。默认情况下,所有池的权重为1.如果您给某个特定池的权重为2,那么它将获得与其他活动池相同的2倍的资源。设置诸如1000的高权重也使得可以在池之间实现 优先级 - 本质上,当weight-1000池具有作业活动时,它将总是首先启动任务。minShare:除了总体权重,每个池可以给予管理员希望拥有的最小份额(作为CPU核心的数量)。在根据权重重新分配额外资源之前,公平调度器总是尝试满足所有活动池的最小份额。因此,该minShare属性可以是另一种方式,以确保池可以总是快速达到一定数量的资源(例如10个核心),而不必为集群的其余部分赋予高优先级。默认情况下,每个池minShare为0。可以通过创建XML文件来设置池属性,类似于conf/fairscheduler.xml.template,并spark.scheduler.allocation.file在SparkConf中设置属性 。
conf.set("spark.scheduler.allocation.file", "/path/to/file")XML文件的格式仅仅是<pool>每个池的一个元素,其中包含用于各种设置的不同元素。例如:
<?xml version="1.0"?> <allocations> <pool name="production"> <schedulingMode>FAIR</schedulingMode> <weight>1</weight> <minShare>2</minShare> </pool> <pool name="test"> <schedulingMode>FIFO</schedulingMode> <weight>2</weight> <minShare>3</minShare> </pool> </allocations>一个完整的例子也可以在conf/fairscheduler.xml.template。请注意,未在XML文件中配置的任何池将简单地获取所有设置(调度模式FIFO,权重1和minShare 0)的默认值。
原文http://spark.apache.org/docs/latest/job-scheduling.html#graceful-decommission-of-executors