1. 实践所使用的基本代码,是spark的java example。在作业调度实践里,将对这个代码稍作修改。其源码如下:
--------------------------------------------------------------------------------------
package com.my.spark.demo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import java.util.ArrayList;
import java.util.List;
public class PIMain {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi").setMaster("local[2]");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
l.add(i);
}
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});
System.out.println("Pi is roughly " + 4.0 * count / n);
jsc.stop();
}
}
--------------------------------------------------------------------------------------
2. 静态调度实践:
静态调度有三种方式:spark.cores.max设置使用的最大cpu数量;spark.deploy.defaultCores更改默认cpu数量;spark.executor.memory设置使用的内存。
根据http://spark.apache.org/docs/1.6.1/configuration.html逐一解释参数:
spark.cores.max: 在standalone模式和coarse--grained mesos模式,一个spark application能使用的最大数量的cpu。如果这个值没有设置,在standalone模式下就使用spark.deploy.defaultCores数量的cpu,如果在mesos下使用无限的cpu。
spark.deploy.defaultCores: 设置cpu核数,如果这个值没有设,那么应用会自己去获取计算的核数。
spark.executor.memory: 设置每个executor进程使用的内存量。
上述参数设置的位置。设置参数有三个位置,其一,在spark的conf目录下的配置文件里设置;其二,在spark-submit命令行设置;其三,在应用里设置。这三个都可以在应用里设置。
凡是需要启动executor之前的需要进行设置的参数,要在
同时,在spark的conf下,搜索上述三个参数,会发现spark.core.max和spark.deploy.defaultCores在conf的模板里没有,而spark.executor.memory在spark-env.sh里有相似的设置,也就SPARK_EXECUTOR_MEMORY,所以它可以通过配置文件进行设置。
修改源码,设置上述参数,完整源码如下:
-------------------------------------
package com.spark.my.demo;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import java.util.ArrayList;
import java.util.List;
public class PIMain {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
sparkConf.set("spark.cores.max", "5");
sparkConf.set("spark.deploy.defaultCores", "1");
sparkConf.set("spark.executor.memory", "2g");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
l.add(i);
}
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(new Function<Integer, Integer>() {
@Override
public Integer call(Integer integer) {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});
System.out.println("Pi is roughly " + 4.0 * count / n);
jsc.stop();
}
}
-------------------------------------
3. 动态调度
这里面临几个问题:
1)对于关键参数spark.dynamicAllocation.enabled和关键参数spark.shuffle.service.enabled,在spark的文档里,对这两个参数进行了描述,但是找不到设置它的具体例子,也找不到spark/conf下对这一参数的设置,找不到设置这两个参数的具体位置。根据其原理判断,应该在spark/conf下进行设置,因为这个设置将影响spark集群的所有工作节点worker的行为。是否如此,需要检查源码进行验证。
2)同样,对与FAIR调度器和池化策略,spark也没有详细的文档和例子。这块需要研究源码或者寻找更细致的说明。
考虑到时间成本问题,目前只在跑通静态调度的情况下,现完成业务流程。动态调度留待主要流程搞定后再过来深挖。
转载请注明原文地址: https://ju.6miu.com/read-1202337.html