Spark基础随笔:持久化&检查点

    xiaoxiao2021-08-24  79

    1.持久化

    Spark持久化过程包括persist、cache、upersist3个操作 /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist() /** * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. * * @param blocking Whether to block until all blocks are deleted. * @return This RDD. */ def unpersist(blocking: Boolean = true): this.type = { logInfo("Removing RDD " + id + " from persistence list") sc.unpersistRDD(id, blocking) storageLevel = StorageLevel.NONE this } cache方法等价于StorageLevel.MEMORY_ONLY的persist方法,而persist方法也仅仅是简单修改了当前RDD的存储级别而已,SparkContext中维护了一张哈希表persistRdds,用于登记所有被持久化的RDD,执行persist操作是,会将RDD的编号作为键,把RDD记录到persistRdds表中,unpersist函数会调用SparkContext对象的unpersistRDD方法,除了将RDD从哈希表persistRdds中移除之外,该方法还会将该RDD中的分区对于的所有块从存储介质中删除。 如下给出持久化的类型 object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1) extends Externalizable

    2.检查点

    检查点机制的实现和持久化的实现有着较大的区别。检查点并非第一次计算就将结果进行存储,而是等到一个作业结束后启动专门的一个作业完成存储的操作。 checkPoint操作的实现在RDD类中, checkPoint方法会实例化ReliableRDDCheckpointData用于标记当前的RDD /** * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint * directory set with `SparkContext#setCheckpointDir` and all references to its parent * RDDs will be removed. This function must be called before any job has been * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */ def checkpoint(): Unit = RDDCheckpointData.synchronized { if (context.checkpointDir.isEmpty) { throw new SparkException("Checkpoint directory has not been set in the SparkContext") } else if (checkpointData.isEmpty) { checkpointData = Some(new ReliableRDDCheckpointData(this)) } } RDDCheckpointData类内部有一个枚举类型 CheckpointState  /** * Enumeration to manage state transitions of an RDD through checkpointing * [ Initialized --> checkpointing in progress --> checkpointed ]. */ private[spark] object CheckpointState extends Enumeration { type CheckpointState = Value val Initialized, CheckpointingInProgress, Checkpointed = Value } 用于表示RDD检查点的当前状态,其值有Initialized 、CheckpointingInProgress、 checkpointed。其转换过程如下 (1)Initialized状态 该状态是实例化ReliableRDDCheckpointData后的默认状态,用于标记当前的RDD已经建立了检查点(较v1.4.x少一个MarkForCheckPiont状态) (2)CheckpointingInProgress状态 每个作业结束后都会对作业的末RDD调用其doCheckPoint方法,该方法会顺着RDD的关系依赖链往前遍历,直到遇见内部RDDCheckpointData对象被标记为Initialized的为止,此时将RDD的RDDCheckpointData对象标记为CheckpointingInProgress,并启动一个作业完成数据的写入操作。 (3)Checkpointed状态 新启动作业完成数据写入操作之后,将建立检查点的RDD的所有依赖全部清除,将RDD内部的RDDCheckpointData对象标记为Checkpointed,将父RDD重新设置为一个CheckPointRDD对象, 父RDD的compute方法会直接从系统中读取数据。 如上只简单地介绍了相关概念,详细介绍请参看: https://github.com/JerryLead/SparkInternals/blob/master/markdown/6-CacheAndCheckpoint.md
    转载请注明原文地址: https://ju.6miu.com/read-676992.html

    最新回复(0)