SparkのRDD

    xiaoxiao2024-12-24  17

    转载自:http://blog.sina.com.cn/s/blog_4a7854d90102wrvb.html               http://blog.csdn.net/slq1023/article/details/50927954               http://ifeve.com/%e3%80%8aspark-%e5%ae%98%e6%96%b9%e6%96%87%e6%a1%a3%e3%80%8bspark%e7%bc%96%e7%a8%8b%e6%8c%87%e5%8d%97/

    RDD介绍

           RDD(Resillient Distributed DataSets)弹性分布式数据集是基于工作集的。        1.自动的进行内存和磁盘数据存储的切换;        2.基于Lineage的高校容错;        3.Task如果失败会自动进行特定次数的重试;        4.Stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片;        5.checkpoint和persist;        6.数据调度弹性:DAG,TASK和资源管理无关;        7.数据分片的高度弹性,自行设置分片coalesce(分区减少,不用设置shuffle=true,如果强制设置true,也是会shuffle;相反,则要设置shuffle)。        Spark适合任意(大、小)规模的计算原因:基于内存迭代;shuffle多种实现。        RDD是分布式函数式编程的抽象。Spark的每一步操作都是对RDD进行操作,RDD是一个可分区的元素集合,其包含的元素可以分布在集群各个节点上,并且可以执行一些分布式并行操作。基于RDD的编程一般都是通过高阶函数的方式。        RDD的核心之一就是它的Lazy,因为这不计算,开始时只是对数据处理进行标记而已。例如WordCount中的map、flatmap其实并不计算数据,只是对数据操作的标记而已。        例如flatMap的源码如下: /** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) }        可以看出在flatMap中创建了一个MapPartitionsRDD,但第一个参数是this,这个this是指它依赖的父RDD。每次创建新的RDD都会把父RDD作为第一个参数传入,所以RDD每次构建对象都依赖于父RDD。        由于RDD是只读的,为了应对计算模型,RDD又是lazy级别的。每次操作都会产生RDD,每次构建新的RDD都是把父RDD作为第一个参数传入,这就构成了一个链条。在最后Action时才触发,这就构成了一个从后往前回溯的过程,其实就是函数展开的过程。        由于这种从后往前的回溯机制,Spark的容错的开销会非常低。

    Spark计算模型

           输入与构造RDD ---> 转换Transformation ---> 输出Action。其中Transformations操作是延迟计算的,而Actions算子会触发Spark提交作业,并将数据输出Spark系统。        RDD支持两种类型的算子(operation):transformation算子 和 action算子;transformation 算子可以将已有RDD转换得到一个新的RDD,而action算子则是基于数据集计算,并将结果返回给驱动器(driver)。        Spark中所有transformation算子都是懒惰的,也就是说,这些算子并不立即计算结果,而是记录下对基础数据集的转换操作。只有等到某个action算子需要计算一个结果返回给驱动器的时候,transformation算子所记录的操作才会被计算。        默认情况下,每次调用action算子的时候,每个由transformation转换得到的RDD都会被重新计算。当然也可以通过调用 persist(或者cache)操作来持久化一个RDD,这意味着Spark将会把RDD的元素都保存在集群中,因此下一次访问这些元素的速度将大大提 高。同时,Spark还支持将RDD元素持久化到内存或者磁盘上,甚至可以支持跨节点多副本。

    RDD的创建

           1.读取文件: val textRDD = sparkContext.textFile("...")        2.在DriverProgram中创建: val collection = sparkContext.parallelize(1 to 100)//根据集合创建了ParallelCollectionRDD        3.依据其它RDD转化: val otherRDD = sparkContext.parallelize(1 to 100)//根据集合创建了ParallelCollectionRDD val newRDD = otherRDD.map(_ * 2)        4.其它方式。。。

    transformation算子

    map   接收一个函数,把这个函数应用到RDD的每一个元素,并返一个函数作用后的新的RDD。

    filter   接收一个函数,返回只包含满足func函数的元素的新RDD。

    flatMap

           对每个输入元素,可以输出多个输出元素。但最终会将RDD中元素压扁后返回一个新的RDD,注意返回的RDD中的元素类型同原RDD中的元素类型相同。

    val a = sparkContext.parallelize(List("spark,flume", "zookeeper", "kafka")) println(a.collect().mkString(" | "))// spark,flume | zookeeper | kafka val b = a.flatMap(_.split(",")) println(b.collect().mkString(" | "))// spark | flume | zookeeper | kafka

           例子中所示,a中元素类型是List[String],b中元素类型也是List[String],虽然对"spark,flume"作split(",")操作会返回是个String[],但是会对这个String[]集合作flat操作,最终为spark、flume单个字符串放到List中。

    distinct           去除重复的元素,该操作是很耗时的操作,因为它需要通过网络,shuffle所有的数据,以保证元素不重复。例如:RDD1(”a”, ”a”, “b”, “c”, “d”),RDD2(”a”, “c”, “e”),RDD1. distinct()结果为(”a” , “b”, “c”, “d”)。 mapPartitions        类似于map,但基于每个RDD分区(或者数据block)独立运行,所以如果RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。 mapPartitionsWithIndex        类似于 mapPartitions,只是func 多了一个整型的分区索引值,因此如果RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。 glom        将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]。 union         对两个RDD求并集。例如:RDD1(”a”, ”a”, “b”, “c”, “d”),RDD2(”a”, “c”, “e”),RDD1. union(RDD2)结果为(”a” , ”a”, “b”, “c”, “d”, ”a”, ”c”, ”e”)。 intersection        求交集。耗时操作,因为需要shuffle。例如:RDD1(”a”, ”a”, “b”, “c”, “d”),RDD2(”a”, “c”, “e”),RDD1. intersection(RDD2)结果为(”a” , “c”)。 subtract        求差集,在第一个RDD中存在,而不存在与第二个RDD的元素。需要shuffle。例如:RDD1(”a”, ”a”, “b”, “c”, “d”),RDD2(”a”, “c”, “e”),RDD1. subtract(RDD2)结果为(”b” , “d”)。 cartesian           笛卡尔积,耗时操作。 val a = sparkContext.parallelize(Array(("a", 1), ("b", 3), ("c", 5))) val b = sparkContext.parallelize(Array(("a", 2), ("c", 6))) val c = a.cartesian(b) c.foreach(println) //笛卡尔积 // ((a,1),(a,2)) // ((a,1),(c,6)) // ((b,3),(a,2)) // ((b,3),(c,6)) // ((c,5),(a,2)) // ((c,5),(c,6)) reduceByKey           接收一个函数,对key相同的value作该函数的调用。 val a = sparkContext.parallelize(List(("a", 1), ("a", 5), ("b", 5))) val b = a.reduceByKey(_ + _) println(b.collect().mkString(" | "))//(a,6) | (b,5) groupByKey           把相同的key的values分组。如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable<V>) 对。 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])            如果源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每个key对应的value都是由 combOp 函数 和 一个“0”值zeroValue 聚合得到。允许聚合后value类型和输入value类型不同,避免了不必要的开销。和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。 sortByKey([ascending], [numTasks])           如果源RDD包含元素类型 (K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序还是降序取决于 ascending 参数) cogroup(otherDataset, [numTasks])        如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。 mapValues           函数作用于pairRDD的每个元素,key不变。 join           联接操作。 //只取相同的key,value组合 val a = sparkContext.parallelize(Array(("a", 1), ("b", 3), ("c", 5))) val b = sparkContext.parallelize(Array(("a", 2), ("c", 6))) val c = a.join(b) c.foreach(println) // (a,(1,2)) // (c,(5,6)) leftOuterJoin           左外部联接操作。 val a = sparkContext.parallelize(Array(("a", 1), ("b", 3), ("c", 5))) val b = sparkContext.parallelize(Array(("a", 2), ("c", 6))) val c = a.leftOuterJoin(b) c.foreach(println) // (a,(1,Some(2))) // (b,(3,None)) // (c,(5,Some(6)))

    fullOuterJoin

           完全外部联接。

    val a = sparkContext.parallelize(Array(("a", 1), ("b", 3), ("c", 5))) val b = sparkContext.parallelize(Array(("a", 2), ("c", 6))) val c = a.fullOuterJoin(b) c.foreach(println) // (a,(Some(1),Some(2))) // (b,(Some(3),None)) // (c,(Some(5),Some(6))) combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner)        combineByKey()遍历partition中的元素,遍历的元素的key,要么是之前见过的,要么不是。如果是新元素, 则调用createCombiner()函数来创建这个元素的key累加器的初始值。combineByKey是将RDD[(K,V)]转换为RDD[(K,C)]。因此,首先需要提供一个函数,能够完成从V到C的combine,称之为combiner。如果是这个partition中已经存在的key,就会使用mergeValue()函数,把当前累加器的值和新的值累加。合并每个partition的结果的时候,如果多个partitions都有同一个key的累加器,则调用mergeCombiners()函数合计这些累加器。 coalesce(numPartitions)        将RDD的分区数减少到numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。 repartition(numPartitions)        将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子总是需要通过网络混洗所有数据。 repartitionAndSortWithinPartitions(partitioner)        根据partitioner(spark自带有HashPartitioner和RangePartitioner等)重新分区RDD,并且在每个结果分区中按key做排序。这是一个组合算子,功能上等价于先 repartition 再在每个分区内排序,但这个算子内部做了优化(将排序过程下推到混洗同时进行),因此性能更好。

    action算子

    reduce(func)        将RDD中元素按func进行聚合(func是一个 (T,T) => T 的映射函数,其中T为源RDD元素类型,并且func需要满足 交换律 和 结合律 以便支持并行计算)。 collect()        将数据集中所有元素以数组形式返回驱动器(driver)程序。通常用于,在RDD进行了filter或其他过滤操作后,将一个足够小的数据子集返回到驱动器内存中。

    count()   返回数据集中元素个数。

    first()   返回数据集中首个元素(类似于 take(1) )。

    take(n)   返回数据集中前 n 个元素。

    countByValue()   返回一个map,表示唯一元素出现的个数。

    countByKey()   只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每个key的个数。

    collectAsMap()   把结果作为map返回。

    lookup(key)   提供给定key的所有值。

    foreach(func)        在RDD的每个元素上运行 func 函数。通常被用于累加操作,如:更新一个累加器(Accumulator )或者和外部存储系统互操作。 top(num)        返回前几个元素。 takeSample(withReplacement,num, [seed])        返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。 takeOrdered(n, [ordering])         按元素排序(可以通过 ordering 自定义排序规则)后,返回前 n 个元素。 saveAsTextFile(path)         将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。保存过程中,Spark会调用每个元素的toString方法,并将结果保存成文件中的一行。 saveAsSequenceFile(path)        将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,同样也适用于能够被隐式转换为Writable的类型(Spark实现了所有基本类型的隐式转换,如:Int,Double,String 等)。 saveAsObjectFile(path)        将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。
    转载请注明原文地址: https://ju.6miu.com/read-1294953.html
    最新回复(0)