Spark Key Points

xiaoxiao  2022-06-24

Spark's key points: in-memory computation and DAG-based execution.

RDD (Resilient Distributed Dataset): a resilient, distributed data set. RDDs can be applied to working sets, not only to data sets.

RDD characteristics:
- a list of partitions (data splits); the degree of parallelism is inherited from the parent RDD
- a compute function applied to each split; running that function is what a task does
- a list of dependencies on other RDDs
- for key-value RDDs, an optional Partitioner (hash-partitioned by default)
- a list of preferred locations for each split, used when computing it

Three ways to create an RDD: 1) from an existing Scala collection; 2) from external storage such as HDFS or HBase; 3) by transforming another RDD. (spark-shell startup output: "Created sql context (with Hive support) .. SQL context available as sqlContext".)

    val collection = sc.parallelize(1 to 100)                    // from a Scala collection
    val hdfsData = sc.textFile("/library/wordcount/input/Data")  // from a distributed file system or HBase

A transformation derives one dataset from another:

    val wordcount = sc.textFile("/library/wordcount/input/Data")
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    wordcount.toDebugString

Transformations are lazy: computation happens only when a result is required, which avoids materializing large intermediate results.

RDD runtime architecture:
- Data: distributed storage (e.g. HDFS), Scala collections, or Scala scalars
- Operator pipeline: input operators (textFile, parallelize, ...) -> transformation operators (filter, map, ...) -> caching operators (cache) -> action operators

Transformations:
- map: f: T => U
- flatMap: flattens the results of many map calls into one dataset
- mapPartitions: operates on a whole partition at a time (a partition holds many records), e.g. iter => iter.filter(_ >= 3)
- filter: f: T => Boolean
- cartesian: Cartesian product
- union: union of two RDDs of the same element type, without deduplication
- mapValues: operates only on the values of a key-value RDD, e.g. mapValues(a => a + 2)
- subtract: removes the elements that also appear in the other RDD
- sample: samples the RDD's elements, e.g. fraction = 0.5, seed = 9
- takeSample: returns a sampled collection
- groupBy: grouping
- partitionBy: repartitioning by a Partitioner
- cogroup: groups the values of several RDDs by key into tuples of value collections
- combineByKey:

    def combineByKey[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C,
        partitioner: Partitioner,
        mapSideCombine: Boolean = true,
        serializer: Serializer = null): RDD[(K, C)]

- reduceByKey: combines the elements that share a key
- join: cogroups the two RDDs first, then takes the Cartesian product of the values for each key
- leftOuterJoin, rightOuterJoin

cache and persist on an RDD are also lazy; they take effect only when an action triggers execution.

Actions:
- foreach: e.g. foreach(x => println(x))
- collect: gathers all data splits of the RDD into a Scala-level Array on the driver
- collectAsMap: for duplicate keys, later elements overwrite earlier ones
- reduceByKeyLocally: reduce followed by collectAsMap
- lookup: scans only the partition holding the key, if the RDD has a partitioner
- count: the number of elements in the whole RDD (an RDD may span many splits)
- reduceLeft: applies the function iteratively from left to right
- fold: a reduceLeft with a zeroValue v0 (serial)
- aggregate: aggregates data by merging (parallel)
- saveAsTextFile: stores each element in HDFS as a key-value pair (null, element.toString)
- saveAsObjectFile: converts the RDD's elements to arrays, serializes them to BytesWritable, and writes them to HDFS as a SequenceFile

RDD caching (data reuse) is commonly used for interactive queries and iterative computation:

    def persist(newLevel: StorageLevel): this.type = {
      if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
        throw new UnsupportedOperationException(
          "Cannot change storage level of an RDD after it was already assigned a level")
      }
      sc.persistRDD(this)
      sc.cleaner.foreach(_.registerRDDForCleanup(this))
      storageLevel = newLevel
      this
    }
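To tie the transformation and action lists together, here is a minimal sketch assuming a local SparkContext; the data, the object name and the per-key-average logic are illustrative choices, not something from the original notes. combineByKey builds a (sum, count) combiner per key, and the lazy persist is materialized by the first action and reused by the second.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object CombineByKeyDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("combineByKey-demo").setMaster("local[2]"))

        // (key, score) pairs; purely illustrative data
        val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 2.0), ("b", 4.0), ("b", 6.0)))

        // combineByKey: createCombiner, mergeValue, mergeCombiners
        val sumCount = scores.combineByKey(
          (v: Double) => (v, 1),                                        // first value seen for a key in a partition
          (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),  // merge a value into the partition-local combiner
          (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)  // merge combiners across partitions
        )

        val avgByKey = sumCount.mapValues { case (sum, n) => sum / n }

        // lazy persist: materialized by the first action, reused by the second
        avgByKey.persist(StorageLevel.MEMORY_ONLY)

        println(avgByKey.collect().mkString(", "))  // action 1 computes and caches
        println(avgByKey.count())                   // action 2 reads the cached partitions

        sc.stop()
      }
    }

Without the persist call, the second action would recompute the whole lineage starting from parallelize.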
Checkpointing avoids the cost of recomputation when a cached partition is lost (for example under memory pressure), at the price of launching an extra job:

    def checkpoint() {
      if (context.checkpointDir.isEmpty) {
        throw new SparkException("Checkpoint directory has not been set in the SparkContext")
      } else if (checkpointData.isEmpty) {
        checkpointData = Some(new RDDCheckpointData(this))
        checkpointData.get.markForCheckpoint()
      }
    }

Relationships between RDDs come in two kinds: narrow dependencies (parent and child partitions map one-to-one) and wide dependencies (many-to-many: one parent partition feeds several child partitions).
- NarrowDependency (pipelined internally for efficiency): map, union
- ShuffleDependency (usually hash-based or sort-based): groupBy, join

The dependency graph of RDDs forms a DAG. Stages are cut at wide dependencies, and the stages in turn form a coarser-grained DAG.

Tasks come in two kinds, ShuffleMapTask and ResultTask; both runTask() implementations rely on iterator():

    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }

SparkEnv holds Spark's core runtime components.

getOrCompute source (the code path behind iterator()):

    private val loading = new mutable.HashSet[RDDBlockId]

    def getOrCompute[T](
        rdd: RDD[T],
        partition: Partition,
        context: TaskContext,
        storageLevel: StorageLevel): Iterator[T] = {
      val key = RDDBlockId(rdd.id, partition.index)
      logDebug(s"Looking for partition $key")
      blockManager.get(key) match {
        case Some(blockResult) =>
          // Partition is already materialized, so just return its values
          context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
          new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
        case None =>
          // Acquire a lock for loading this partition.
          // If another thread already holds the lock, wait for it to finish and return its results.
          val storedValues = acquireLockForPartition[T](key)
          if (storedValues.isDefined) {
            return new InterruptibleIterator[T](context, storedValues.get)
          }
          // Otherwise, we have to load the partition ourselves
          try {
            logInfo(s"Partition $key not found, computing it")
            val computedValues = rdd.computeOrReadCheckpoint(partition, context)
            // If the task is running locally, do not persist the result
            if (context.isRunningLocally) {
              return computedValues
            }
            // Otherwise, cache the values and keep track of any updates in block statuses
            val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
            val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
            val metrics = context.taskMetrics
            val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
            metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
            new InterruptibleIterator(context, cachedValues)
          } finally {
            loading.synchronized {
              loading.remove(key)
              loading.notifyAll()
            }
          }
      }
    }

RDD fault tolerance rests on lineage. With wide dependencies, losing part of a partition forces redundant recomputation, so checkpoint RDDs that have long lineages or wide dependencies.

The driver program runs the application: it creates the SparkContext and manages Spark's resources. One application can contain several jobs; each job is one DAG, and the number of jobs is determined by the number of actions. By default each application corresponds to one executor, and each task processes one data block.

The DAGScheduler walks a job's DAG backwards and splits it into stages (turning the RDD DAG into a stage-level DAG); every stage produces a set of tasks, a TaskSet, and the TaskScheduler executes the individual tasks.

A Spark application runs in one of two modes: client mode, where the driver runs on the client machine, and cluster mode, where the driver runs on a worker inside the cluster.

SchedulerBackend is the interface to the underlying cluster manager.
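A small sketch of how actions and wide dependencies translate into jobs and stages, assuming a running SparkContext named sc; the input and output paths are placeholders, and the exact toDebugString output varies between Spark versions.

    val lines = sc.textFile("/library/wordcount/input/Data")
    val counts = lines
      .flatMap(_.split(" "))   // narrow dependency: stays in the same stage
      .map(word => (word, 1))  // narrow dependency
      .reduceByKey(_ + _)      // ShuffleDependency: the DAGScheduler cuts a stage boundary here

    // The lineage shows where the shuffle (and therefore the stage split) happens
    println(counts.toDebugString)

    counts.count()                                       // action 1 -> job 1
    counts.saveAsTextFile("/library/wordcount/output")   // action 2 -> job 2

Each action triggers its own job; within a job, the stage before reduceByKey runs ShuffleMapTasks and the final stage runs ResultTasks.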
DAGSchedulerEventProcessActor source:

    private def initializeEventProcessActor() {
      // blocking the thread until supervisor is started, which ensures eventProcessActor is
      // not null before any job is submitted
      implicit val timeout = Timeout(30 seconds)
      val initEventActorReply =
        dagSchedulerActorSupervisor ? Props(new DAGSchedulerEventProcessActor(this))
      eventProcessActor = Await.result(initEventActorReply, timeout.duration).asInstanceOf[ActorRef]
    }

There are two ways to submit a job: runJob and runApproximateJob.

Stage division of a job. Parent stages are found by a backwards breadth-first traversal from the final RDD:

    private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
      val parents = new HashSet[Stage]
      val visited = new HashSet[RDD[_]]
      val waitingForVisit = new Stack[RDD[_]]
      def visit(r: RDD[_]) {
        if (!visited(r)) {
          visited += r
          for (dep <- r.dependencies) {
            dep match {
              case shufDep: ShuffleDependency[_, _, _] =>
                parents += getShuffleMapStage(shufDep, jobId)
              case _ =>
                waitingForVisit.push(dep.rdd)
            }
          }
        }
      }
      waitingForVisit.push(rdd)
      while (!waitingForVisit.isEmpty) {
        visit(waitingForVisit.pop())
      }
      parents.toList
    }

    private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], jobId: Int) = {
      val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)
      while (!parentsWithNoMapStage.isEmpty) {
        val currentShufDep = parentsWithNoMapStage.pop()
        val stage = newOrUsedStage(
          currentShufDep.rdd,
          currentShufDep.rdd.partitions.size,
          currentShufDep,
          jobId,
          currentShufDep.rdd.creationSite)
        shuffleToMapStage(currentShufDep.shuffleId) = stage
      }
    }

If the current RDD has no stage yet, one is created. mapOutputTracker.containsShuffle(shuffleDep.shuffleId) decides whether a stage has already been computed. Starting from finalStage, parent stages are submitted first: for (parent <- missing) submitStage(parent).

Tasks are either ShuffleMapTasks or ResultTasks, and the partitions to compute are chosen with data locality in mind:

    val tasks: Seq[Task[_]] = if (stage.isShuffleMap) {
      partitionsToCompute.map { id =>
        val locs = getPreferredLocs(stage.rdd, id)
        val part = stage.rdd.partitions(id)
        new ShuffleMapTask(stage.id, taskBinary, part, locs)
      }
    } else {
      val job = stage.resultOfJob.get
      partitionsToCompute.map { id =>
        val p: Int = job.partitions(id)
        val part = stage.rdd.partitions(p)
        val locs = getPreferredLocs(stage.rdd, p)
        new ResultTask(stage.id, taskBinary, part, locs, id)
      }
    }

    executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))

The tasks inside a stage form a TaskSet.

    rootPool = new Pool("", schedulingMode, 0, 0)  // the Pool is a tree structure

A task counts as successful only when its TaskState is FINISHED; FAILED, KILLED and LOST are all failures. JobWaiter is the concrete implementation of JobListener:

    private[spark] trait JobListener {
      def taskSucceeded(index: Int, result: Any)
      def jobFailed(exception: Exception)
    }

Stages execute from front to back.

    mapOutputTracker.registerMapOutputs(
      stage.shuffleDep.get.shuffleId,
      stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
      changeEpoch = true)

The TaskSetManager keeps launching tasks; backend.reviveOffers() is used to reassign failed tasks. The TaskScheduler, backed by a SchedulerBackend, schedules within a stage and is responsible for running the concrete tasks.

Local mode: local[number of threads, max task retries].

In YARN mode, memory and CPU are packaged as Containers. The NodeManager manages the resources of its own node. The ResourceManager allocates the ApplicationMaster (the master role, analogous to a NameNode; it hosts the SchedulerBackend) and the Containers (the slave role, analogous to DataNodes; they host the Executors), and the NodeManagers start them. Mesos offers two scheduling modes: coarse-grained and fine-grained.

The client, driven by the driver, registers with the master to request resources. When the application registers successfully, the master replies to the client:

    sender ! RegisteredApplication(app.id, masterUrl)

    case LaunchExecutor(masterUrl, appId, appDesc, cores_, memory_)

Master.scala, Client.scala and Worker.scala communicate with each other through Actors; the Worker itself is an Actor. Worker and Master talk mainly by exchanging messages and heartbeats:

    case SendHeartbeat =>
      if (connected) { master ! Heartbeat(workerId) }
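As an illustration of that message pattern (not Spark's actual Worker.scala), here is a self-contained sketch using Akka classic actors; the actor names, the 15-second interval and the println are made-up choices:

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import scala.concurrent.duration._

    // Messages mirroring the shape of the ones quoted above
    case object SendHeartbeat
    case class Heartbeat(workerId: String)

    class ToyMaster extends Actor {
      def receive = {
        case Heartbeat(id) => println(s"master: heartbeat from worker $id")
      }
    }

    class ToyWorker(workerId: String, master: ActorRef) extends Actor {
      import context.dispatcher
      // The worker schedules SendHeartbeat to itself and forwards a Heartbeat to the master
      private val timer =
        context.system.scheduler.schedule(0.seconds, 15.seconds, self, SendHeartbeat)

      def receive = {
        case SendHeartbeat => master ! Heartbeat(workerId)
      }

      override def postStop(): Unit = timer.cancel()
    }

    object HeartbeatDemo extends App {
      val system = ActorSystem("demo")
      val master = system.actorOf(Props[ToyMaster], "master")
      system.actorOf(Props(new ToyWorker("worker-1", master)), "worker")
    }

In the real Worker, the same channel also carries state updates such as the executor failures mentioned below.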
ZooKeeper stores the cluster metadata and provides HA for the Master.

Cleanup when a Worker exits abnormally:

    override def postStop() {
      metricsSystem.report()
      registrationRetryTimer.foreach(_.cancel())
      executors.values.foreach(_.kill())
      shuffleService.stop()
      webUi.stop()
      metricsSystem.stop()
    }

Exceptions raised inside an Executor are forwarded by the Worker to the Master. The HA implementation lives in ZooKeeperLeaderElectionAgent.scala.

If spreadOutApp is true, executors are spread evenly across many workers; otherwise they are packed onto a single worker (suited to CPU-intensive jobs).

Inside an Executor, the running task threads are held in a ConcurrentHashMap[Long, TaskRunner]. A task is serialized in the driver and deserialized in the Executor:

    val ser = SparkEnv.get.closureSerializer.newInstance()
    val serializedTask = ser.serialize(task)

The TaskContext is stored in a ThreadLocal. A task result that exceeds the maximum allowed size is dropped.

In the storage module, the DiskBlockManager maps logical blocks to physical files on disk. Spark offers several StorageLevels, which makes RDD storage very flexible; the notes recommend OFF_HEAP, a shared memory pool that uses resources efficiently. MemoryStore handles in-memory reads and writes, DiskStore handles disk reads and writes, and TachyonStore reads and writes an in-memory distributed file system, which lets different executors share data. All of them ultimately go through the doPut operation.

Most performance problems in Hadoop and Spark are shuffle-related. Wherever there is a shuffle there is a stage boundary. When a task has to process more data than fits in memory, raise the RDD's parallelism.

Consolidation addresses the problem of the shuffle producing too many files: by reusing files, each core appends to a single file instead of creating new ones. Pluggable shuffle (private[spark] trait ShuffleManager, with ShuffleWriter, ShuffleReader and ShuffleBlockManager) removes a scalability bottleneck for large clusters. HashShuffleManager creates too many files and consumes too much memory and disk; SortShuffleManager writes a map task's output into a single file, improving I/O performance. Related sources: ShuffleBlockFetcherIterator.scala, IndexShuffleBlockManager.scala and FileShuffleBlockManager.scala.
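A short sketch of how these choices were expressed in application code in the Spark 1.x era these notes describe; the configuration keys spark.shuffle.manager and spark.shuffle.consolidateFiles existed then but were removed in later releases, and the data is made up:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    // Spark 1.x-era settings matching the notes above
    val conf = new SparkConf()
      .setAppName("shuffle-config-demo")
      .setMaster("local[4]")
      .set("spark.shuffle.manager", "sort")            // "hash" or "sort"
      .set("spark.shuffle.consolidateFiles", "true")   // only meaningful for the hash shuffle

    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, 1))

    // Storage-level choices discussed above: memory, memory+disk, or off-heap (Tachyon in 1.x)
    val counts = pairs.reduceByKey(_ + _)
    counts.persist(StorageLevel.MEMORY_AND_DISK_SER)
    // counts.persist(StorageLevel.OFF_HEAP)           // alternative: shared off-heap pool

    println(counts.count())
    sc.stop()

An RDD keeps a single storage level once assigned (as the persist source above shows), so the OFF_HEAP line is an alternative, not an addition.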
Please credit the original source when reposting: https://ju.6miu.com/read-1123711.html
