package com.spark.App
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by Administrator on 2016/8/14 0014.
*/
object Actions {
def main(args: Array[String]) {
val conf =
new SparkConf().setAppName(
"Actions").setMaster(
"local")
val sc =
new SparkContext(conf)
saveAsTextFileAction(sc)
}
/**
* 并行整合RDD中所有数据
*
* @param sc
*/
def reduceAction(sc: SparkContext): Unit = {
val numbers = sc.parallelize(
1 to
100)
val sum = numbers.reduce(_ + _)
println(sum)
}
/**
* 收集集群中RDD的所有元素到一台机器
*
* @param sc
*/
def collectAction(sc: SparkContext): Unit = {
val numbers = sc.parallelize(
1 to
10)
val data = numbers.map(item => item *
2)
val result = data.collect()
result.foreach(println)
}
/**
* RDD中元素的个数
*
* @param sc
*/
def countAction(sc: SparkContext): Unit = {
val numbers = sc.parallelize(
1 to
100)
val result = numbers.count()
println(result)
}
/**
* 从RDD中返回最前面的num个元素
*
* @param sc
*/
def topAction(sc: SparkContext): Unit = {
val numbers = sc.parallelize(
1 to
100)
val result = numbers.top(
5)
result.foreach(println)
}
/**
* RDD中各个元素出现的次数
*
* @param sc
*/
def countByValue(sc: SparkContext): Unit = {
val numbers = sc.parallelize(Array(
1,
2,
3,
4,
2,
1,
3,
5,
6,
7,
6))
val result = numbers.countByValue()
print(result)
}
/**
* 将RDD保存到Hadoop,每个task都会保存成一个文件
* 这个最好在集群中使用
* 因此为了减少文件数量,可以使用repartition或者coalesce减少分区数量
* repartition和coalesce是有区别的,repartition更倾向于增大分区,因为它使用了shuffle,需要单独写一遍博客对比它们的不同
*
* 注意: 保存的路径写到目录即可,而且该目录是不存在的
* @param sc
*/
def saveAsTextFileAction(sc: SparkContext): Unit = {
val numbers = sc.parallelize(
1 to
100)
numbers.coalesce(
1,
false).saveAsTextFile(
"/home/num.txt")
}
}
转载请注明原文地址: https://ju.6miu.com/read-1297176.html