package sparkExample
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by xiaoxu
*/
object SparkRDDOperation {
def main(args: Array[
String]) {
// 初始化
System.
setProperty(
"hadoop.home.dir",
"E:\\winutils-hadoop-2.6.4\\hadoop-2.6.4")
val conf =
new SparkConf().setMaster(
"local[2]").setAppName(
this.getClass.getName)
val sc =
new SparkContext(conf)
// 设置Log级别
Logger.
getRootLogger.setLevel(Level.
INFO)
// 读取文件夹下的所有的文件
sc.textFile(
"hdfs://hadoop1:9000/sparkExample")
// 读取指定的文件
sc.textFile(
"hdfs://hadoop1:9000/sparkExample/sparkExample.txt")
// 读取系统中的文件
sc.textFile(
"file:/opt/sparkExample/sparkExample.txt")
// 直接读取系统中的文件
sc.textFile(
"/opt/sparkExample/sparkExample.txt")
// 直接读取系统中的文件夹下的所有的文件
sc.textFile(
"/opt/sparkExample")
// 直接读取系统中的文件夹中的匹配的文件
sc.textFile(
"/opt/sparkExample/*.txt")
// 构造数据
val parallelize: RDD[Int] = sc.parallelize(
1 to
9,
3)
// 对每一个数据进行*2
val map: RDD[Int] = parallelize.map(x => x *
2)
map.count()
// 对数据进行过滤
val filter: RDD[Int] = parallelize.filter(x => x >
3)
filter.count()
// 对parallelize中的数据依次往后推10个
val map1: RDD[Int] = parallelize.flatMap(x => x to
10)
map1.collect()
//对数据union操作后再去重
val union: RDD[Int] = map.union(map1).distinct()
union.collect()
// 对两个RDD求交集,并去重
val intersection: RDD[Int] = map.intersection(map1).distinct()
intersection.collect()
// 构造数据
val parallelize2: RDD[(Int, Int)] = sc.parallelize(
Array((
1,
1), (
1,
2), (
2,
3), (
3,
4)),
3)
// 相同key一样的进行对value进行累加
val key: RDD[(Int, Int)] = parallelize2.reduceByKey((x, y) => x + y)
// 按照key一样的把数据分组
val key1: RDD[(Int,
Iterable[Int])] = parallelize2.groupByKey()
key1.collect()
// 构造数据
val parallelize3: RDD[Int] = sc.parallelize(
List(
1,
2,
3,
4,
5,
6),
2)
// 结果是9,原因是分了两个区,在每一个区中拿最大的那个进行相加
val aggregate: Int = parallelize3.aggregate(
0)(Math.
max(_, _), _ + _)
aggregate.toString
// 按照key进行排序
val key2: RDD[(Int, Int)] = parallelize2.sortByKey()
key2.collect()
// 对数据进行笛卡尔积操作
val parallelize4: RDD[(Int, Int)] = sc.parallelize(
Array((
1,
1), (
1,
2), (
2,
3), (
3,
4)),
2)
// 按照key对数据进行JOIN操作
parallelize2.join(parallelize4).collect().toBuffer
// 按照KEY进行统计,并把每个RDD的元素进行查看
parallelize2.cogroup(parallelize4).collect().toBuffer
//按照KEY进行统计,并把每个RDD按照元组的形式进行展示
parallelize2.cartesian(parallelize4).collect().toBuffer
// 构造数据
val parallelize5: RDD[Int] = sc.parallelize(
1 to
10,
2)
// 获取数据的前一个RDD
parallelize5.pipe(
"head -n 1").collect().toBuffer
// 按照数据进行3:7分,并查看数据与个数
val split: Array[RDD[Int]] = parallelize5.randomSplit(
Array(
0.3,
0.7),
1)
split(
0).collect().toBuffer
split(
1).count()
// 构造数据
val parallelize6: RDD[Int] = sc.parallelize(
1 to
9,
3)
val parallelize7: RDD[Int] = sc.parallelize(
1 to
3,
3)
// subtract获取两个数据差集
parallelize6.subtract(parallelize7).collect().toBuffer
// 构造数据
val parallelize8: RDD[Int] = sc.parallelize(
Array(
1,
2,
3,
4,
5))
val parallelize9: RDD[Int] = sc.parallelize(
Array(
"A",
"B",
"C",
"D",
"E"))
// 对两个RDD进行合并,前提是分区的数量必须一致
parallelize8.zip(parallelize8).collect().toBuffer
// 获取前3个元素
parallelize8.take(
3).toBuffer
// 获取数据的前一个
parallelize8.first()
// 任意获取四个元素
parallelize8.takeSample(
false,
5)
// 按照顺序获取四个元素
parallelize8.takeOrdered(
4)
// 终止程序
sc.stop()
}
}
转载请注明原文地址: https://ju.6miu.com/read-35438.html