spark常用函数:transformation和action Example

    xiaoxiao2022-06-24  41

    1、RDD提供了两种类型的操作:transformation和action

    所有的transformation都是采用的懒策略,如果只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。

    1)transformation操作:得到一个新的RDD,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD

    map(func):对调用map的RDD数据集中的每个element都使用func,然后返回一个新的RDD,这个返回的数据集是分布式的数据集

    mapValues顾名思义就是输入函数应用于RDDKev-ValueValue,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。

    mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。第一个函数是把RDD的partition index(index0开始)作为输入,输出为新类型A;第二个函数是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

    mapPartitions(func):和map很像,但是map是每个element,而mapPartitions是每个partition

    mapPartitionsWithSplit(func):和mapPartitions很像,但是func作用的是其中一个split上,所以func中应该有index

    mapPartitionsWithIndex(func)函数:mapPartitionsWithIndex的func接受两个参数,第一个参数是分区的索引,第二个是一个数据集分区的迭代器。而输出的是一个包含经过该函数转换的迭代器。下面测试中,将分区索引和分区数据一起输出。

    sample(withReplacement,faction,seed):抽样,withReplacement为true表示有放回;faction表示采样的比例;seed为随机种子

    takeSample() 函数和上面的sample 函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行Collect(),返回结果的集合为单机的数组。

    filter(func) : 对调用filter的RDD数据集中的每个元素都使用func,然后返回一个包含使func为true的元素构成的RDD

    flatMap(func):和map差不多,但是flatMap生成的是多个结果

    flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

    flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。

    union(otherDataset):返回一个新的dataset,包含源dataset和给定dataset的元素的集合

    distinct([numTasks]):返回一个包含源数据集中所有不重复元素的新数据集

    groupByKey(numTasks):返回(K,Seq[V]),也就是hadoop中reduce函数接受的key-valuelist

    reduceByKey(func,[numTasks]):就是用一个给定的reducefunc再作用在groupByKey产生的(K,Seq[V]),比如求和,求平均数

    sortBy (dataSet, boolean)函数:排序。第二个参数默认为true,即升序排序。

    sortByKey([ascending],[numTasks]):按照key来进行排序,是升序还是降序,ascending是boolean类型

    join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数。本质是通过cogroup算子先进行协同划分,再通过flatMapValues将合并的数据打散。

    cogroup(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,Seq[V],Seq[W])的dataset,numTasks为并发的任务数

    cartesian(otherDataset):笛卡尔积就是m*n,大家懂的

    coalesce(numPartitions, true)函数:对RDD中的分区重新进行合并。返回一个新的RDD,且该RDD的分区个数等于numPartitions个数。如果shuffle设置为true,则会进行shuffle。

    repartition(numPartitions)随机重新shuffle RDD中的数据,并创建numPartitions个分区。此操作总会通过网络来shuffle全部数据

    pipe(command, [envVars])通过POSIX 管道来将每个RDD分区的数据传入一个shell命令(例如Perl或bash脚本)。RDD元素会写入到进程的标准输入,其标准输出会作为RDD字符串返回。

     

    2)action操作:action是得到一个值,或者一个结果(直接将RDD cache到内存中)

    reduce(func):说白了就是聚集,但是传入的函数是两个参数输入返回一个值,这个函数必须是满足交换律和结合律的

    collect():一般filter或者足够小的结果的时候,再用collect封装返回一个数组

    count():返回的是dataset中的element的个数

    first():返回的是dataset中的第一个元素      类似于take(1)

    take(n):返回前n个elements,这个士driver program返回的

    takeSample(withReplacement,num,seed):抽样返回一个dataset中的num个元素,随机种子seed

    takeOrdered(n, [ordering])返回一个由数据集的前n个元素组成的有序数组,使用自然序或自定义的比较器。

    saveAsTextFile(path):把dataset写到一个text file中,或者hdfs,或者hdfs支持的文件系统中。对于每个元素,Spark将会调用toString方法,spark把每条记录都转换为一行记录,然后写到file中。

    saveAsSequenceFile(path)将数据集的元素,以Hadoopsequencefile的格式,保存到指定的目录下,本地系统,HDFS或者任何其它hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者隐式的可以转换为Writable的RDD。(Spark包括了基本类型的转换,例如Int,Double,String,等等)

    saveAsObjectFile(path)将数据集元素写入Java序列化的可以被SparkContext.objectFile()加载的简单格式中

    countByKey():返回的是key对应的个数的一个map,作用于一个RDD

    foreach(func):对dataset中的每个元素都使用func

     

    3)其它 函数操作:

    lookup操作:通过key找value值。Lookup 函数对(Key, Value) 型的RDD 操作,返回指定Key 对应的元素形成的Seq。

    contains(str)函数:包含str字符串

    take 和 takeAsTextFile操作

    fold,foldLeft, and foldRight之间的区别

    trim函数:把字符两端的空格截掉

    top 返回最大的 k 个元素。

    take 返回最小的 k 个元素。

    takeOrdered 返回最小的 k 个元素,并且在返回的数组中保持元素的顺序。

    first 相当于top(1) 返回整个RDD中的前k 个元素,可以定义排序的方式 Ordering[T]。返回的是一个含前k 个元素的数组。

    fold 和reduce 的原理相同,但是与reduce 不同,相当于每个reduce 时,迭代器取的第一个元素是zeroValue。

    aggregate 先对每个分区的所有元素进行aggregate 操作,再对分区的结果进行fold 操作。aggreagate 与fold 和reduce 的不同之处在于,aggregate相当于采用归并的方式进行数据聚集,这种聚集是并行化的。而在fold 和reduce 函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。

     

    转载请注明原文地址: https://ju.6miu.com/read-1123717.html

    最新回复(0)