// ===== Part 1: data input (object `data`) =====
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

/**
 * Loads two sample MLlib datasets from local disk and caches them.
 *
 * Dataset 1: plain text, one record per line in the form
 * "label,feature1 feature2 feature3 ...".
 * Dataset 2: LIBSVM format, "label featureId:featureValue ...".
 */
object data {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("get_data")
    val sc = new SparkContext(conf)

    // $example on$
    // Load and parse the data.
    // BUG FIX: in the original paste this comment was wrapped so its
    // continuation lines were uncommented raw text, which does not compile.
    // Format: "label,feature1 feature2 feature3 ..."
    val example_data1 = sc
      .textFile("D:/data/mllib/sample_svm_data.txt")
      .map { line =>
        val parts = line.split(",")
        LabeledPoint(
          parts(0).toDouble,
          Vectors.dense(parts(1).split(' ').map(_.toDouble)))
      }
      .cache() // NOTE(review): cached but never used below — kept for parity

    // Format: "label featureId:featureValue featureId:featureValue ..."
    val example_data2 = MLUtils
      .loadLibSVMFile(sc, "D:/data/mllib/sample_libsvm_data.txt")
      .cache()

    example_data2.foreach(println)
  }
}
// ===== Part 2: SVM training (object `svm_test2`) =====
import org.apache.spark.ml.feature._
import org.apache.spark.{ml, mllib}
import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._

/**
 * Trains a linear SVM (via SGD) on text records featurized with TF-IDF,
 * then prints per-record predictions and the overall accuracy on a
 * held-out split.
 *
 * Input file format (one record per line): "label,word1 word2 word3 ...".
 */
object svm_test2 {
  private val spark = SparkSession
    .builder()
    .config("spark.sql.warehouse.dir", "file:///D:/spark-warehouse")
    .master("local")
    .appName("TFIDFExample ")
    .getOrCreate()
  private val sc = spark.sparkContext

  import spark.implicits._

  // NOTE(review): field is spelled "lable" (sic). It is a DataFrame column
  // name referenced by this exact string below, so it is intentionally kept.
  case class RawDataRecord(lable: String, message: Array[String])

  def main(args: Array[String]): Unit = {
    // Map the raw data into a DataFrame: column "lable" is the class id,
    // column "message" the pre-tokenized text (space-separated words).
    val train_data = sc
      .textFile("file:///D:/data/mllib/s.txt")
      .map { x =>
        val data = x.split(",")
        RawDataRecord(data(0), data(1).split(" "))
      }
    val msgDF = spark.createDataFrame(train_data).toDF()
    msgDF.show()

    // Alternative featurization, currently disabled:
    // val result = get_word2Vec(msgDF)
    val result = get_tfidf(msgDF)
    result.show()

    // Fixed seed so the train/test split is reproducible across runs.
    val Array(train, test) = result.randomSplit(Array(0.8, 0.2), seed = 12L)

    val traindate = get_dataFrame_to_rdd(train)
    val model3 = SVMWithSGD.train(traindate, 150) // 150 SGD iterations

    val testDataRdd = get_dataFrame_to_rdd(test)
    val pres = testDataRdd.map { point =>
      val score = model3.predict(point.features)
      (score, point.label)
    }

    val p = pres.collect()
    println("prediction" + "\t" + "label")
    for (i <- 0 to p.length - 1) {
      println(p(i)._1 + "\t" + p(i)._2)
    }

    // Accuracy = exact (prediction, label) matches / test-set size.
    val accury = 1.0 * pres.filter(x => x._1 == x._2).count() / test.count()
    println("正确率:" + accury)
  }

  /**
   * Featurizes the "message" column with Word2Vec into column "feature".
   * Currently unused by main (TF-IDF is used instead).
   */
  def get_word2Vec(dataDF: DataFrame): DataFrame = {
    val word2Vec = new Word2Vec()
      .setInputCol("message")
      .setOutputCol("feature")
    val model = word2Vec.fit(dataDF)
    val result = model.transform(dataDF)
    result
  }

  /**
   * Featurizes the "message" column as TF-IDF vectors in column "feature";
   * also adds a numeric "indexedLabel" column via StringIndexer.
   */
  def get_tfidf(dataDF: DataFrame): DataFrame = {
    val labelIndexer = new StringIndexer()
      .setInputCol("lable")
      .setOutputCol("indexedLabel")
      .fit(dataDF)
    val da = labelIndexer.transform(dataDF)

    // Hash each word into a fixed 200-bucket term-frequency vector.
    val hashingTF = new HashingTF()
      .setInputCol("message")
      .setOutputCol("rawFeatures")
      .setNumFeatures(200)
    val featurizedData = hashingTF.transform(da)

    val idf = new IDF()
      .setInputCol("rawFeatures")
      .setOutputCol("feature")
    val idfModel = idf.fit(featurizedData)
    val tfidf_rescaledData = idfModel.transform(featurizedData)

    // Feature selection (disabled). BUG FIX: in the original paste the
    // continuation lines of this block were NOT commented out, which broke
    // compilation; the whole block is now consistently commented.
    // val selector = new ChiSqSelector()
    //   .setNumTopFeatures(20)
    //   .setLabelCol("indexedLabel")
    //   .setFeaturesCol("feature")
    //   .setOutputCol("features")
    // val transformer = selector.fit(tfidf_rescaledData)
    // val selector_feature = transformer.transform(tfidf_rescaledData)
    // selector_feature.show()

    tfidf_rescaledData
  }

  /**
   * Converts a DataFrame with columns "lable" (String) and "feature"
   * (ml.linalg.Vector) into an RDD of mllib LabeledPoints for SVMWithSGD.
   *
   * NOTE(review): `label.toDouble` assumes the raw label strings are numeric;
   * the "indexedLabel" column produced in get_tfidf is never used here —
   * TODO confirm against the input data.
   */
  def get_dataFrame_to_rdd(dataDF: DataFrame): RDD[LabeledPoint] = {
    val DataRdd = dataDF
      .select($"lable", $"feature")
      .map {
        case Row(label: String, features: ml.linalg.Vector) =>
          ml.feature.LabeledPoint(
            label.toDouble,
            ml.linalg.Vectors.dense(features.toArray))
      }
      .rdd
    // Bridge from the ml package to the mllib package representation.
    val MllibDataRdd = DataRdd.map { line =>
      val lable = line.label
      val fea = line.features.toArray
      mllib.regression.LabeledPoint(lable, mllib.linalg.Vectors.dense(fea))
    }
    MllibDataRdd
  }
}
// Source (reprint attribution, original article): https://ju.6miu.com/read-678647.html