K-means 1.K-means (scala)
// Load and parse the data. val data = sc.textFile("kmeans_data.txt") val parsedData = data.map(_.split(‘ ').map(_.toDouble)).cache() // Cluster the data into five classes using KMeans. val clusters = KMeans.train(parsedData, 5, numIterations = 20) ! // Compute the sum of squared errors. val cost = clusters.computeCost(parsedData) println("Sum of squared errors = " + cost)2.K-means (python)
# Load and parse the data data = sc.textFile("kmeans_data.txt") parsedData = data.map(lambda line: array([float(x) for x in line.split(' ‘)])).cache() # Build the model (cluster the data) clusters = KMeans.train(parsedData, 5, maxIterations = 20,runs = 1,initialization_mode = "kmeans||") # Evaluate clustering by computing the sum of squared errors def error(point): center = clusters.centers[clusters.predict(point)] return sqrt(sum([x**2 for x in (point - center)])) cost = parsedData.map(lambda point: error(point)).reduce(lambda x, y: x + y) print("Sum of squared error = " + str(cost))降维+K-means
// compute principal components val points: RDD[Vector] = ... val mat = RowMatrix(points) val pc = mat.computePrincipalComponents(20) // project points to a low-dimensional space val projected = mat.multiply(pc).rows // train a k-means model on the projected data val model = KMeans.train(projected, 10)Streaming + MLlib
// collect tweets using streaming // train a k-means model val model: KMmeansModel = ... // apply model to filter tweets val tweets = TwitterUtils.createStream(ssc, Some(authorizations(0))) val statuses = tweets.map(_.getText) val filteredTweets = statuses.filter(t => model.predict(featurize(t)) == clusterNumber) // print tweets within this particular cluster filteredTweets.print()协同过滤 目标:从其条目的子集中恢复矩阵。(再理解。)
Collaborative filtering // Load and parse the data val data = sc.textFile("mllib/data/als/test.data") val ratings = data.map(_.split(',') match { case Array(user, item, rate) => Rating(user.toInt, item.toInt, rate.toDouble) }) // Build the recommendation model using ALS val numIterations = 20 val rank = 10 val regularizer = 0.01 val model = ALS.train(ratings, rank, numIterations, regularizer) // Evaluate the model on rating data val usersProducts = ratings.map { case Rating(user, product, rate) => (user, product) } val predictions = model.predict(usersProducts)What:扩展Spark用于进行大数据流处理
why
许多大数据应用程序需要实时处理大数据流:网站监控、欺诈检测、广告获利
Advantage 缩放到数百个节点 实现低延迟 从故障中有效恢复 集成批处理和交互式处理
现有的流系统 Storm
如果节点未处理,则重播记录 处理每个记录至少一次 可以更新可变状态两次! 可变状态可能会由于失败而丢失!Trident
使用事务更新状态 每个记录只处理一次 对外部数据库的每状态事务很慢Spark Streaming 将流计算作为一系列非常小的确定性批处理作业运行
将实况流切成X秒的批次 Spark将每个批处理的数据作为RDD,使用RDD操作进行处理最后,RDD操作的处理结果分批返回编程模型 - DStream 离散流(DStream) - 表示数据流 - 实现为一系列RDD
示例 - 从Twitter获取标签
val ss = new StreamingContext(sparkContext,Seconds(1)) val tweets = TwitterUtils.createStream(ssc,auth)
Input DStream:tweetsval tweets = TwitterUtils.createStream(ssc,None) val hashTags =tweets.flatMap(status=>getTags(status))
transformed DStream:hashTags transformation:flatMaphashTags.saveAsHadoopFiles(“hdfs://…”)
output operation:saveAsHadoopFileshashTags.foreachRDD(hashTagRDD=>{…})
1.指定函数以根据先前状态和新数据生成新状态 示例:将每个用户的心情保持为状态,并使用他们的tweets进行更新 def updateMood(newTweets,lastMood)=>newMood val moods=tweetsByUser.updateStateByKey(updateMood_)
2.混合RDD和DStream操作 - 示例:使用垃圾邮件HDFS文件加入传入的tweets,以过滤掉不正确的tweets tweets.transform(tweetsRDD=>tweetsRDD.join(spamFile).filter(…)})
3.混合RDD和DStream
将实时数据流与历史数据组合 使用Spark等生成历史数据模型使用数据模型处理实时数据流将流与MLlib,GraphX algos组合 离线学习,在线预测在线学习和预测使用SQL查询流数据 select * from table_from_streaming_data4.统一栈的优点
以交互方式探索数据以识别问题
在Spark中使用相同的代码来处理大型日志
在Spark Streaming中使用类似的代码进行实时处理
5.容错
输入数据批次会复制到内存中以实现容错由于工作程序失败导致的数据丢失可以从复制的输入数据重新计算所有的变换都是容错的,一次性的变换6.输入源 Kafka,Flume,Akka Actors,原始TCP套接字,HDFS等。