Spark核心RDD:计算函数compute

    xiaoxiao2021-09-18  49

    RDD的计算是惰性的,一系列转换操作只有在遇到动作操作是才会去计算数据,而分区作为数据计算的基本单位。在计算链中,无论一个RDD有多么复杂,其最终都会调用内部的compute函数来计算一个分区的数据。

    1.compute方法

    RDD抽象类要求其所有子类都必须实现compute方法,该方法介绍的参数之一是一个Partition对象,目的是计算该分区中的数据。以 MapPartitionsRDD类为例,其compute方法如下 override def compute(split: Partition, context: TaskContext): Iterator[U] = f(context, split.index, firstParent[T].iterator(split, context))MapPartitionsRDD类的compute方法调用当前RDD内的第一个父RDD的iterator方法,该方法的目的是拉取父RDD对应分区的数据,iterator方法会返回一个迭代器对象,迭代器内部存储的每一个元素即父RDD对应分区内的数据记录。 RDD的粗粒度转换体现在map方法上,f函数是map转换操作函数,RDD会对一个分区(而不是一条一条数据记录)内的数据执行单的的操作f,最终返回包含所有经过转换过的数据记录的新迭代器,即新的分区。 其他RDD子类的compute方法与之类似,在需要用用到父RDD的分区数据时,就会调用iterator方法,然后根据需求在得到的数据上执行相应的操作。换句话说,compute函数负责的是父RDD分区数据到子RDD分区数据的变换逻辑。

    2.iterator方法

    iterator方法在 RDD抽象类中,代码如下 final def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) } else { computeOrReadCheckpoint(split, context) } }iterator方法首先检查当前RDD的存储级别

    (1)如果存储级别不为none

    说明分区数据要么已经存储在文件系统中,要么当前的RDD曾经执行过cache、persist等持久化操作,因此需要想办法把数据从存储介质中提取出来。iterator方法会继续调用cacheManager的getOrCompute方法

    /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */ def getOrCompute[T]( rdd: RDD[T], partition: Partition, context: TaskContext, storageLevel: StorageLevel): Iterator[T] = { //....只保留我们关心的代码... val key = RDDBlockId(rdd.id, partition.index) blockManager.get(key) match { case Some(blockResult) => // 分区已经持久化直接返回.. case None => val computedValues = rdd.computeOrReadCheckpoint(partition, context) val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) } } }

    <1>据已经在存储在存储介质中,可能数据本身就在存储介质(如读取HDFS中的文件创建得到的RDD)当中,也可能是RDD见过持久化后保存后的数据,这时候数据可以成功提取到并将其返回。

    <2>数据不在存储介质中,可能是数据丢失,或者 RDD经过持久化操作,但是当前分区是第一次被计算,因此会出现拉取数据为none的情况。这就意味着需要重新计算分区数据,继续调用RDD类computeOrReadCheckpoint方法,并将计算得到的数据缓存到存储介质中,下次就无需再重复计算。

    (2)如果存储级别为none

    说明未经持久化的RDD,需要重新计算RDD内的数据,这时候调用RDD类 computeOrReadCheckpoint方法,该方法也在持久化RDD的分区获取数据失败时被调用。 private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { if (isCheckpointedAndMaterialized) { firstParent[T].iterator(split, context) } else { compute(split, context) } }computeOrReadCheckpoint方法会检查当前RDD是否已经被标记成检查点( Spark基础随笔:持久化&检查点),如果未被标记成检查点,则执行自身的compute方法来计算分区数据,否则就直接拉取父RDD分区内的数据。 从某种意义上来说,对于没有持久化的RDD,compute方法实现了依赖链的递归调用(compute->firstParent.iterator->compute)
    转载请注明原文地址: https://ju.6miu.com/read-677689.html

    最新回复(0)