Spark常用的Transformation算子的简单例子

    xiaoxiao2024-12-29  14

    Spark的常用Transformation算子有map、filter、flatMap、reduceByKey、groupByKey、join、leftOuterJoin、rightOuterJoin、cogroup等算子。在这里进行的简历例子的使用,以后随着学习的深入需要第每个算子深入分析。

    package com.spark.App import org.apache.spark.{SparkContext, SparkConf} /** * Created by Administrator on 2016/8/13 0013. */ object Transformations { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Transformations").setMaster("local") val sc = new SparkContext(conf) // mapTransformation(sc) // filterTransformation(sc) // flatMapTransformation(sc) // reduceByKeyTransformation(sc) // groupByKeyTransformation(sc) // joinTransformation(sc) // leftOuterJoinTransformation(sc) // rightOuterJoinTransformation(sc) cogroupTransformation(sc) sc.stop() } /** * 将函数应用于RDD的每个元素,将返回值构成新的RDD * @param sc */ def mapTransformation(sc: SparkContext): Unit = { val numbers = sc.parallelize(1 to 10) // 根据集合构建RDD val mapped = numbers.map(item => 2 * item) // 每个元素扩大2倍 mapped.collect.foreach(println) } /** * 返回一个由通过filter()的函数的元素组成的RDD,结果为true的元素会返回 * @param sc */ def filterTransformation(sc: SparkContext): Unit = { val numbers = sc.parallelize(1 to 10) val filtered = numbers.filter(item => item % 2 == 0) filtered.collect.foreach(println) } /** * 将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD。通常用来切分单词 * @param sc */ def flatMapTransformation(sc: SparkContext): Unit = { val lines = Array("What makes life dreary is the want of motive", "Hello Spark", "Hello World") val linesRDD = sc.parallelize(lines) val words = linesRDD.flatMap(line => line.split(" ")) words.collect.foreach(println) } /** * reduceByKey必须用于元素是(key value) 的元素,把key相同的元素进行merge操作 * 它能够在本地先进行merge操作,并且merge操作可以通过函数自定义 * @param sc */ def reduceByKeyTransformation(sc: SparkContext): Unit = { val lines = Array("What makes life dreary is the want of motive", "Hello Spark", "Hello World") val linesRDD = sc.parallelize(lines) val words = linesRDD.flatMap(_.split(" ")).map(word => (word, 1)) val wordsCount = words.reduceByKey(_ + _) wordsCount.collect.foreach(println) } /** * groupByKey也是对每个key进行合并操作,但只生成一个sequence. * groupByKey本身不能自定义操作函数. * @param sc */ def groupByKeyTransformation(sc: SparkContext): Unit = { val data = Array(Tuple2("David", "Math"), Tuple2("David", "Music"), Tuple2("Mary", "Math"), Tuple2("Mary", "Art"), Tuple2("Allin", "Computer")) val dataRDD = sc.parallelize(data) val grouped = dataRDD.groupByKey() grouped.collect.foreach(println) } /** * 对两个RDD进行内链接。 * 只有两个RDD中都有的key才会输出 * @param sc */ def joinTransformation(sc: SparkContext): Unit = { val idAndName = Array(Tuple2(1, "David"), Tuple2(2, "Mary"), Tuple2(3, "Allin")) val idAndScore = Array(Tuple2(1, 98), Tuple2(2, 90), Tuple2(3, 86)) val names = sc.parallelize(idAndName); val scores = sc.parallelize(idAndScore) val nameAndScore = names.join(scores) nameAndScore.collect.foreach(println) } /** * 对两个RDD进行连接操作,确保第一个RDD的键必须存在 * 连接后,缺省的val为None,这里的结果为: * (1,(David,Some(98))) * (3,(Allin,None)) * (2,(Mary,Some(90))) * @param sc */ def leftOuterJoinTransformation(sc: SparkContext): Unit = { val idAndName = Array(Tuple2(1, "David"), Tuple2(2, "Mary"), Tuple2(3, "Allin")) val idAndScore = Array(Tuple2(1, 98), Tuple2(2, 90)) val names = sc.parallelize(idAndName); val scores = sc.parallelize(idAndScore) val nameAndScore = names.leftOuterJoin(scores) nameAndScore.collect.foreach(println) } /** * 对两个RDD进行连接操作,确保第二个RDD的键必须存在,与leftOuterJoin正好相反 * 输出结果: * (1,(Some(David),98)) * (3,(None,86)) * (2,(Some(Mary),90)) * @param sc */ def rightOuterJoinTransformation(sc: SparkContext): Unit = { val idAndName = Array(Tuple2(1, "David"), Tuple2(2, "Mary")) val idAndScore = Array(Tuple2(1, 98), Tuple2(2, 90), Tuple2(3, 86)) val names = sc.parallelize(idAndName); val scores = sc.parallelize(idAndScore) val nameAndScore = names.rightOuterJoin(scores) nameAndScore.collect.foreach(println) } /** * 将两个RDD中用有相同键的数据分组到一起 * 输出结果: * (1,(CompactBuffer(David, Frank),CompactBuffer(98, 93))) * (3,(CompactBuffer(Allin, Carry),CompactBuffer(86, 83))) * (2,(CompactBuffer(Mary, Duncan),CompactBuffer(90, 80))) * @param sc */ def cogroupTransformation(sc: SparkContext): Unit = { val idAndName = Array(Tuple2(1, "David"), Tuple2(1, "Frank"), Tuple2(2, "Mary"), Tuple2(2, "Duncan"), Tuple2(3, "Allin"), Tuple2(3, "Carry")) val idAndScore = Array(Tuple2(1, 98), Tuple2(1, 93), Tuple2(2, 90), Tuple2(2, 80), Tuple2(3, 86), Tuple2(3, 83)) val names = sc.parallelize(idAndName); val scores = sc.parallelize(idAndScore) val nameAndScore = names.cogroup(scores) nameAndScore.collect.foreach(println) } }
    转载请注明原文地址: https://ju.6miu.com/read-1295104.html
    最新回复(0)