Spark 2.0.2
In the simplest form, the default data source format is parquet; this default can be changed through the spark.sql.sources.default configuration property.
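As a minimal sketch, the default format could be overridden when the SparkSession is built; the application name and the choice of "json" here are purely illustrative:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("DataSourceExample")                 // illustrative name
  .config("spark.sql.sources.default", "json")  // e.g. make json the default format
  .getOrCreate()

With the default format, load and save need no format argument: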
val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

You can also manually specify the data source, along with any extra options you would like to pass to it. Data sources are identified by their fully qualified name (e.g. org.apache.spark.sql.parquet), but sources built into Spark can be referred to by their short names (json, parquet, jdbc, orc, libsvm, csv, text).
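For a source that accepts options, a read might look like the following sketch; the file path and the option values are illustrative, assuming a CSV file whose first line is a header row:

val csvDF = spark.read
  .format("csv")
  .option("header", "true")       // treat the first line as column names
  .option("inferSchema", "true")  // let Spark guess the column types
  .load("examples/src/main/resources/people.csv")  // hypothetical path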
Using this syntax, a DataFrame loaded from any data source type can be converted into another type:

val peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

Instead of using the read API to load a file into a DataFrame and then querying it, you can also query the file directly with SQL:
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

Save operations can optionally take a SaveMode, which specifies how to handle data that already exists at the target.
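A minimal sketch of passing a save mode, reusing the usersDF from above (the output path is illustrative):

import org.apache.spark.sql.SaveMode

// Replace any data already at the target path; the other modes are
// SaveMode.Append, SaveMode.Ignore and SaveMode.ErrorIfExists (the default).
usersDF.write.mode(SaveMode.Overwrite).save("namesAndFavColors.parquet")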
Parquet is a columnar format that is supported by many other data processing systems. Spark SQL supports both reading and writing Parquet files, automatically preserving the schema of the original data.
// Encoders for most common types are automatically provided by importing spark.implicits._
import spark.implicits._

val peopleDF = spark.read.json("examples/src/main/resources/people.json")

// DataFrames can be saved as Parquet files, maintaining the schema information
peopleDF.write.parquet("people.parquet")

// Read in the parquet file created above
// Parquet files are self-describing so the schema is preserved
// The result of loading a Parquet file is also a DataFrame
val parquetFileDF = spark.read.parquet("people.parquet")

// Parquet files can also be used to create a temporary view and then used in SQL statements
parquetFileDF.createOrReplaceTempView("parquetFile")
val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")
namesDF.map(attributes => "Name: " + attributes(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+