spark的缓存

    xiaoxiao2021-03-25  127

    spark的缓存

    缓存的作用,无论是在传统程序,还是分布式程序,缓存的作用主要针对频繁操作的数据,下次操作的时候直接读取。spark亦是。Spark 支持把数据集拉到集群内的内存缓存中。当要重复访问时这是非常有用的


    cache:定义

    用MEMORY_ONLY储存级别对RDD进行缓存,其内部实现是调用persist()函数的。官方文档定义: Persist this RDD with the default storage level (MEMORY_ONLY)

    函数原型 def cache() : this.type 实例 scala> var data = sc.parallelize(List(1,2,3,4)) data: org.apache.spark.rdd.RDD[Int] =   ParallelCollectionRDD[44] at parallelize at <console>:12 scala> data.getStorageLevel res65: org.apache.spark.storage.StorageLevel =   StorageLevel(false, false, false, false, 1) scala> data.cache res66: org.apache.spark.rdd.RDD[Int] =   ParallelCollectionRDD[44] at parallelize at <console>:12 scala> data.getStorageLevel res67: org.apache.spark.storage.StorageLevel =   StorageLevel(false, true, false, true, 1)

    我们先是定义了一个RDD,然后通过getStorageLevel函数得到该RDD的默认存储级别,这里是NONE。然后我们调用cache函数,将RDD的存储级别改成了MEMORY_ONLY(看StorageLevel的第二个参数)。关于StorageLevel的其他的几种存储级别介绍请参照下面StorageLevel类进行了解。

    cache和persist的区别

    cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间。源码:

    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): this.type = persist()

    说明是cache()调用了persist(), 想要知道二者的不同还需要看一下persist函数:

    /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

    可以看到persist()内部调用了persist(StorageLevel.MEMORY_ONLY),继续深入:

    /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. This can only be used to assign a new storage level if the RDD does not * have a storage level set yet.. */ def persist(newLevel: StorageLevel): this.type = { // TODO: Handle changes of StorageLevel if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) { throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level") } sc.persistRDD(this) // Register the RDD with the ContextCleaner for automatic GC-based cleanup sc.cleaner.foreach(_.registerRDDForCleanup(this)) storageLevel = newLevel this }

    可以看出来persist有一个 StorageLevel 类型的参数,这个表示的是RDD的缓存级别。

    至此便可得出cache和persist的区别了:cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。

    RDD的缓存级别

    顺便看一下RDD都有哪些缓存级别,查看 StorageLevel 类的源码:

    object StorageLevel { val NONE = new StorageLevel(false, false, false, false) val DISK_ONLY = new StorageLevel(true, false, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, false, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, false, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) val OFF_HEAP = new StorageLevel(false, false, true, false) ...... }

    可以看到这里列出了12种缓存级别,但这些有什么区别呢?可以看到每个缓存级别后面都跟了一个StorageLevel的构造函数,里面包含了4个或5个参数,如下

    val MEMORY_ONLY = new StorageLevel(false, true, false, true)

    查看其构造函数

    class StorageLevel private( private var _useDisk: Boolean, private var _useMemory: Boolean, private var _useOffHeap: Boolean, private var _deserialized: Boolean, private var _replication: Int = 1) extends Externalizable { ...... def useDisk: Boolean = _useDisk def useMemory: Boolean = _useMemory def useOffHeap: Boolean = _useOffHeap def deserialized: Boolean = _deserialized def replication: Int = _replication ...... }

    可以看到StorageLevel类的主构造器包含了5个参数: useDisk:使用硬盘(外存) useMemory:使用内存 useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。 deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象 replication:备份数(在多个节点上备份)

    理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。

    val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份) 另外还注意到有一种特殊的缓存级别

    val OFF_HEAP = new StorageLevel(false, false, true, false)

    使用了堆外内存,StorageLevel 类的源码中有一段代码可以看出这个的特殊性,它不能和其它几个参数共存。

    if (useOffHeap) { require(!useDisk, "Off-heap storage level does not support using disk") require(!useMemory, "Off-heap storage level does not support using heap memory") require(!deserialized, "Off-heap storage level does not support deserialized storage") require(replication == 1, "Off-heap storage level does not support multiple replication") }
    转载请注明原文地址: https://ju.6miu.com/read-6788.html

    最新回复(0)