Spark学习笔记(四)

    xiaoxiao2021-03-25  126

    为什么理解Spark内部?

    目标:查找每个“第一个字母”的不同名称的数量 给出一种代码: sc.textFile(“hdfs:/names”) .map(name => (name.charAt(0), name)) .groupByKey() .mapValues(names => names.toSet.size) .collect() 最后执行结果:

    Spark执行模型 1.创建RDD的DAG以表示计算 2.为DAG创建逻辑执行计划 3.计划和执行单个任务

    步骤1:创建RDD

    HadoopRDD -> map() -> groupBy() -> mapValues() -> collect()

    步骤2:创建执行计划(尽可能地Pipeline,根据需要重新组织数据,分成“Stage”) ( 一般是在shuffle的步骤下进行Stage的划分) 步骤3: 分派任务

    将每个阶段分成任务 任务是数据+计算 执行前一个阶段中的所有任务

    Shuffle

    在分区之间重新分配数据 将密钥哈希到桶中 优化: 如果数据已经正确分区,尽可能避免部分聚合减少了数据移动将中间文件写入磁盘

    执行groupBy()

    在每个分区中构建散列映射 A => [Arsalan, Aaron, Andrew, Andrew, Andy, Ahir, Ali, …],E => [Erin, Earl, Ed, …]注意:可以跨键,但单个键值对必须不超过内存大小

    从spark内核角度的思考

    优化方向 - 分区没有获得良好的并发性 - 键组过大 - 在集群中运送所有数据

    常见问题清单 1.确保有足够的分区用于并发 2.最小化内存消耗(特别是groupBys中的排序和大键) 3.最小化数据混洗的数量 1&2是关于调整分区数!

    分区调优的重要性 •主要问题:分区过少 - 减少并发 - 更易受数据偏移的影响 - 增加了groupBy,reduceByKey,sortByKey等的内存压力。 •次要问题:分区过多 •需要“合理数量”的分区 - 通常在100到10,000个分区之间 - 下限:集群中至少〜2×数量的核 - 上限:确保任务至少需要100ms

    内存问题 •症状: - 不可思议的性能表现不佳 - 无法解释的执行器/机器故障“ •诊断: - 设置spark.executor.extraJavaOptions包括 •-XX:+ PrintGCDetails •-XX:+ HeapDumpOnOutOfMemoryError - 检查dmesg的oom-killer日志 • 措施: - 增加spark.executor.memory - 增加分区数 - 重新评估程序结构(!)

    sc.textFile(“hdfs:/names”) .distinct(numPartitions = 6) .map(name => (name.charAt(0), name)) .groupByKey() .mapValues { names => names.size } .collect()

    sc.textFile(“hdfs:/names”) .distinct(numPartitions = 6) .map(name => (name.charAt(0), 1)) .reduceByKey(_ + _) .collect()

    在Shuffle之前对数据进行distinct,同时设置合理的分区数量,在设置key-value中的value时,直接置为1,再利用.reduceByKey(_ + _)直接得到结果。

    转载请注明原文地址: https://ju.6miu.com/read-8992.html

    最新回复(0)