1、下载Spark
打开
http://spark.apache.org/downloads.html
,可以下载各种版本的spark
解压下载文件,可以看到
examples,其中可以找到很多经典的范例。
这里我们使用java版本:
spark-2.1.0-bin-hadoop2.6\examples\src\main\java\org\apache\spark\examples
2、这里我们仿照JavaWordCount.java自己来敲一遍代码,找找感觉
这里一步步的来,完整代码放最下面了
1、首先是初始化SparkContext。
在spark2.1.0版本中推荐使用SparkSession来初始化,这里为了明显的体现出SparkConf,采用new的方式。
1.1、设置SparkConf
Spark的配置属性都是以“spark.”开头的字符。在初始化时,会自动加载所有以spark.开头的系统属性。
SparkConf里能设置的属性很多,目前还没能找到一个比较全面的介绍,这个后续再说
SparkConf sparkConf = new SparkConf();sparkConf.set("spark.app.name", "JavaWordCount");//采用local模式执行[4]表示使用四个cpu核(因为本人的机器是四核的哦)sparkConf.set("spark.master", "local[4]");// 设置本地缓存目录,默认为/tmp,window下是C://tmp(本人很讨厌王C盘下胡乱添加东西的行为,所以这里色孩子为D盘)sparkConf.set("spark.local.dir", "D:\\cloud\\spark\\tmp"); //设置executor占用的内存大小,默认512m,最小450m(如果没有单位,默认是B)//也可用系统变量SPARK_EXECUTOR_MEMORY或者SPARK_MEM设置sparkConf.set("spark.executor.memory","450m");//配置TaskScheduler(任务调度)模式(有FAIR和FIFO两种,默认为FIFO)sparkConf.set("spark.scheduler.mode", "FIFO");// 默认分区数(这里设置为和能够使用的cup核数相同最好(如果不设置,会有一套默认值判断的逻辑,以后再讨论))sparkConf.set("spark.default.parallelism", "4");// 是否启动sparkUI,默认为true,如果启动了UI,可以在浏览器通过地址直接调用(本机执行就没必要耗费额外的资源了,这里关闭)// SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.21.133.102:4040sparkConf.set("spark.ui.enabled", "false");// 是否允许sprakUi被kill,默认为true//sparkConf.set("spark.ui.killEnabled", "true");// sprakUi端口(默认端口4040)//sparkConf.set("spark.ui.port", "8080");
1.2、创建SparkContext
SparkContext可以看做spark应用程序的引擎,而SparkConf则是这个引擎运行时设置参数的一个“面板”。
// 创建SparkContextJavaSparkContext ctx = new JavaSparkContext(sparkConf);
虽然这里只是new了一个对象,但实际上背后有很多的工作,这里列一下,不做分析:
1)创建Spark执行环境SparkEnv;
2)创建RDD清理器metadataCleaner;
3)创建并初始化Spark UI;(spark.ui.enabled=true时)
4)Hadoop相关配置及Executor环境变量的设置;
5)创建任务调度TaskScheduler;
6)创建和启动DAGScheduler;
7)TaskScheduler的启动;
8)初始化块管理器BlockManager(BlockManager是存储体系的主要组件之一)
9)启动测量系统MetricsSystem;
10)创建和启动Executor分配管理器ExecutorAllocationManager;
11)ContextCleaner的创建与启动;
12)Spark环境更新;
13)创建DAGSchedulerSource和BlockManagerSource;
14)将SparkContext标记为激活;
2、接下来创建RDD(Creation Operation)
先拷贝一段RDD的概念:
RDD,全称为Resilient Distributed Datasets,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并能控制数据的分区。同时,RDD还提供了一组丰富的操作来操作这些数据。在这些操作中,诸如map、flatMap、filter等转换操作实现了monad模式,很好地契合了Scala的集合操作。除此之外,RDD还提供了诸如join、groupBy、reduceByKey等更为方便的操作(注意,reduceByKey是action,而非transformation),以支持常见的数据运算。
RDD的一些特性(如分区、款依赖、窄依赖等)这里不去涉及
在这个例子里,JavaRDD
<
String
>
lines
可以看做是一个string数组,每一个数组元素对应文件里的一个行。
String filePath = args[0];
// 创建:创建RDDJavaRDD<String> lines = ctx.textFile(filePath);
这里如果没有设置Hadoop环境变量的话,会抛出一个异常:
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
不过这里不用管,因为这个例子里我们实际上并没有用到Hadoop。
3、接下来对RDD进行一系列的转换(Transformation Operation)操作
RDD转换操作方法有map、flatMap、mapToPair等等,这里我们只介绍word count 用到的
3.1、
flatMap:
每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出
这里用来将输入文件里每一行中包含的单词都拆分出来,最终形成一个word集合
// 将lines里每个元素(line)按空格拆分为一个个word,组成新的RDD// 转换-flatMap:每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){ private static final long serialVersionUID = 1L; @Override public Iterable<String> call(final String line) throws Exception { return Arrays.asList(SPACES.split(line.trim())); }});
3.2、 mapToPair
:每个元素输入都被映射到一个key-value格式的输出项
这里用来给每个word赋一个基数1,以便接下来执行累加计算
// 以每个word为key,给每个word赋一个基数1作为value,组成pair,以便接下来执行累加计算// 转换:每个元素输入都被映射到一个key-value格式的输出项JavaPairRDD<String,Integer> wordOnes = words.mapToPair(new PairFunction<String,String,Integer>(){ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); }});
3.1、3.2等同于
flatMapToPair操作
3.3、 reduceByKey
:
按Key进行分组,使用给定的func函数聚合value值。一组相同key的元素作为输入项,映射到一个key不变,value经过聚合的key-value元素
// 累加相同word的基数(即value)// 行动: reduceByKey:按Key进行分组,使用给定的func函数聚合value值。//一组相同key的元素作为输入项,映射到一个key不变value经过聚合的key-value元素JavaPairRDD<String,Integer> wordCount = wordOnes.reduceByKey(new Function2<Integer,Integer,Integer>(){ private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { // v1:第一个value的值(第一次执行时)或者前一次执行结果 // v2:下一个value的值 // 最后的结果就是累加值 return v1 + v2; }});
到此为止,我们已经定义好了统计文件中每个单词出现的频率的任务,接下来可以执行任务并输出结果了
4、接下来调用collect,提交job并获取结果(Action Operation)操作
注意:程序一直执行到这里,之前定义的那些转换操作才真正开始
List<Tuple2<String, Integer>> output = wordCount.collect();for (Tuple2<?,?> tuple:output) { System.out.println("【" + tuple._1 + "】出现了" + tuple._2 + "次");}
这里我尝试多做几个操作:
1、过滤掉出现频率低于10的和空字符
2、按出现频率由高到低排序
// 转换-map:每个元素输入仅映射到一个输出项(这里是为了方便之后以词频排序)// 转换-filter:全部元素为输入项,输出所有给定的func函数返回值为true的// 转换-sortBy:以给定函数返回值作为排序依据进行排序JavaRDD<Tuple2<String, Integer>> wordContList = wordCount.map(new Function<Tuple2<String,Integer>,Tuple2<String,Integer>>(){ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Tuple2<String, Integer> v1) throws Exception { return new Tuple2<String,Integer>(v1._1, v1._2); }}).filter(new Function<Tuple2<String,Integer>,Boolean>(){ private static final long serialVersionUID = 1L; @Override public Boolean call(Tuple2<String, Integer> v1) throws Exception { return v1._2 >= 10 && null != v1._1 && !v1._1.trim().isEmpty(); }}).sortBy(new Function<Tuple2<String,Integer>,Integer>(){ private static final long serialVersionUID = 1L; @Override public Integer call(Tuple2<String, Integer> v1) throws Exception { return v1._2; // 返回value作为排序依据 } }, false,1);//降序排列
接下来我们使用foreach进行遍历打印结果
(foreach也是行动操作,会触发转换,这里我们只是为了看下效果所以用来执行下打印)
wordContList.foreach(new VoidFunction<Tuple2<String, Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> e) throws Exception { System.out.println("【" + e._1 + "】出现了" + e._2 + "次"); }});
因此可以直接在本地运行。这里我直接在Eclipse里执行结果如下:
package com.spark.hello;
import java.util.Arrays;import java.util.Iterator;import java.util.regex.Pattern;
import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class JavaWordCountLocal { private static final Pattern SPACES = Pattern.compile("\\s+");
public static void main(String[] args) { if (args.length < 1) { System.err.println("Usage:JavaWordCount <file>"); System.exit(1); } String filePath = args[0]; //String filePath = "E:\\Workspaces\\Spark\\spark-hello\\file\\CmdLog.log"; //生成配置 /*SparkConf sparkConf = new SparkConf() .setAppName("JavaWordCount") .setMaster("local");*/ SparkConf sparkConf = new SparkConf(); sparkConf.set("spark.app.name", "JavaWordCount"); sparkConf.set("spark.master", "local[4]"); sparkConf.set("spark.local.dir", "D:\\cloud\\spark\\tmp"); // 设置本地缓存目录,默认为/tmp,window下是C://tmp sparkConf.set("spark.diskStore.subDictories", "5"); //设置executor占用的内存大小,默认512m,最小450m(如果没有单位,默认是B) //也可用系统变量SPARK_EXECUTOR_MEMORY或者SPARK_MEM设置 sparkConf.set("spark.executor.memory","450m"); //配置TaskScheduler(任务调度)模式(有FAIR和FIFO两种,默认为FIFO) sparkConf.set("spark.scheduler.mode", "FIFO"); // 默认分区数 sparkConf.set("spark.default.parallelism", "4"); // 是否启动sparkUI,默认为true,如果启动了UI,可以在浏览器通过地址直接调用 // SparkUI: Bound SparkUI to 0.0.0.0, and started at http://172.21.133.102:4040 sparkConf.set("spark.ui.enabled", "false"); // 是否允许sprakUi被kill,默认为true sparkConf.set("spark.ui.killEnabled", "true"); // 端口(默认端口4040) sparkConf.set("spark.ui.port", "8080"); // 创建SparkContext JavaSparkContext ctx = new JavaSparkContext(sparkConf); // 创建:创建RDD JavaRDD<String> lines = ctx.textFile(filePath); // 将lines里每个元素(line)按空格拆分为一个个word,组成新的RDD // 转换-flatMap:每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){ private static final long serialVersionUID = 1L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(SPACES.split(line.trim())); } }); // 以每个word为key,给每个word赋一个基数1作为value,组成pair,以便接下来执行累加计算 // 转换-mapToPair:每个元素输入都被映射到一个key-value格式的输出项 JavaPairRDD<String,Integer> wordOnes = words.mapToPair(new PairFunction<String,String,Integer>(){ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); // 累加相同word的基数(即value) // 转换-reduceByKey:按Key进行分组,使用给定的func函数聚合value值。 //一组相同key的元素作为输入项,映射到一个key不变,value经过聚合的key-value元素 JavaPairRDD<String,Integer> wordCount = wordOnes.reduceByKey(new Function2<Integer,Integer,Integer>(){ private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { // v1:第一个value的值(第一次执行时)或者前一次执行结果 // v2:下一个value的值 // 最后的结果就是累加值 return v1 + v2; } }); // 转换-map:每个元素输入仅映射到一个输出项(这里是为了方便之后以词频排序) // 转换-filter:全部元素为输入项,输出所有给定的func函数返回值为true的 // 转换-sortBy:以给定函数返回值作为排序依据进行排序 JavaRDD<Tuple2<String, Integer>> wordContList = wordCount.map(new Function<Tuple2<String,Integer>,Tuple2<String,Integer>>(){ private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Tuple2<String, Integer> v1) throws Exception { return new Tuple2<String,Integer>(v1._1, v1._2); } }).filter(new Function<Tuple2<String,Integer>,Boolean>(){ private static final long serialVersionUID = 1L; @Override public Boolean call(Tuple2<String, Integer> v1) throws Exception { return v1._2 >= 10 && null != v1._1 && !v1._1.trim().isEmpty(); } }).sortBy(new Function<Tuple2<String,Integer>,Integer>(){ private static final long serialVersionUID = 1L; @Override public Integer call(Tuple2<String, Integer> v1) throws Exception { return v1._2; // 返回value作为排序依据 } }, false,1);//降序排列 // 输出 wordContList.foreach(new VoidFunction<Tuple2<String, Integer>>() { private static final long serialVersionUID = 1L; @Override public void call(Tuple2<String, Integer> e) throws Exception { System.out.println("【" + e._1 + "】出现了" + e._2 + "次"); } }); // 行动:汇总所有数据 /*List<Tuple2<String, Integer>> output = wordCount.collect(); for (Tuple2<?,?> tuple:output) { System.out.println("【" + tuple._1 + "】出现了" + tuple._2 + "次"); }*/ ctx.close(); //ctx.stop(); }}
转载请注明原文地址: https://ju.6miu.com/read-23556.html