Spark operator cogroup explained

    xiaoxiao  2021-03-25

    1. What is cogroup?

    cogroup is defined on pair RDDs (PairRDDFunctions). One of its overloads is documented as follows:

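    def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]

    "For each key k in this or other1 or other2, return a resulting RDD that contains a tuple with the list of values for that key in this, other1 and other2."

    In other words: every key k that appears in at least one of this RDD, other1, or other2 produces exactly one output element (k, (values of k in this, values of k in other1, values of k in other2)). If k is absent from one of the inputs, the corresponding Iterable is empty. The two-RDD form rdd1.cogroup(rdd2) works the same way and returns RDD[(K, (Iterable[V], Iterable[W]))].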
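To make these semantics concrete without a cluster, here is a minimal sketch that mimics a two-way cogroup on ordinary Scala collections. `CogroupSketch` and its `cogroup` method are hypothetical names used only for illustration; they are not part of Spark. Spark's real operator shuffles data by key across partitions and returns `CompactBuffer`s, and the order of keys in its output is not guaranteed.

```scala
// Hypothetical helper for illustration only: emulates the semantics of a
// two-way cogroup on local collections. This is NOT Spark's API.
object CogroupSketch {
  def cogroup[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Map[K, (Seq[V], Seq[W])] = {
    // Group each side's values by key; groupBy keeps the input order within a key.
    val leftByKey: Map[K, Seq[V]] =
      left.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    val rightByKey: Map[K, Seq[W]] =
      right.groupBy(_._1).map { case (k, kws) => k -> kws.map(_._2) }

    // Every key seen on either side appears exactly once; a side with no
    // values for that key contributes an empty Seq.
    val keys = leftByKey.keySet ++ rightByKey.keySet
    keys.iterator.map { k =>
      (k, (leftByKey.getOrElse(k, Seq.empty[V]), rightByKey.getOrElse(k, Seq.empty[W])))
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val rdd1Data = Seq(("aa", 1), ("bb", 2), ("cc", 6))
    val rdd2Data = Seq(("aa", 3), ("dd", 4), ("aa", 5))
    // Sort by key only so the printed order is deterministic.
    cogroup(rdd1Data, rdd2Data).toSeq.sortBy(_._1).foreach(println)
  }
}
```

With the same data as the RDDs below, key `aa` collects `1` on the left and `3, 5` on the right, while `dd` has an empty left side and `bb`/`cc` have empty right sides.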

    2. Create two RDDs

    val rdd1 = sc.parallelize(Array(("aa", 1), ("bb", 2), ("cc", 6)))
    val rdd2 = sc.parallelize(Array(("aa", 3), ("dd", 4), ("aa", 5)))

    3. Apply cogroup and collect the result to the driver

    val rdd3 = rdd1.cogroup(rdd2).collect()

    4. Iterate over the collected array and print each element

    rdd3.foreach(println)
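Each element of `rdd3` is a nested tuple `(key, (values from rdd1, values from rdd2))`, so a pattern match can unpack it for more readable output. The sketch below uses a plain `Array` of the same shape, with `Seq` standing in for Spark's `CompactBuffer`, so it runs without Spark; `PrintCogroup` and `describe` are hypothetical names. Because the collected `rdd3` has type `Array[(String, (Iterable[Int], Iterable[Int]))]`, the same `describe` function should also apply to it directly.

```scala
// Hypothetical formatter for cogroup results; runs without Spark.
object PrintCogroup {
  // Render an Iterable as [a, b, c].
  private def show(xs: Iterable[Int]): String = xs.mkString("[", ", ", "]")

  // Unpack (key, (fromRdd1, fromRdd2)) and build one readable line.
  def describe(entry: (String, (Iterable[Int], Iterable[Int]))): String = entry match {
    case (key, (fromRdd1, fromRdd2)) =>
      s"$key: rdd1=${show(fromRdd1)} rdd2=${show(fromRdd2)}"
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the collected cogroup result; Seq stands in for CompactBuffer.
    val rdd3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array(
      ("aa", (Seq(1), Seq(3, 5))),
      ("dd", (Seq.empty[Int], Seq(4))),
      ("bb", (Seq(2), Seq.empty[Int])),
      ("cc", (Seq(6), Seq.empty[Int]))
    )
    rdd3.map(describe).foreach(println)
  }
}
```

For example, the `aa` entry is printed as `aa: rdd1=[1] rdd2=[3, 5]` and the `dd` entry as `dd: rdd1=[] rdd2=[4]`.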

    5. Complete code and output

    import org.apache.spark.{SparkConf, SparkContext}

    object joinDemo {
      def main(args: Array[String]): Unit = {
        // Run locally with a single thread.
        val sparkConf = new SparkConf().setMaster("local").setAppName("cogroup Demo")
        val sc = new SparkContext(sparkConf)

        val rdd1 = sc.parallelize(Array(("aa", 1), ("bb", 2), ("cc", 6)))
        val rdd2 = sc.parallelize(Array(("aa", 3), ("dd", 4), ("aa", 5)))

        // Group the values of both RDDs by key and bring the result to the driver.
        val rdd3 = rdd1.cogroup(rdd2).collect()
        rdd3.foreach(println)

        sc.stop()
      }
    }

    The output is as follows (the order of keys depends on how the data is partitioned and is not guaranteed):

    (aa,(CompactBuffer(1),CompactBuffer(3, 5)))
    (dd,(CompactBuffer(),CompactBuffer(4)))
    (bb,(CompactBuffer(2),CompactBuffer()))
    (cc,(CompactBuffer(6),CompactBuffer()))
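The output shows that cogroup keeps every key from both RDDs: a key missing on one side (`dd`, `bb`, `cc`) gets an empty buffer on that side. An inner join can therefore be built on top of cogroup by pairing every left value with every right value for each key, which automatically drops keys that have an empty side; Spark's own `PairRDDFunctions.join` follows this pattern, calling `cogroup` and then `flatMapValues`. The sketch below replays the idea on a local copy of the printed result, with `Seq` standing in for `CompactBuffer`; `JoinFromCogroup` and `innerJoinFromCogroup` are hypothetical names.

```scala
// Hypothetical illustration: deriving inner-join pairs from cogroup-shaped data.
object JoinFromCogroup {
  // For each key, emit one (key, (v, w)) per combination of left and right values.
  // A key with an empty side yields no pairs, which is the inner-join behavior.
  def innerJoinFromCogroup[K, V, W](grouped: Seq[(K, (Iterable[V], Iterable[W]))]): Seq[(K, (V, W))] =
    for {
      (key, (leftValues, rightValues)) <- grouped
      v <- leftValues
      w <- rightValues
    } yield (key, (v, w))

  def main(args: Array[String]): Unit = {
    // Local copy of the cogroup output; Seq stands in for CompactBuffer.
    val cogrouped: Seq[(String, (Iterable[Int], Iterable[Int]))] = Seq(
      ("aa", (Seq(1), Seq(3, 5))),
      ("dd", (Seq.empty[Int], Seq(4))),
      ("bb", (Seq(2), Seq.empty[Int])),
      ("cc", (Seq(6), Seq.empty[Int]))
    )
    innerJoinFromCogroup(cogrouped).foreach(println)
  }
}
```

Only `aa` has values on both sides, so this prints `(aa,(1,3))` and `(aa,(1,5))`, the same pairs `rdd1.join(rdd2)` would contain for this data.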

    When reposting, please cite the original URL: https://ju.6miu.com/read-9303.html
