Spark SQL Comprehensive Case Study
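This case study joins two data sets with Spark SQL: a score table loaded from a JSON file and an age table built from an in-memory JSON collection. The program selects everyone scoring at least 90, fetches their ages through a dynamically assembled IN clause, joins the two result sets by name, and writes the combined (name, age, score) records back out as JSON. The same logic is shown first in Java, then in Scala; both versions target the Spark 1.x API (SQLContext, DataFrame, registerTempTable).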
/**
 * Java implementation
 */
package com.tom.spark.SparkApps.sql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class SparkSQLwithJoin {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DataFrameOps").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        SQLContext sqlContext = new SQLContext(sc);

        // Load the score data and register it as a temporary table.
        DataFrame peoplesDF = sqlContext.read().json("F:\\sparkData\\peoples.json");
        peoplesDF.registerTempTable("peopleScores");

        // Select everyone with a score of at least 90.
        DataFrame excellentScoresDF = sqlContext.sql(
                "select name, score from peopleScores where score >= 90");

        // Convert the DataFrame to an RDD and extract, via a map operation,
        // the names of everyone whose score is at least 90.
        JavaRDD<String> personsRDD = excellentScoresDF.javaRDD().map(
                new Function<Row, String>() {
                    private static final long serialVersionUID = 1L;
                    public String call(Row row) throws Exception {
                        return row.getAs("name");
                    }
                });
        List<String> excellentScoresList = personsRDD.collect();

        // Build a second data set holding each person's age.
        List<String> peopleInformation = new ArrayList<String>();
        peopleInformation.add("{\"name\":\"Michael\", \"age\":20}");
        peopleInformation.add("{\"name\":\"Andy\", \"age\":17}");
        peopleInformation.add("{\"name\":\"Justin\", \"age\":19}");
        JavaRDD<String> peopleInformationRDD = sc.parallelize(peopleInformation);
        DataFrame peopleInformationDF = sqlContext.read().json(peopleInformationRDD);
        peopleInformationDF.registerTempTable("peopleInformations");

        // Dynamically build an IN clause restricted to the excellent students.
        StringBuilder sqlText = new StringBuilder();
        sqlText.append("select name, age from peopleInformations where name in (");
        for (int i = 0; i < excellentScoresList.size(); i++) {
            sqlText.append("'" + excellentScoresList.get(i) + "'");
            if (i != excellentScoresList.size() - 1) {
                sqlText.append(",");
            }
        }
        sqlText.append(")");
        String sqlString = sqlText.toString();
        System.out.println(sqlString);
        DataFrame excellentNameAgeDF = sqlContext.sql(sqlString);

        // Join the (name, score) pairs with the (name, age) pairs by name.
        JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD =
                excellentScoresDF.javaRDD().mapToPair(
                        new PairFunction<Row, String, Integer>() {
                            private static final long serialVersionUID = 1L;
                            public Tuple2<String, Integer> call(Row row) throws Exception {
                                return new Tuple2<String, Integer>(
                                        String.valueOf(row.getAs("name")), (int) row.getLong(1));
                            }
                        }).join(excellentNameAgeDF.javaRDD().mapToPair(
                        new PairFunction<Row, String, Integer>() {
                            private static final long serialVersionUID = 1L;
                            public Tuple2<String, Integer> call(Row row) throws Exception {
                                return new Tuple2<String, Integer>(
                                        String.valueOf(row.getAs("name")), (int) row.getLong(1));
                            }
                        }));

        // Reassemble each joined record as a Row of (name, age, score).
        JavaRDD<Row> resultRowRDD = resultRDD.map(
                new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
                    private static final long serialVersionUID = 1L;
                    public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple)
                            throws Exception {
                        return RowFactory.create(tuple._1, tuple._2._2, tuple._2._1);
                    }
                });
        List<Row> rows = resultRowRDD.collect();
        for (int i = 0; i < rows.size(); i++) {
            System.out.println(rows.get(i).get(0).toString() + " "
                    + rows.get(i).get(1) + " " + rows.get(i).get(2));
        }

        // Declare the schema explicitly and turn the Row RDD back into a DataFrame.
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);
        DataFrame personsDF = sqlContext.createDataFrame(resultRowRDD, structType);
        personsDF.printSchema();
        personsDF.show();

        // Persist the result as JSON.
        personsDF.write().format("json").save("F:\\sparkData\\result");
    }
}
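The Java version reads F:\sparkData\peoples.json, whose contents the source does not show. A hypothetical JSON-lines file like the following (one object per line, reusing the names from peopleInformations) would exercise the score >= 90 filter:

{"name":"Michael", "score":98}
{"name":"Andy", "score":95}
{"name":"Justin", "score":68}

With this sample, only Michael and Andy pass the filter, the generated query becomes select name, age from peopleInformations where name in ('Michael','Andy'), and the final DataFrame holds (Michael, 20, 98) and (Andy, 17, 95). The Scala version below implements the same flow, replacing the explicit StructType schema with a case class from which Spark infers the schema by reflection.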
/**
 * Scala implementation
 */
package com.tom.spark.sql

import org.apache.spark.sql.{DataFrame, RowFactory, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkSQLwithJoin {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkSQLwithJoin")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Load the score data and register it as a temporary table.
    val personScoresDF = sqlContext.read.json("F:/sparkData/peoples.json")
    personScoresDF.registerTempTable("personScores")

    // Select everyone with a score of at least 90 and keep just the names.
    val excellentStudentDF = sqlContext.sql(
      "select name, score from personScores where score >= 90")
    val excellentStudents = excellentStudentDF.rdd.map(_(0))

    // Build a second data set holding each person's age.
    val peopleInformations = Array(
      "{\"name\":\"Michael\", \"age\":20}",
      "{\"name\":\"Andy\", \"age\":17}",
      "{\"name\":\"Justin\", \"age\":19}"
    )
    val peopleInformationsRDD = sc.parallelize(peopleInformations)
    val peopleInformationDF = sqlContext.read.json(peopleInformationsRDD)
    peopleInformationDF.registerTempTable("peopleInformations")

    // Dynamically build an IN clause restricted to the excellent students.
    val sqlText = new StringBuilder()
    sqlText.append("select name, age from peopleInformations where name in (")
    val stu: Array[Any] = excellentStudents.collect()
    for (i <- 0 until stu.size) {
      sqlText.append("'" + stu(i).toString + "'")
      if (i != stu.size - 1) sqlText.append(",")
    }
    sqlText.append(")")
    val sqlString = sqlText.toString()
    val excellentNameAgeDF = sqlContext.sql(sqlString)

    // Join the (name, score) pairs with the (name, age) pairs by name.
    val resultRDD = excellentStudentDF.rdd
      .map(row => (row.getAs("name").toString, row.getLong(1)))
      .join(excellentNameAgeDF.rdd
        .map(row => (row.getAs("name").toString, row.getLong(1))))

    // Reassemble each joined record as a Row of (name, age, score).
    val resultRowRDD = resultRDD.map(tuple => {
      val name = tuple._1
      val age: java.lang.Integer = tuple._2._2.toInt
      val score: java.lang.Integer = tuple._2._1.toInt
      RowFactory.create(name, age, score)
    })

    // Map each Row onto the PersonAgeScore case class so that
    // createDataFrame can infer the schema by reflection.
    val personDF: DataFrame = sqlContext.createDataFrame(
      resultRowRDD.map(row =>
        PersonAgeScore(row.getString(0), row.getInt(1), row.getInt(2))))
    personDF.show
    personDF.write.json("F:/sparkData/resultScala")
  }
}
case class PersonAgeScore(name: String, age: Int, score: Int)
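Both versions target the Spark 1.x API. From Spark 2.0 on, SQLContext, DataFrame, and registerTempTable are superseded by SparkSession, Dataset[Row], and createOrReplaceTempView. A minimal sketch of the load-and-query step in the newer API, assuming the same file path and table name as above:

package com.tom.spark.sql

import org.apache.spark.sql.SparkSession

object SparkSQLwithJoinV2 {
  def main(args: Array[String]): Unit = {
    // SparkSession bundles SparkContext and SQLContext in Spark 2.x+.
    val spark = SparkSession.builder()
      .master("local")
      .appName("SparkSQLwithJoin")
      .getOrCreate()

    // In Spark 2.x, DataFrame is an alias for Dataset[Row].
    val personScoresDF = spark.read.json("F:/sparkData/peoples.json")
    personScoresDF.createOrReplaceTempView("personScores")

    spark.sql("select name, score from personScores where score >= 90").show()

    spark.stop()
  }
}

The rest of the program carries over with the same substitutions; in particular, the case-class-based schema inference works unchanged through spark.createDataFrame.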