Spark Applications (2): Secondary Sort


A secondary sort orders records by more than one field: rows are compared field by field from left to right, and the fully sorted result is then read from top to bottom. The input data:

```
c,18,1956
a,20,1356
d,5,1956
f,18,1256
h,3,2956
c,18,2008
y,8,956
a,18,1956
```

Save it as mySec.txt and upload it to HDFS (the programs below read it from hdfs://master:9000/data/studySet/Secondary/mySec.txt).

Requirement 1

Sort the records so that the output looks like this:

```
(a,18,1956)
(a,20,1356)
(c,18,1956)
(c,18,2008)
(d,5,1956)
(f,18,1256)
(h,3,2956)
(y,8,956)
```

The code builds a composite string key from the first two fields, sorts by that key, and then splits the key back apart (the numeric field is compared as text here, which happens to give the right order for this data set):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UDsecondary {

  def mysec1(sc: SparkContext, rdd: RDD[String]) = {
    rdd.map(_.split(","))
      .map(part => (s"${part(0)}-${part(1)}", part(2)))
      .sortByKey()
      .map { part =>
        val tmp: Array[String] = part._1.split("-")
        (tmp(0), tmp(1), part._2)
      }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("二次排序").setMaster("local")
      .set("spark.storage.memoryFraction", "0.1")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("hdfs://master:9000/data/studySet/Secondary/mySec.txt")
    mysec1(sc, rdd).foreach(println)
  }
}
```

Result:

Requirement 2

Records that share the same first two fields should be merged, so the sorted output becomes:

```
(a,18,1956)
(a,20,1356)
(c,18,1956 2008)
(d,5,1956)
(f,18,1256)
(h,3,2956)
(y,8,956)
```

The program adds a reduceByKey step that concatenates the values of duplicate keys before sorting:

```scala
def mysec2(sc: SparkContext, rdd: RDD[String]) = {
  rdd.map(_.split(","))
    .map(part => (s"${part(0)}-${part(1)}", part(2)))
    .reduceByKey((a, b) => a + " " + b)
    .sortByKey()
    .map { part =>
      val tmp: Array[String] = part._1.split("-")
      (tmp(0), tmp(1), part._2)
    }
}
```

Call it from main in the same way:

```scala
mysec2(sc, rdd).foreach(println)
```

Requirement 3: a custom key

You may notice that requirement 2 runs somewhat slowly. The reason is that it performs more than one shuffle (reduceByKey followed by sortByKey), which hurts performance considerably. To cut down the number of shuffles we can define a custom key; this is also a common approach in production code.

The custom key:

```scala
package Secondary

/**
  * Created by legotime on 3/8/17.
  */
class MyUDkey(var first: String, var second: Int, var three: Int)
  extends Ordered[MyUDkey] with Serializable {

  // Ordered would derive <, >, <= and >= from compare automatically,
  // but they are spelled out here to make the ordering explicit.
  override def <(that: MyUDkey): Boolean = {
    if (this.first < that.first) {
      true
    } else if (first == that.first && second < that.second) {
      true
    } else if (first == that.first && second == that.second && three < that.three) {
      true
    } else {
      false
    }
  }

  override def >(that: MyUDkey): Boolean = {
    if (first > that.first) {
      true
    } else if (first == that.first && second > that.second) {
      true
    } else if (first == that.first && second == that.second && three > that.three) {
      true
    } else {
      false
    }
  }

  override def <=(that: MyUDkey): Boolean = {
    if (this < that) {
      true
    } else if (first == that.first && second == that.second && three == that.three) {
      true
    } else {
      false
    }
  }

  override def >=(that: MyUDkey): Boolean = {
    if (this > that) {
      true
    } else if (first == that.first && second == that.second && three == that.three) {
      true
    } else {
      false
    }
  }

  // Compare field by field: the string lexicographically, then the two numbers.
  override def compare(that: MyUDkey): Int = {
    if (first.compareTo(that.first) != 0) {
      first.compareTo(that.first)
    } else if (second - that.second != 0) {
      second - that.second
    } else if (three - that.three != 0) {
      three - that.three
    } else {
      0
    }
  }

  override def compareTo(that: MyUDkey): Int = compare(that)

  override def toString = s"MyUDkey($first, $second, $three)"
}
```
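Before wiring the key into Spark, it can help to sanity-check the ordering locally. The sketch below is not from the original post: MyUDkeyCheck is a hypothetical helper, and it only assumes the MyUDkey class above; Seq.sorted picks up the Ordering that Scala derives from Ordered[MyUDkey], so it exercises compare directly.

```scala
package Secondary

// Hypothetical local check of the custom ordering; not part of the original program.
object MyUDkeyCheck {
  def main(args: Array[String]): Unit = {
    val keys = Seq(
      new MyUDkey("c", 18, 1956),
      new MyUDkey("a", 20, 1356),
      new MyUDkey("a", 18, 1956)
    )
    // Seq.sorted uses the implicit Ordering derived from Ordered[MyUDkey],
    // which delegates to compare: by first, then second, then three.
    keys.sorted.foreach(println)
    // Expected order: MyUDkey(a, 18, 1956), MyUDkey(a, 20, 1356), MyUDkey(c, 18, 1956)
  }
}
```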
With the custom key in place, the driver program puts all three methods together (the calls for requirements 1 and 2 are commented out):

```scala
package Secondary

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by root on 3/8/17.
  */
object UDsecondary {

  def mysec1(sc: SparkContext, rdd: RDD[String]) = {
    rdd.map(_.split(","))
      .map(part => (s"${part(0)}-${part(1)}", part(2)))
      .sortByKey()
      .map { part =>
        val tmp: Array[String] = part._1.split("-")
        (tmp(0), tmp(1), part._2)
      }
  }

  def mysec2(sc: SparkContext, rdd: RDD[String]) = {
    rdd.map(_.split(","))
      .map(part => (s"${part(0)}-${part(1)}", part(2)))
      .reduceByKey((a, b) => a + " " + b)
      .sortByKey()
      .map { part =>
        val tmp: Array[String] = part._1.split("-")
        (tmp(0), tmp(1), part._2)
      }
  }

  // Requirement 3: wrap each record in the custom key and sort with a single shuffle.
  def mysec3(sc: SparkContext, rdd: RDD[String]) = {
    rdd.map(_.split(","))
      .map(part => (new MyUDkey(part(0), part(1).toInt, part(2).toInt), 1))
      .sortByKey()
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("二次排序").setMaster("local")
      .set("spark.storage.memoryFraction", "0.1")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("hdfs://master:9000/data/studySet/Secondary/mySec.txt")
    //mysec1(sc, rdd).foreach(println)
    //mysec2(sc, rdd).foreach(println)
    mysec3(sc, rdd).foreach(println)
  }
}
```

Result:
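As an aside that is not in the original post: Scala already provides a lexicographic Ordering for tuples, so the same single-shuffle sort can be expressed without a hand-written key class. A minimal sketch, assuming it is added as another method of the UDsecondary object above (mysec3Tuple is a hypothetical name):

```scala
// Hypothetical tuple-key variant of mysec3; relies on the implicit
// Ordering[(String, Int, Int)] that Scala generates for tuples.
def mysec3Tuple(sc: SparkContext, rdd: RDD[String]): RDD[(String, Int, Int)] = {
  rdd.map(_.split(","))
    .map(part => ((part(0), part(1).toInt, part(2).toInt), 1)) // dummy value, as in mysec3
    .sortByKey()                                               // one shuffle, ordered by the tuple key
    .keys
}
```

Both versions shuffle once; the custom class mainly buys readability and room for domain-specific comparison rules.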
