介绍Stage的定义,DAGScheduler划分Stage流程。
查看Stage定义 Stage中有两个重要属性,rdd和parents,分别记录的是切分处的RDD和父Stage信息,这一点结合我后面的例子更好理解。Stage有两个子类,ShuffleMapStage、ResultStage,两者分别增加了一个重要属性信息,如下
stage差异属性作用ShuffleMapStageshuffleDep: ShuffleDependency保存Dependency信息ResultStagefunc: (TaskContext, Iterator[_]) => _保存action对应的处理函数上一篇博客Spark DAG之SubmitJob最后讲到调用DAGScheduler的handleJobSubmitted方法处理JobSubmitted事件,查看该方法 如上图注释处,handleJobSubmitted方法主要职责如下
调用newResultStage方法,划分DAG生成Stage。创建ActiveJob,并添加到对应集合管理。调用submitStage、submitWaitingStages提交Stage。DAGScheduler的newResultStage方法负责划分DAG生成Stage,newResultStage方法依次调用getParentStagesAndId、getParentStages方法,查看getParentStages方法 如上图注释处,会在shuffle处切分DAG,并且getShuffleMapStage方法会递归切分shufDep.rdd,getShuffleMapStage方法涉及到的方法调用过多,限于篇幅不再介绍。
通过一个例子,详细分析整个划分DAG流程。下面代码是Wordcount的变形,这么处理是为了让代码简单的同时,Stage也足够丰富。
val sc = new SparkContext("local","wordcount") val data = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2) val wordcount = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).reduceByKey(_ + _) val data2 = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2) val wordcount2 = data2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).reduceByKey(_ + _) wordcount.join(wordcount2).collect()打印出上述代码中RDD的依赖关系 整理如下
最左一列的parallelize、map等表示实例代码中的transformation。圆角矩形表示transformation操作生成的RDD和该RDD的Dependency,其中ShuffleDependency使用蓝色标注。在上图ShuffleDependency处切分DAG生成Stage,结果如下
圆角矩形代表Stage,结果为四个ShuffleMapStage ,一个ResultStage。圆角矩形内为Stage的两个属性。ShuffleMapStage和ResultStage有差别。到这里,Stage就划分完成了,最后贴张spark webUI的图片
介绍了Stage的结构和实现类,举了一个例子,从物理结构上介绍了Stage的划分,以及划分后的Stage保存了哪些重要信息,了解这些是后续分析根据Stage生成Task的前提。