首先需要使用 Navicat 在数据库中建库 santi，并在 santi 内建表 santi_001，表的格式需要与 santi_001(lac: String, count: Int, time: String) 相同。代码如下，将 RDD 转化成 DataFrame 后写入数据库。
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
/**
 * Example Spark job: builds a tiny in-memory RDD, converts it to a DataFrame
 * with an explicit schema, and writes it to a MySQL table over JDBC.
 *
 * The target is table `santi_001` in database `santi`, with columns
 * (lac: String, count: Int, time: String).
 */
object ToMysql {

  // Connection settings, hard-coded exactly as in the original example.
  // NOTE(review): the URL and credentials live in source code; move them to
  // configuration or job arguments before using this outside a demo.
  private val JdbcUrl = "jdbc:mysql://192.168.137.131:3306/santi"
  private val TableName = "santi_001"

  // Column layout of the rows written to the table; all columns are nullable.
  private val TableSchema = StructType(
    List(
      StructField("lac", StringType, true),
      StructField("count", IntegerType, true),
      StructField("time", StringType, true)
    )
  )

  /**
   * Job entry point.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Create the Spark configuration and the context built from it.
    val sparkConf = new SparkConf().setAppName("PeakTimePeople")
    val sc = new SparkContext(sparkConf)

    // Stop the context even when the JDBC write fails, so the application
    // does not leave a running SparkContext behind.
    try {
      val sqlContext = new SQLContext(sc)

      val prop = new Properties()
      prop.setProperty("driver", "com.mysql.jdbc.Driver")
      prop.setProperty("user", "root")
      prop.setProperty("password", "123456")

      val samples = List(
        ("12@21", 12, "20160201"),
        ("13@12", 31, "20160831"),
        ("13@12", 31, "20160826")
      )
      val rowRDD = sc.makeRDD(samples).map { case (lac, count, time) => Row(lac, count, time) }

      // Print the rows so the data can be inspected before it is written.
      println(rowRDD.collect().mkString(";"))

      val data = sqlContext.createDataFrame(rowRDD, TableSchema)

      // Overwrite replaces whatever the table currently holds. By default the
      // JDBC writer drops and re-creates the table (unless the `truncate`
      // option is set), so a hand-made table definition may be replaced.
      data.write.mode(SaveMode.Overwrite).jdbc(JdbcUrl, TableName, prop)
    } finally {
      sc.stop()
    }
  }
}