Spark的作业调度初步实践

    xiaoxiao2023-03-24  5

    1. 实践所使用的基本代码,是spark的java example。在作业调度实践里,将对这个代码稍作修改。其源码如下: -------------------------------------------------------------------------------------- package com.my.spark.demo; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import java.util.ArrayList; import java.util.List; public class PIMain { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi").setMaster("local[2]");    JavaSparkContext jsc = new JavaSparkContext(sparkConf);    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;    int n = 100000 * slices;    List<Integer> l = new ArrayList<Integer>(n);    for (int i = 0; i < n; i++) {      l.add(i);    }    JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);    int count = dataSet.map(new Function<Integer, Integer>() {      @Override      public Integer call(Integer integer) {        double x = Math.random() * 2 - 1;        double y = Math.random() * 2 - 1;        return (x * x + y * y < 1) ? 1 : 0;      }    }).reduce(new Function2<Integer, Integer, Integer>() {      @Override      public Integer call(Integer integer, Integer integer2) {        return integer + integer2;      }    });    System.out.println("Pi is roughly " + 4.0 * count / n);    jsc.stop(); } } -------------------------------------------------------------------------------------- 2. 静态调度实践: 静态调度有三种方式:spark.cores.max设置使用的最大cpu数量;spark.deploy.defaultCores更改默认cpu数量;spark.executor.memory设置使用的内存。 根据http://spark.apache.org/docs/1.6.1/configuration.html逐一解释参数: spark.cores.max: 在standalone模式和coarse--grained mesos模式,一个spark application能使用的最大数量的cpu。如果这个值没有设置,在standalone模式下就使用spark.deploy.defaultCores数量的cpu,如果在mesos下使用无限的cpu。 spark.deploy.defaultCores: 设置cpu核数,如果这个值没有设,那么应用会自己去获取计算的核数。 spark.executor.memory: 设置每个executor进程使用的内存量。 上述参数设置的位置。设置参数有三个位置,其一,在spark的conf目录下的配置文件里设置;其二,在spark-submit命令行设置;其三,在应用里设置。这三个都可以在应用里设置。 凡是需要启动executor之前的需要进行设置的参数,要在 同时,在spark的conf下,搜索上述三个参数,会发现spark.core.max和spark.deploy.defaultCores在conf的模板里没有,而spark.executor.memory在spark-env.sh里有相似的设置,也就SPARK_EXECUTOR_MEMORY,所以它可以通过配置文件进行设置。 修改源码,设置上述参数,完整源码如下: ------------------------------------- package com.spark.my.demo; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import java.util.ArrayList; import java.util.List; public class PIMain { public static void main(String[] args) { SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi"); sparkConf.set("spark.cores.max", "5"); sparkConf.set("spark.deploy.defaultCores", "1"); sparkConf.set("spark.executor.memory", "2g");    JavaSparkContext jsc = new JavaSparkContext(sparkConf);    int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;    int n = 100000 * slices;    List<Integer> l = new ArrayList<Integer>(n);    for (int i = 0; i < n; i++) {      l.add(i);    }    JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);    int count = dataSet.map(new Function<Integer, Integer>() {      @Override      public Integer call(Integer integer) {        double x = Math.random() * 2 - 1;        double y = Math.random() * 2 - 1;        return (x * x + y * y < 1) ? 1 : 0;      }    }).reduce(new Function2<Integer, Integer, Integer>() {      @Override      public Integer call(Integer integer, Integer integer2) {        return integer + integer2;      }    });    System.out.println("Pi is roughly " + 4.0 * count / n);        jsc.stop(); } } ------------------------------------- 3. 动态调度 这里面临几个问题: 1)对于关键参数spark.dynamicAllocation.enabled和关键参数spark.shuffle.service.enabled,在spark的文档里,对这两个参数进行了描述,但是找不到设置它的具体例子,也找不到spark/conf下对这一参数的设置,找不到设置这两个参数的具体位置。根据其原理判断,应该在spark/conf下进行设置,因为这个设置将影响spark集群的所有工作节点worker的行为。是否如此,需要检查源码进行验证。 2)同样,对与FAIR调度器和池化策略,spark也没有详细的文档和例子。这块需要研究源码或者寻找更细致的说明。 考虑到时间成本问题,目前只在跑通静态调度的情况下,现完成业务流程。动态调度留待主要流程搞定后再过来深挖。
    转载请注明原文地址: https://ju.6miu.com/read-1202337.html
    最新回复(0)