Spark DAG之划分Stage

    xiaoxiao2021-04-18  55

    概要

    介绍Stage的定义,DAGScheduler划分Stage流程。

    Stage

    查看Stage定义 Stage中有两个重要属性,rdd和parents,分别记录的是切分处的RDD和父Stage信息,这一点结合我后面的例子更好理解。Stage有两个子类,ShuffleMapStageResultStage,两者分别增加了一个重要属性信息,如下

    stage差异属性作用ShuffleMapStageshuffleDep: ShuffleDependency保存Dependency信息ResultStagefunc: (TaskContext, Iterator[_]) => _保存action对应的处理函数

    处理JobSubmitted事件

    上一篇博客Spark DAG之SubmitJob最后讲到调用DAGScheduler的handleJobSubmitted方法处理JobSubmitted事件,查看该方法 如上图注释处,handleJobSubmitted方法主要职责如下

    调用newResultStage方法,划分DAG生成Stage。创建ActiveJob,并添加到对应集合管理。调用submitStage、submitWaitingStages提交Stage。

    划分Stage

    DAGScheduler的newResultStage方法负责划分DAG生成Stage,newResultStage方法依次调用getParentStagesAndId、getParentStages方法,查看getParentStages方法 如上图注释处,会在shuffle处切分DAG,并且getShuffleMapStage方法会递归切分shufDep.rdd,getShuffleMapStage方法涉及到的方法调用过多,限于篇幅不再介绍。

    举个例子

    通过一个例子,详细分析整个划分DAG流程。下面代码是Wordcount的变形,这么处理是为了让代码简单的同时,Stage也足够丰富。

    val sc = new SparkContext("local","wordcount") val data = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2) val wordcount = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).reduceByKey(_ + _) val data2 = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2) val wordcount2 = data2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).reduceByKey(_ + _) wordcount.join(wordcount2).collect()

    打印出上述代码中RDD的依赖关系 整理如下

    最左一列的parallelize、map等表示实例代码中的transformation。圆角矩形表示transformation操作生成的RDD和该RDD的Dependency,其中ShuffleDependency使用蓝色标注。

    在上图ShuffleDependency处切分DAG生成Stage,结果如下

    圆角矩形代表Stage,结果为四个ShuffleMapStage ,一个ResultStage。圆角矩形内为Stage的两个属性。ShuffleMapStage和ResultStage有差别。

    到这里,Stage就划分完成了,最后贴张spark webUI的图片

    总结

    介绍了Stage的结构和实现类,举了一个例子,从物理结构上介绍了Stage的划分,以及划分后的Stage保存了哪些重要信息,了解这些是后续分析根据Stage生成Task的前提。

    转载请注明原文地址: https://ju.6miu.com/read-674770.html

    最新回复(0)