大数据IMF传奇行动绝密课程第71课:Spark SQL窗口函数解密与实战

    xiaoxiao2021-03-25  95

    Spark SQL窗口函数解密与实战

    1、Spark SQL窗口函数解析 2、Spark SQL窗口函数实战

    /** * Scala代码 */ package com.tom.spark.sql import org.apache.spark.sql.DataFrame import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.hive.HiveContext object SparkSQLWindowFunctionOps { def main(args: Array[String]): Unit = { val conf = new SparkConf().setMaster("local").setAppName("SparkSQLWindowFunctionOps") val sc = new SparkContext(conf) val hiveContext = new HiveContext(sc) hiveContext.sql("use hive") //使用名称为hive的数据库,接下来所有的表操作都位于这个库 /** * 如果要创建的表存在的话就删除,然后创建我们要导入数据的表 */ hiveContext.sql("DROP TABLE IF EXISTS scores") hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING,score INTEGER)" + "ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY '\\n'") //把要处理的数据导入到Hive的表中 hiveContext.sql("LOAD DATA LOCAL INPATH '/root/Documents/SparkApps/resources/topNGroup.txt' INTO TABLE scores") /** * 使用子查询的方式完成目标数据的提取,在目标数据内部使用窗口函数row_number来进行分组排序 * PARTITION BY:指定窗口函数分组的Key; * ORDER BY:分组后进行排序; * */ val result = hiveContext.sql("SELECT name, score FROM (" + "SELECT name, score, row_number() OVER (PARTITION BY name ORDER BY score DESC) rank FROM scores" + ") sub_scores " + "WHERE rank<=4") result.show() //把数据保存到Hive数据仓库中 hiveContext.sql("DROP TABLE EXISTS sortedResultScores") result.saveAsTable("sortedResultScores") } }
    转载请注明原文地址: https://ju.6miu.com/read-17688.html

    最新回复(0)