There are far too many subclasses of this type to list them all; here are a few typical ones:
class HadoopRDD[K, V](
    @transient sc: SparkContext,
    broadcastedConf: Broadcast[SerializableConfiguration],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int)
  extends RDD
// Note: a HadoopRDD is what jsc.textFile produces.

class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD
// Note: a MapPartitionsRDD is produced by rdd.map, filter, flatMap, glom, mapPartitions,
// mapPartitionsWithIndex, mapPartitionsWithContext and the like; most transformations yield
// this kind of RDD. It does not override getPreferredLocations.

class ShuffledRDD[K, V, C](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD
// Note: produced by shuffle operators such as reduceByKey (shown later in this section).

Next, the definition of the Dependency class and a few of its descendant classes:
@DeveloperApi
abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  def getParents(partitionId: Int): Seq[Int]
  override def rdd: RDD[T] = _rdd
}

@DeveloperApi
class ShuffleDependency[K, V, C](
    @transient _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Option[Serializer] = None,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,  // used during the shuffle to do the combine
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]] {

  override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

  val shuffleId: Int = _rdd.context.newShuffleId()

  val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
    shuffleId, _rdd.partitions.size, this)

  _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}

@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}

The Dependency class marks the dependency relationship between RDDs. From the RDD definition above you can see that deps: Seq[Dependency[_]] is a parameter of RDD's primary constructor. So how does it get initialized? The flow is as follows:
Take rdd_A.map(…) as an example:
map() {
  // create a new MapPartitionsRDD, passing the current RDD (rdd_A itself) as the first argument
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

Now look at the constructor of MapPartitionsRDD:
class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev)

The first parameter prev of MapPartitionsRDD is handed to its parent class RDD: constructing a MapPartitionsRDD first calls super(prev), i.e. RDD's constructor:
super(prev)
  => the auxiliary constructor of RDD:
     def this(@transient oneParent: RDD[_]) =
       this(oneParent.context, List(new OneToOneDependency(oneParent)))
  => the primary constructor of RDD:
     abstract class RDD[T: ClassTag](
         @transient private var _sc: SparkContext,
         @transient private var deps: Seq[Dependency[_]])

As you can see, the parent RDD is wrapped in List(new OneToOneDependency(oneParent)) and passed as the second argument of RDD's primary constructor, which is exactly deps. So an RDD's deps are initialized inside the child RDD that is created when an operator such as map runs.
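To see these deps materialize, here is a small runnable sketch (the RDD names mirror the rdd_A example above; the local[2] master, app name and sample data are illustrative) that inspects the public dependencies field after map and reduceByKey:

import org.apache.spark.{SparkConf, SparkContext}

object DepsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("deps-demo"))

    // sc.textFile would start the chain from a HadoopRDD; parallelize keeps the sketch self-contained
    val rdd_A = sc.parallelize(Seq("a", "b", "a"))
    val rdd_B = rdd_A.map(w => (w, 1))        // MapPartitionsRDD, built via new MapPartitionsRDD(this, ...)
    val rdd_C = rdd_B.reduceByKey(_ + _)      // ShuffledRDD

    // deps were filled in when each child RDD was constructed
    println(rdd_B.dependencies)               // List(org.apache.spark.OneToOneDependency@...)
    println(rdd_C.dependencies)               // List(org.apache.spark.ShuffleDependency@...)
    println(rdd_C.toDebugString)              // full lineage: ShuffledRDD <- MapPartitionsRDD <- ParallelCollectionRDD

    sc.stop()
  }
}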
Now let's look at the aggregator field of ShuffleDependency. It is used in the shuffle phase to perform the combine, so it matters quite a bit. Its definition is:
case class Aggregator[K, V, C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) { ... }

How the aggregator inside ShuffleDependency comes into being: calling a shuffle operator on an RDD creates a ShuffleDependency member in the child RDD. The flow is as follows:
rdd.reduceByKey(func)
  => rdd.reduceByKey(partitioner, func)
  => combineByKey[V]((v: V) => v, func, func, partitioner)

combineByKey is defined as follows (assorted details omitted):
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = {
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    // already partitioned the same way: combine within partitions, no shuffle (body elided)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

As the source shows, the aggregator's constructor arguments come straight from the function argument passed to reduceByKey.
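For intuition about what those three functions do once they reach the shuffle machinery, here is a minimal standalone sketch (plain Scala, not Spark's internal code path) of the combiner contract that reduceByKey(_ + _) hands to Aggregator: createCombiner is the identity, while mergeValue and mergeCombiners are both the user's func:

object AggregatorDemo {
  def main(args: Array[String]): Unit = {
    // For rdd.reduceByKey(_ + _), combineByKey is called as combineByKey((v: Int) => v, _ + _, _ + _, partitioner)
    val createCombiner: Int => Int = v => v            // the first value seen for a key becomes its combiner
    val mergeValue: (Int, Int) => Int = _ + _          // fold further values into the combiner (map side)
    val mergeCombiners: (Int, Int) => Int = _ + _      // merge per-partition combiners (reduce side)

    // map-side combine within one "partition"
    val partition = Seq("a" -> 1, "b" -> 1, "a" -> 1)
    val mapSide = partition.foldLeft(Map.empty[String, Int]) { case (acc, (k, v)) =>
      acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
    }

    // merging with combiners that arrive from another partition
    val otherPartition = Map("a" -> 2, "c" -> 1)
    val merged = otherPartition.foldLeft(mapSide) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }
    println(merged)  // Map(a -> 4, b -> 1, c -> 1)
  }
}

In Spark itself these functions are applied by Aggregator.combineValuesByKey / combineCombinersByKey rather than by plain Maps, but the merge semantics are what the sketch shows.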
The final executor of a task

Parent class:
private[spark] abstract class Task[T](
    val stageId: Int,
    val stageAttemptId: Int,
    val partitionId: Int,
    internalAccumulators: Seq[Accumulator[Long]]) extends Serializable {

  final def run() {
    ...
    (runTask(context), context.collectAccumulators())  // runTask is implemented by the subclasses
    ...
  }

  def runTask(context: TaskContext): T  // abstract method
}

Subclasses:
private[spark] class ShuffleMapTask(
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],  // the serialized task information
    partition: Partition,
    @transient private var locs: Seq[TaskLocation],
    internalAccumulators: Seq[Accumulator[Long]])
  extends Task[MapStatus](stageId, stageAttemptId, partition.index, internalAccumulators) {

  override def runTask(context: TaskContext): MapStatus = { ... }
}

private[spark] class ResultTask[T, U](
    stageId: Int,
    stageAttemptId: Int,
    taskBinary: Broadcast[Array[Byte]],
    partition: Partition,
    @transient locs: Seq[TaskLocation],
    val outputId: Int,
    internalAccumulators: Seq[Accumulator[Long]])
  extends Task[U](stageId, stageAttemptId, partition.index, internalAccumulators)
  with Serializable {

  override def runTask(context: TaskContext): U = { ... }
}

Task trajectory: after the executor side receives a task-launch request from the driver:
CoarseGrainedExecutorBackend {
  receive() {
    case LaunchTask(data) =>
      val taskDesc = ser.deserialize[TaskDescription](data.value)  // deserialize the task description
      => executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
           taskDesc.name, taskDesc.serializedTask)  // the Executor launches the task
         => {
              val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber,
                taskName, serializedTask)
              threadPool.execute(tr)  // hand the TaskRunner to the thread pool
            }
  }
}

Inside the task thread:
TaskRunner.run() {
  task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)  // deserialize again, this time into the Task itself
  task.run(...)
    => task.runTask()  // the subclass implementation of runTask() does the real work
}

Task preferred locations
private[spark] sealed trait TaskLocation {
  def host: String
}

private[spark] case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation

private[spark] case class HostTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = host
}

private[spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation {
  override def toString: String = TaskLocation.inMemoryLocationTag + host
}

private[spark] object TaskLocation {
  val inMemoryLocationTag = "hdfs_cache_"

  def apply(host: String, executorId: String): TaskLocation = {
    new ExecutorCacheTaskLocation(host, executorId)
  }

  def apply(str: String): TaskLocation = {
    val hstr = str.stripPrefix(inMemoryLocationTag)
    if (hstr.equals(str)) {
      new HostTaskLocation(str)
    } else {
      new HDFSCacheTaskLocation(hstr)  // the prefix marks an HDFS-cached location on that host
    }
  }
}
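TaskLocation and its subclasses are private[spark], so they cannot be used directly from application code; the sketch below just mirrors the parsing rule of apply(str) with hypothetical names (only the hdfs_cache_ tag comes from the source above): a bare hostname becomes a host-level preference, while the prefixed form marks a host whose HDFS blocks are cached in memory.

sealed trait Loc { def host: String }
case class HostLoc(host: String) extends Loc
case class HdfsCacheLoc(host: String) extends Loc

object Loc {
  val inMemoryLocationTag = "hdfs_cache_"

  def parse(str: String): Loc = {
    val hstr = str.stripPrefix(inMemoryLocationTag)
    if (hstr == str) HostLoc(str)    // no tag: plain host preference
    else HdfsCacheLoc(hstr)          // tag present: HDFS block cached in memory on that host
  }
}

object LocDemo extends App {
  println(Loc.parse("node-1"))               // HostLoc(node-1)
  println(Loc.parse("hdfs_cache_node-2"))    // HdfsCacheLoc(node-2)
}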