Big Data IMF Legendary Action Confidential Course, Lesson 67: A Comprehensive Spark SQL Case Study


    Spark SQL comprehensive hands-on case study

    This lesson works through one end-to-end Spark SQL case: load scores from a JSON file, pick out the top scorers with a SQL query against a temporary table, join them with a second JSON dataset that carries ages, and save the combined (name, age, score) records back out as JSON, first in Java and then in Scala.

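    Both versions read a local JSON-lines file, F:\sparkData\peoples.json, whose contents the lesson never shows. Judging from the queries against it, each line carries a name and a score; a minimal file consistent with the code might look like this (the records are illustrative, not from the source):

    {"name":"Michael", "score":98}
    {"name":"Andy", "score":95}
    {"name":"Justin", "score":68}

    With these sample records, Michael and Andy clear the score >= 90 cutoff and survive the join against the age table that the code builds in memory.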
/** Java hands-on implementation */
package com.tom.spark.SparkApps.sql;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

public class SparkSQLwithJoin {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DataFrameOps").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        SQLContext sqlContext = new SQLContext(sc);

        // Create a DataFrame from the JSON file data source
        DataFrame peoplesDF = sqlContext.read().json("F:\\sparkData\\peoples.json");
        // Register the JSON-backed DataFrame as a temporary table
        peoplesDF.registerTempTable("peopleScores");
        // Query everyone whose score is at least 90
        DataFrame excellentScoresDF = sqlContext.sql("select name, score from peopleScores where score >= 90");

        // Convert the DataFrame to an RDD and map it to the names of everyone scoring at least 90
        JavaRDD<String> personsRDD = excellentScoresDF.javaRDD().map(new Function<Row, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public String call(Row row) throws Exception {
                return row.getAs("name");
            }
        });
        List<String> excellentScoresList = personsRDD.collect();

        // Dynamically assemble JSON records
        List<String> peopleInformation = new ArrayList<String>();
        peopleInformation.add("{\"name\":\"Michael\", \"age\":20}");
        peopleInformation.add("{\"name\":\"Andy\", \"age\":17}");
        peopleInformation.add("{\"name\":\"Justin\", \"age\":19}");

        // Build a DataFrame from an RDD whose elements are JSON strings
        JavaRDD<String> peopleInformationRDD = sc.parallelize(peopleInformation);
        DataFrame peopleInformationDF = sqlContext.read().json(peopleInformationRDD);
        // Register it as a temporary table as well
        peopleInformationDF.registerTempTable("peopleInformations");

        // Build an IN clause that restricts the age table to the top scorers
        StringBuilder sqlText = new StringBuilder();
        sqlText.append("select name, age from peopleInformations where name in (");
        for (int i = 0; i < excellentScoresList.size(); i++) {
            sqlText.append("'" + excellentScoresList.get(i) + "'");
            if (i != excellentScoresList.size() - 1) {
                sqlText.append(",");
            }
        }
        sqlText.append(")");

        String sqlString = sqlText.toString();
        System.out.println(sqlString);
        DataFrame excellentNameAgeDF = sqlContext.sql(sqlString);

        // Join (name, score) pairs from the scores table with (name, age) pairs from the age table
        JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = excellentScoresDF.javaRDD().mapToPair(
                new PairFunction<Row, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<String, Integer>(String.valueOf(row.getAs("name")), (int) row.getLong(1));
                    }
                }).join(excellentNameAgeDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Tuple2<String, Integer> call(Row row) throws Exception {
                        return new Tuple2<String, Integer>(String.valueOf(row.getAs("name")), (int) row.getLong(1));
                    }
                }));

        // Flatten each (name, (score, age)) tuple into a Row of (name, age, score)
        JavaRDD<Row> resultRowRDD = resultRDD.map(new Function<Tuple2<String, Tuple2<Integer, Integer>>, Row>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
                return RowFactory.create(tuple._1, tuple._2._2, tuple._2._1);
            }
        });

        List<Row> rows = resultRowRDD.collect();
        for (int i = 0; i < rows.size(); i++) {
            System.out.println(rows.get(i).get(0).toString() + " " + rows.get(i).get(1) + " " + rows.get(i).get(2));
        }

        // Build the StructType that describes the metadata of the final DataFrame
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
        structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
        StructType structType = DataTypes.createStructType(structFields);

        DataFrame personsDF = sqlContext.createDataFrame(resultRowRDD, structType);
        personsDF.printSchema();
        personsDF.show();
        personsDF.write().format("json").save("F:\\sparkData\\result");
    }
}

/** Scala hands-on implementation */
package com.tom.spark.sql

import org.apache.spark.sql.{DataFrame, RowFactory, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object SparkSQLwithJoin {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("SparkSQLwithJoin")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Create a DataFrame from the JSON file and register it as a temporary table
    val personScoresDF = sqlContext.read.json("F:/sparkData/peoples.json")
    personScoresDF.registerTempTable("personScores")

    // Query everyone whose score is at least 90
    val excellentStudentDF = sqlContext.sql("select name, score from personScores where score >= 90")
    val excellentStudents = excellentStudentDF.rdd.map(_(0))

    // Dynamically assemble JSON records and build a second DataFrame from them
    val peopleInformations = Array(
      "{\"name\":\"Michael\", \"age\":20}",
      "{\"name\":\"Andy\", \"age\":17}",
      "{\"name\":\"Justin\", \"age\":19}"
    )
    val peopleInformationsRDD = sc.parallelize(peopleInformations)
    val peopleInformationDF = sqlContext.read.json(peopleInformationsRDD)
    peopleInformationDF.registerTempTable("peopleInformations")

    // Build an IN clause that restricts the age table to the top scorers
    val sqlText = new StringBuilder()
    sqlText.append("select name, age from peopleInformations where name in (")
    val stu: Array[Any] = excellentStudents.collect()
    for (i <- 0 until stu.size) {
      sqlText.append("'" + stu(i).toString + "'")
      if (i != stu.size - 1) sqlText.append(",")
    }
    sqlText.append(")")

    val sqlString = sqlText.toString()
    val excellentNameAgeDF = sqlContext.sql(sqlString)

    // Join (name, score) pairs from the scores table with (name, age) pairs from the age table
    val resultRDD = excellentStudentDF.rdd
      .map(row => (row.getAs[String]("name"), row.getLong(1)))
      .join(excellentNameAgeDF.rdd.map(row => (row.getAs[String]("name"), row.getLong(1))))

    // Flatten each (name, (score, age)) tuple into a Row of (name, age, score)
    val resultRowRDD = resultRDD.map(tuple => {
      val name = tuple._1
      val age: java.lang.Integer = tuple._2._2.toInt
      val score: java.lang.Integer = tuple._2._1.toInt
      RowFactory.create(name, age, score)
    })

    // Map each Row to a case class so createDataFrame can infer the schema by reflection
    val personDF: DataFrame = sqlContext.createDataFrame(
      resultRowRDD.map(row => PersonAgeScore(row.getString(0), row.getInt(1), row.getInt(2)))
    )
    personDF.show()
    personDF.write.json("F:/sparkData/resultScala")
  }
}

case class PersonAgeScore(name: String, age: Int, score: Int)
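
    A note on the design: both listings assemble the IN clause by string concatenation on the driver, so with the sample data sketched earlier the printed query would come out as something like select name, age from peopleInformations where name in ('Michael','Andy'). Collecting the names and splicing them into SQL is fine for a demo, but the same result can be had without round-tripping through the driver, because Spark 1.x DataFrames can be joined directly on a shared column (DataFrame.join with a column name is available since Spark 1.4). A minimal alternative sketch in Scala, assuming the personScoresDF and peopleInformationDF values from the Scala listing above; the output path is hypothetical:

    // Join the two DataFrames directly on "name" instead of building an IN (...) clause
    val joinedDF = personScoresDF
      .filter(personScoresDF("score") >= 90)           // same >= 90 cutoff as the SQL query
      .join(peopleInformationDF, "name")               // equi-join on the shared "name" column
      .select("name", "age", "score")                  // same final column order as the listings
    joinedDF.show()
    joinedDF.write.json("F:/sparkData/resultJoined")   // hypothetical output path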
