Secondary sorting means ordering records by several fields at once: the fields are compared from left to right, so the rows come out sorted from top to bottom.
Data:
c,18,1956
a,20,1356
d,5,1956
f,18,1256
h,3,2956
c,18,2008
y,8,956
a,18,1956
Save this as mySec.txt and put it into HDFS:
Requirement 1
After sorting, the output should look like this:
(a,18,1956)
(a,20,1356)
(c,18,1956)
(c,18,2008)
(d,5,1956)
(f,18,1256)
(h,3,2956)
(y,8,956)
The code:
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UDsecondary {
  // Requirement 1: build a composite "name-age" String key, sort by it, then
  // split the key back apart for output. Because the key is a String, the age
  // part is compared as text rather than as a number (fine for this sample data).
  def mysec1(sc: SparkContext, rdd: RDD[String]) = {
    rdd.map(_.split(","))
      .map(part => (s"${part(0)}-${part(1)}", part(2)))
      .sortByKey()
      .map { part =>
        val tmp: Array[String] = part._1.split("-")
        (tmp(0), tmp(1), part._2)
      }
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("二次排序").setMaster("local")
      .set("spark.storage.memoryFraction", "0.1")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("hdfs://master:9000/data/studySet/Secondary/mySec.txt")
    mysec1(sc, rdd).foreach(println)
  }
}
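If you do not have an HDFS cluster at hand, the same logic can be tried locally; this snippet is not part of the original post and simply builds the input with sc.parallelize instead of reading mySec.txt from HDFS:

val lines = Seq("c,18,1956", "a,20,1356", "d,5,1956", "f,18,1256",
  "h,3,2956", "c,18,2008", "y,8,956", "a,18,1956")
val localRdd: RDD[String] = sc.parallelize(lines)   // in-memory test data
mysec1(sc, localRdd).foreach(println)               // prints the eight sorted tuples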
Result:
Requirement 2
Sort so that the values of rows sharing the same name and age are merged onto one line:
(a,18,1956)
(a,20,1356)
(c,18,1956 2008)
(d,5,1956)
(f,18,1256)
(h,3,2956)
(y,8,956)
The program:
// Requirement 2: same composite key, but merge the values of duplicate keys with reduceByKey before sorting
def mysec2(sc: SparkContext, rdd: RDD[String]) = {
  rdd.map(_.split(","))
    .map(part => (s"${part(0)}-${part(1)}", part(2)))
    .reduceByKey((a, b) => a + " " + b)
    .sortByKey()
    .map { part =>
      val tmp: Array[String] = part._1.split("-")
      (tmp(0), tmp(1), part._2)
    }
}
mysec2(sc,rdd).foreach(println)
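As an aside (not from the original post), the merging step could also be written with groupByKey plus mkString. reduceByKey is generally the better choice here because it combines values on the map side before the shuffle, but the sketch below produces essentially the same output:

def mysec2Grouped(sc: SparkContext, rdd: RDD[String]) = {
  rdd.map(_.split(","))
    .map(part => (s"${part(0)}-${part(1)}", part(2)))
    .groupByKey()                    // gather all values for one name-age key
    .mapValues(_.mkString(" "))      // e.g. "1956 2008" for c-18
    .sortByKey()
    .map { part =>
      val tmp = part._1.split("-")
      (tmp(0), tmp(1), part._2)
    }
}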
Requirement 3: a custom key
You may notice that requirement 2 runs rather slowly. The reason is that it performs several shuffle operations (reduceByKey followed by sortByKey), which hurts performance considerably. To cut down the number of shuffles we can define a custom key; this is also a common approach in production code.
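A quick way to check the shuffle count (this check is not in the original post) is to print the lineage of each variant with toDebugString inside main, once mysec2 and mysec3 (defined below) are available; the reduceByKey-then-sortByKey pipeline should show two ShuffledRDD stages, while the custom-key version shuffles only once:

println(mysec2(sc, rdd).toDebugString)   // reduceByKey + sortByKey: two shuffles
println(mysec3(sc, rdd).toDebugString)   // sortByKey on the custom key: one shuffle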
The custom key:
package Secondary

/**
  * Created by legotime on 3/8/17.
  */
class MyUDkey(var first: String, var second: Int, var three: Int)
  extends Ordered[MyUDkey] with Serializable {

  // Ordered already derives <, >, <=, >= from compare; they are spelled out
  // here explicitly, comparing field by field from left to right.
  override def <(that: MyUDkey): Boolean = {
    if (first < that.first) {
      true
    } else if (first == that.first && second < that.second) {
      true
    } else if (first == that.first && second == that.second && three < that.three) {
      true
    } else {
      false
    }
  }

  override def >(that: MyUDkey): Boolean = {
    if (first > that.first) {
      true
    } else if (first == that.first && second > that.second) {
      true
    } else if (first == that.first && second == that.second && three > that.three) {
      true
    } else {
      false
    }
  }

  override def <=(that: MyUDkey): Boolean = {
    if (this < that) {
      true
    } else if (first == that.first && second == that.second && three == that.three) {
      true
    } else {
      false
    }
  }

  override def >=(that: MyUDkey): Boolean = {
    if (this > that) {
      true
    } else if (first == that.first && second == that.second && three == that.three) {
      true
    } else {
      false
    }
  }

  // compare drives sortByKey: the name is compared alphabetically first,
  // then the second and third fields numerically.
  override def compare(that: MyUDkey): Int = {
    if (first.compareTo(that.first) != 0) {
      first.compareTo(that.first)
    } else if (second - that.second != 0) {
      second - that.second
    } else if (three - that.three != 0) {
      three - that.three
    } else {
      0
    }
  }

  override def compareTo(that: MyUDkey): Int = compare(that)

  override def toString = s"MyUDkey($first, $second, $three)"
}
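As a side note that is not part of the original post: Scala already provides an implicit Ordering for tuples, so the same three-field sort could also be expressed without a hand-written key class; the sketch below assumes a helper name mysecTuple. The explicit MyUDkey class above is still useful for seeing exactly how the comparison works.

def mysecTuple(rdd: RDD[String]): RDD[(String, Int, Int)] = {
  rdd.map(_.split(","))
    .map(part => ((part(0), part(1).toInt, part(2).toInt), 1))
    .sortByKey()        // uses the built-in Ordering[(String, Int, Int)]
    .map(_._1)          // drop the dummy value and keep the sorted triples
}

The complete driver program, with all three variants, is: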
package Secondary

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by root on 3/8/17.
  */
object UDsecondary {

  // Requirement 1: composite String key "name-age", sorted lexicographically
  def mysec1(sc: SparkContext, rdd: RDD[String]) = {
    rdd.map(_.split(","))
      .map(part => (s"${part(0)}-${part(1)}", part(2)))
      .sortByKey()
      .map { part =>
        val tmp: Array[String] = part._1.split("-")
        (tmp(0), tmp(1), part._2)
      }
  }

  // Requirement 2: merge the values of duplicate keys with reduceByKey, then sort
  def mysec2(sc: SparkContext, rdd: RDD[String]) = {
    rdd.map(_.split(","))
      .map(part => (s"${part(0)}-${part(1)}", part(2)))
      .reduceByKey((a, b) => a + " " + b)
      .sortByKey()
      .map { part =>
        val tmp: Array[String] = part._1.split("-")
        (tmp(0), tmp(1), part._2)
      }
  }

  // Requirement 3: use the custom MyUDkey so that a single sortByKey
  // (one shuffle) orders the data by all three fields
  def mysec3(sc: SparkContext, rdd: RDD[String]) = {
    rdd.map(_.split(","))
      .map(part => (new MyUDkey(part(0), part(1).toInt, part(2).toInt), 1))
      .sortByKey()
  }

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("二次排序").setMaster("local")
      .set("spark.storage.memoryFraction", "0.1")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("hdfs://master:9000/data/studySet/Secondary/mySec.txt")
    //mysec1(sc, rdd).foreach(println)
    //mysec2(sc, rdd).foreach(println)
    mysec3(sc, rdd).foreach(println)
  }
}
Result: each key prints via the toString defined above, so the lines appear in sorted order as pairs such as (MyUDkey(a, 18, 1956),1).
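To print the output in the same (name, age, value) form as requirements 1 and 2, a small extra map (not in the original post) can be appended:

mysec3(sc, rdd)
  .map { case (key, _) => (key.first, key.second, key.three) }   // unwrap the custom key
  .foreach(println)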