map 接收一个函数,把这个函数应用到RDD的每一个元素,并返一个函数作用后的新的RDD。
filter 接收一个函数,返回只包含满足func函数的元素的新RDD。
flatMap
对每个输入元素,可以输出多个输出元素。但最终会将RDD中元素压扁后返回一个新的RDD,注意返回的RDD中的元素类型同原RDD中的元素类型相同。
val a = sparkContext.parallelize(List("spark,flume", "zookeeper", "kafka")) println(a.collect().mkString(" | "))// spark,flume | zookeeper | kafka val b = a.flatMap(_.split(",")) println(b.collect().mkString(" | "))// spark | flume | zookeeper | kafka例子中所示,a中元素类型是List[String],b中元素类型也是List[String],虽然对"spark,flume"作split(",")操作会返回是个String[],但是会对这个String[]集合作flat操作,最终为spark、flume单个字符串放到List中。
distinct 去除重复的元素,该操作是很耗时的操作,因为它需要通过网络,shuffle所有的数据,以保证元素不重复。例如:RDD1(”a”, ”a”, “b”, “c”, “d”),RDD2(”a”, “c”, “e”),RDD1. distinct()结果为(”a” , “b”, “c”, “d”)。 mapPartitions 类似于map,但基于每个RDD分区(或者数据block)独立运行,所以如果RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。 mapPartitionsWithIndex 类似于 mapPartitions,只是func 多了一个整型的分区索引值,因此如果RDD包含元素类型为T,则 func 必须是 Iterator<T> => Iterator<U> 的映射函数。 glom 将每一个分区形成一个数组,形成新的RDD类型时RDD[Array[T]]。 union 对两个RDD求并集。例如:RDD1(”a”, ”a”, “b”, “c”, “d”),RDD2(”a”, “c”, “e”),RDD1. union(RDD2)结果为(”a” , ”a”, “b”, “c”, “d”, ”a”, ”c”, ”e”)。 intersection 求交集。耗时操作,因为需要shuffle。例如:RDD1(”a”, ”a”, “b”, “c”, “d”),RDD2(”a”, “c”, “e”),RDD1. intersection(RDD2)结果为(”a” , “c”)。 subtract 求差集,在第一个RDD中存在,而不存在与第二个RDD的元素。需要shuffle。例如:RDD1(”a”, ”a”, “b”, “c”, “d”),RDD2(”a”, “c”, “e”),RDD1. subtract(RDD2)结果为(”b” , “d”)。 cartesian 笛卡尔积,耗时操作。 val a = sparkContext.parallelize(Array(("a", 1), ("b", 3), ("c", 5))) val b = sparkContext.parallelize(Array(("a", 2), ("c", 6))) val c = a.cartesian(b) c.foreach(println) //笛卡尔积 // ((a,1),(a,2)) // ((a,1),(c,6)) // ((b,3),(a,2)) // ((b,3),(c,6)) // ((c,5),(a,2)) // ((c,5),(c,6)) reduceByKey 接收一个函数,对key相同的value作该函数的调用。 val a = sparkContext.parallelize(List(("a", 1), ("a", 5), ("b", 5))) val b = a.reduceByKey(_ + _) println(b.collect().mkString(" | "))//(a,6) | (b,5) groupByKey 把相同的key的values分组。如源RDD包含 (K, V) 对,则该算子返回一个新的数据集包含 (K, Iterable<V>) 对。 aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 如果源RDD包含 (K, V) 对,则返回新RDD包含 (K, U) 对,其中每个key对应的value都是由 combOp 函数 和 一个“0”值zeroValue 聚合得到。允许聚合后value类型和输入value类型不同,避免了不必要的开销。和 groupByKey 类似,可以通过可选参数 numTasks 指定reduce任务的个数。 sortByKey([ascending], [numTasks]) 如果源RDD包含元素类型 (K, V) 对,其中K可排序,则返回新的RDD包含 (K, V) 对,并按照 K 排序(升序还是降序取决于 ascending 参数) cogroup(otherDataset, [numTasks]) 如果源RDD包含元素类型 (K, V) 且参数RDD(otherDataset)包含元素类型(K, W),则返回的新RDD中包含 (K, (Iterable<V>, Iterable<W>))。 mapValues 函数作用于pairRDD的每个元素,key不变。 join 联接操作。 //只取相同的key,value组合 val a = sparkContext.parallelize(Array(("a", 1), ("b", 3), ("c", 5))) val b = sparkContext.parallelize(Array(("a", 2), ("c", 6))) val c = a.join(b) c.foreach(println) // (a,(1,2)) // (c,(5,6)) leftOuterJoin 左外部联接操作。 val a = sparkContext.parallelize(Array(("a", 1), ("b", 3), ("c", 5))) val b = sparkContext.parallelize(Array(("a", 2), ("c", 6))) val c = a.leftOuterJoin(b) c.foreach(println) // (a,(1,Some(2))) // (b,(3,None)) // (c,(5,Some(6)))fullOuterJoin
完全外部联接。
val a = sparkContext.parallelize(Array(("a", 1), ("b", 3), ("c", 5))) val b = sparkContext.parallelize(Array(("a", 2), ("c", 6))) val c = a.fullOuterJoin(b) c.foreach(println) // (a,(Some(1),Some(2))) // (b,(Some(3),None)) // (c,(Some(5),Some(6))) combineByKey(createCombiner,mergeValue,mergeCombiners,partitioner) combineByKey()遍历partition中的元素,遍历的元素的key,要么是之前见过的,要么不是。如果是新元素, 则调用createCombiner()函数来创建这个元素的key累加器的初始值。combineByKey是将RDD[(K,V)]转换为RDD[(K,C)]。因此,首先需要提供一个函数,能够完成从V到C的combine,称之为combiner。如果是这个partition中已经存在的key,就会使用mergeValue()函数,把当前累加器的值和新的值累加。合并每个partition的结果的时候,如果多个partitions都有同一个key的累加器,则调用mergeCombiners()函数合计这些累加器。 coalesce(numPartitions) 将RDD的分区数减少到numPartitions。当以后大数据集被过滤成小数据集后,减少分区数,可以提升效率。 repartition(numPartitions) 将RDD数据重新混洗(reshuffle)并随机分布到新的分区中,使数据分布更均衡,新的分区个数取决于numPartitions。该算子总是需要通过网络混洗所有数据。 repartitionAndSortWithinPartitions(partitioner) 根据partitioner(spark自带有HashPartitioner和RangePartitioner等)重新分区RDD,并且在每个结果分区中按key做排序。这是一个组合算子,功能上等价于先 repartition 再在每个分区内排序,但这个算子内部做了优化(将排序过程下推到混洗同时进行),因此性能更好。count() 返回数据集中元素个数。
first() 返回数据集中首个元素(类似于 take(1) )。
take(n) 返回数据集中前 n 个元素。
countByValue() 返回一个map,表示唯一元素出现的个数。
countByKey() 只适用于包含键值对(K, V)的RDD,并返回一个哈希表,包含 (K, Int) 对,表示每个key的个数。
collectAsMap() 把结果作为map返回。
lookup(key) 提供给定key的所有值。
foreach(func) 在RDD的每个元素上运行 func 函数。通常被用于累加操作,如:更新一个累加器(Accumulator )或者和外部存储系统互操作。 top(num) 返回前几个元素。 takeSample(withReplacement,num, [seed]) 返回数据集的随机采样子集,最多包含 num 个元素,withReplacement 表示是否使用回置采样,最后一个参数为可选参数seed,随机数生成器的种子。 takeOrdered(n, [ordering]) 按元素排序(可以通过 ordering 自定义排序规则)后,返回前 n 个元素。 saveAsTextFile(path) 将数据集中元素保存到指定目录下的文本文件中(或者多个文本文件),支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。保存过程中,Spark会调用每个元素的toString方法,并将结果保存成文件中的一行。 saveAsSequenceFile(path) 将数据集中元素保存到指定目录下的Hadoop Sequence文件中,支持本地文件系统、HDFS 或者其他任何Hadoop支持的文件系统。适用于实现了Writable接口的键值对RDD。在Scala中,同样也适用于能够被隐式转换为Writable的类型(Spark实现了所有基本类型的隐式转换,如:Int,Double,String 等)。 saveAsObjectFile(path) 将RDD元素以Java序列化的格式保存成文件,保存结果文件可以使用 SparkContext.objectFile 来读取。