RDD的计算是惰性的,一系列转换操作只有在遇到动作操作是才会去计算数据,而分区作为数据计算的基本单位。在计算链中,无论一个RDD有多么复杂,其最终都会调用内部的compute函数来计算一个分区的数据。
1.compute方法
RDD抽象类要求其所有子类都必须实现compute方法,该方法介绍的参数之一是一个Partition对象,目的是计算该分区中的数据。以
MapPartitionsRDD类为例,其compute方法如下
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))MapPartitionsRDD类的compute方法调用当前RDD内的第一个父RDD的iterator方法,该方法的目的是拉取父RDD对应分区的数据,iterator方法会返回一个迭代器对象,迭代器内部存储的每一个元素即父RDD对应分区内的数据记录。
RDD的粗粒度转换体现在map方法上,f函数是map转换操作函数,RDD会对一个分区(而不是一条一条数据记录)内的数据执行单的的操作f,最终返回包含所有经过转换过的数据记录的新迭代器,即新的分区。
其他RDD子类的compute方法与之类似,在需要用用到父RDD的分区数据时,就会调用iterator方法,然后根据需求在得到的数据上执行相应的操作。换句话说,compute函数负责的是父RDD分区数据到子RDD分区数据的变换逻辑。
2.iterator方法
iterator方法在
RDD抽象类中,代码如下
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
} else {
computeOrReadCheckpoint(split, context)
}
}iterator方法首先检查当前RDD的存储级别
(1)如果存储级别不为none
说明分区数据要么已经存储在文件系统中,要么当前的RDD曾经执行过cache、persist等持久化操作,因此需要想办法把数据从存储介质中提取出来。iterator方法会继续调用cacheManager的getOrCompute方法
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
//....只保留我们关心的代码...
val key = RDDBlockId(rdd.id, partition.index)
blockManager.get(key) match {
case Some(blockResult) =>
// 分区已经持久化直接返回..
case None =>
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
}
}
}
<1>据已经在存储在存储介质中,可能数据本身就在存储介质(如读取HDFS中的文件创建得到的RDD)当中,也可能是RDD见过持久化后保存后的数据,这时候数据可以成功提取到并将其返回。
<2>数据不在存储介质中,可能是数据丢失,或者
RDD经过持久化操作,但是当前分区是第一次被计算,因此会出现拉取数据为none的情况。这就意味着需要重新计算分区数据,继续调用RDD类computeOrReadCheckpoint方法,并将计算得到的数据缓存到存储介质中,下次就无需再重复计算。
(2)如果存储级别为none
说明未经持久化的RDD,需要重新计算RDD内的数据,这时候调用RDD类
computeOrReadCheckpoint方法,该方法也在持久化RDD的分区获取数据失败时被调用。
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
if (isCheckpointedAndMaterialized) {
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}computeOrReadCheckpoint方法会检查当前RDD是否已经被标记成检查点(
Spark基础随笔:持久化&检查点),如果未被标记成检查点,则执行自身的compute方法来计算分区数据,否则就直接拉取父RDD分区内的数据。
从某种意义上来说,对于没有持久化的RDD,compute方法实现了依赖链的递归调用(compute->firstParent.iterator->compute)
转载请注明原文地址: https://ju.6miu.com/read-677689.html