Spark 使用sortByKey进行二次排序

    xiaoxiao2025-07-19  7

    Spark的sortByKey API允许自定义排序规则,这样就可以进行自定义的二次排序、三次排序等等。 先来看一下sortByKey的源码实现:

    // Spark's JavaPairRDD.sortByKey overloads (excerpt from the Spark source).
    // No-arg form defaults to ascending order.
    def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)

    // Uses Guava's natural ordering, cast to a Comparator for the key type.
    def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = {
      val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]]
      sortByKey(comp, ascending)
    }

    // Any Comparator can be supplied — this is the hook that makes custom
    // (secondary, tertiary, ...) sorts possible: the comparator becomes an
    // implicit Ordering required by OrderedRDDFunctions below.
    def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = {
      implicit val ordering = comp // Allow implicit conversion of Comparator to Ordering.
      fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
    }

    // The K : Ordering context bound is why a custom key must provide an
    // Ordering (e.g. by extending scala.math.Ordered).
    // NOTE: declaration only — the class body is not part of this excerpt.
    class OrderedRDDFunctions[K : Ordering : ClassTag,
                              V: ClassTag,
                              P <: Product2[K, V] : ClassTag] @DeveloperApi() (
        self: RDD[P])
      extends Logging with Serializable

    通过代码我们可以发现,要实现自定义的二次排序,Key必须实现Scala的Ordered特质和Java的Serializable接口。

    Java实现: 首先是Key类的自定义实现:

    import scala.math.Ordered; import java.io.Serializable; /** * Key的自定义 * Created by Administrator on 2016/8/14 0014. */ public class SecondarySortKey implements Ordered<SecondarySort>, Serializable { public int getFirst() { return first; } public int getSecond() { return second; } public void setFirst(int first) { this.first = first; } public void setSecond(int second) { this.second = second; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; SecondarySort that = (SecondarySort) o; if (first != that.first) return false; return second == that.second; } @Override public int hashCode() { int result = first; result = 31 * result + second; return result; } // 需要排序的key private int first; private int second; // 二次排序的公开构造器 public SecondarySortKey(int first, int second) { this.first = first; this.second = second; } @Override public int compare(SecondarySort other) { if (this.$greater(other)) { return 1; } else if (this.$less(other)) { return -1; } return 0; } @Override public boolean $less(SecondarySort other) { if (this.first < other.first) { return true; } else if (this.first == other.first && this.second < other.second) { return true; } return false; } @Override public boolean $greater(SecondarySort other) { if (this.first > other.first) { return true; } else if (this.first == other.first && this.second > other.first) { return true; } return false; } @Override public boolean $less$eq(SecondarySort other) { if (this.$less(other)) { return true; } else if (this.first == other.first && this.second == other.second) { return true; } return false; } @Override public boolean $greater$eq(SecondarySort other) { if (this.$greater(other)) { return true; } else if (this.first == other.first && this.second == other.second) { return true; } return false; } @Override public int compareTo(SecondarySort other) { if (this.$greater(other)) { return 1; } else if (this.$less(other)) { return -1; } return 0; 
} }

    二次排序:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    /**
     * Java driver performing a secondary sort: lines are keyed by a composite
     * (first, second) integer key and ordered via sortByKey.
     * Created by Administrator on 2016/8/14 0014.
     */
    public class SecondarySortApp {
        public static void main(String[] args) {
            // Local single-threaded master; suitable for this demo only.
            SparkConf conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // Each line is assumed to start with two space-separated integers
            // (inferred from the split/parse below — confirm the input format).
            JavaRDD<String> lines = sc.textFile("/home/resources/helloSpark.txt");
            // Map every line to (composite key, original line) so sortByKey can
            // order by the first field, breaking ties on the second.
            JavaPairRDD<SecondarySort, String> pairs = lines.mapToPair(new PairFunction<String, SecondarySort, String>() {
                @Override
                public Tuple2<SecondarySort, String> call(String line) throws Exception {
                    String[] splited = line.split(" ");
                    SecondarySort key = new SecondarySort(Integer.valueOf(splited[0]), Integer.valueOf(splited[1]));
                    return new Tuple2<SecondarySort, String>(key, line);
                }
            });
            JavaPairRDD<SecondarySort, String> sorted = pairs.sortByKey(); // performs the secondary sort
            // Discard the key, keeping only the original lines in sorted order.
            JavaRDD<String> result = sorted.map(new Function<Tuple2<SecondarySort,String>, String>() {
                @Override
                public String call(Tuple2<SecondarySort, String> v1) throws Exception {
                    return v1._2;
                }
            });
            for (String s : result.collect()) {
                System.out.println(s);
            }
            sc.stop();
        }
    }

    Scala 版本实现 Key:

    package com.spark.App

    /**
     * Composite key for Spark secondary sorting: orders by `first`, breaking
     * ties with `second`. Extends [[scala.math.Ordered]] so `sortByKey` can
     * obtain an implicit Ordering, and [[java.io.Serializable]] so keys can be
     * shuffled across executors.
     *
     * Created by Administrator on 2016/8/14 0014.
     *
     * @param first  primary sort component
     * @param second secondary (tie-break) sort component
     */
    class SecondarySortKey(val first: Int, val second: Int)
        extends Ordered[SecondarySortKey] with Serializable {

      /**
       * Returns 1, -1 or 0 for greater / less / equal, exactly as the original
       * branch-and-`return` version did, but as a single expression:
       * compare `first`, and only on a tie compare `second`.
       */
      override def compare(other: SecondarySortKey): Int =
        if (this.first != other.first) this.first.compareTo(other.first)
        else this.second.compareTo(other.second)
    }

    二次排序:

    package com.spark.App

    import org.apache.spark.{SparkContext, SparkConf}

    /**
     * Scala driver performing a secondary sort: lines are keyed by a composite
     * (first, second) integer key and ordered via sortByKey(false) (descending).
     * Created by Administrator on 2016/8/14 0014.
     */
    object SecondarySortApp {
      def main(args: Array[String]) {
        // Local single-threaded master; suitable for this demo only.
        val conf = new SparkConf().setAppName("SecondarySortApp").setMaster("local")
        val sc = new SparkContext(conf)
        try {
          // Each line is assumed to start with two space-separated integers
          // (inferred from the split/parse below — confirm the input format).
          val lines = sc.textFile("/home/resources/helloSpark.txt")
          // Map every line to (composite key, original line) so sortByKey can
          // order by the first field, breaking ties on the second.
          val pairRDD = lines.map(line => {
            val splited = line.split(" ")
            val key = new SecondarySortKey(splited(0).toInt, splited(1).toInt)
            (key, line)
          })
          // false => descending order on the composite key.
          val sorted = pairRDD.sortByKey(false)
          // Discard the key, keeping only the original lines in sorted order.
          val result = sorted.map(item => item._2)
          result.collect().foreach(println)
        } finally {
          // Fix: the original Scala version never stopped the context,
          // unlike the Java version; release cluster resources here.
          sc.stop()
        }
      }
    }
    转载请注明原文地址: https://ju.6miu.com/read-1300840.html
    最新回复(0)