Spark集群启动之Master、Worker启动流程源码分析

    xiaoxiao2021-03-25  108

    Spark集群启动Master可以使用脚本启动:start-master,shell脚本细节自行查看。

    最终启动命令为:java -cp /home/daxin/bigdata/spark/conf/:/home/daxin/bigdata/spark/jars/*:/home/daxin/bigdata/hadoop/etc/hadoop/ -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.master.Master --host node --port 7077 --webui-port 8080

    最终转换为java命令启动Master的过程,所以我们就需要查看一下Master的main方法代码如下:

    val systemName = "sparkMaster" private val actorName = "Master" /** * spark-class脚本调用,启动master * * @param argStrings */ def main(argStrings: Array[String]) { SignalLogger.register(log) //参数配置准备 val conf = new SparkConf val args = new MasterArguments(argStrings, conf) //创建actorSystem // (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort) val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf) actorSystem.awaitTermination() }

    通过代码可以可以知道调用startSystemAndActor方法完成ActorSystem和Actor的创建。startSystemAndActor方法中调用

    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr) 完成ActorSystem的创建,然后调用:

    val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName) 完成在该actorSystem中对actor的创建。然后执行Master的声明周期方法preStart:

    override def preStart() { logInfo("Starting Spark master at " + masterUrl) logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) webUi.bind() masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort /** * 定时器,定义给自己发送心跳去检查是否有超超时的worker,有的话移除超时Worker。 */ context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) masterMetricsSystem.start() applicationMetricsSystem.start() // Attach the master and app metrics servlet handler to the web ui after the metrics systems are started. masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler) /** * 高可用时候,元数据共享选择持久化引擎,分为ZOOKEEPER、FILESYSTEM、CUSTOM */ val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match { case "ZOOKEEPER" => logInfo("Persisting recovery state to ZooKeeper") val zkFactory = new ZooKeeperRecoveryModeFactory(conf, SerializationExtension(context.system)) (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this)) case "FILESYSTEM" => val fsFactory = new FileSystemRecoveryModeFactory(conf, SerializationExtension(context.system)) (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this)) case "CUSTOM" => val clazz = Class.forName(conf.get("spark.deploy.recoveryMode.factory")) val factory = clazz.getConstructor(conf.getClass, Serialization.getClass) .newInstance(conf, SerializationExtension(context.system)) .asInstanceOf[StandaloneRecoveryModeFactory] (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this)) case _ => (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this)) } persistenceEngine = persistenceEngine_ leaderElectionAgent = leaderElectionAgent_ } 接下来执行声明周期方法:

    override def receiveWithLogging = { //高可用选举 case ElectedLeader => { val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData() state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) { RecoveryState.ALIVE } else { RecoveryState.RECOVERING } logInfo("I have been elected leader! New state: " + state) if (state == RecoveryState.RECOVERING) { //开始恢复Master方法 beginRecovery(storedApps, storedDrivers, storedWorkers) recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, CompleteRecovery) } } case CompleteRecovery => completeRecovery() case RevokedLeadership => { logError("Leadership has been revoked -- master shutting down.") System.exit(0) } /** * Worker发来的注册消息 */ case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format(workerHost, workerPort, cores, Utils.megabytesToString(memory))) //如果当前Master状态为RecoveryState.STANDBY ,不回应Worker信息。 if (state == RecoveryState.STANDBY) { // ignore, don't send response } else if (idToWorker.contains(id)) { //如果包含WorkerInfo了,回复注册失败信息 sender ! RegisterWorkerFailed("Duplicate worker ID") } else { //注册新的Worker信息 val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory, sender, workerUiPort, publicAddress) if (registerWorker(worker)) { //完成worker的持久化,以防master宕机之后无法恢复 persistenceEngine.addWorker(worker) //给Worker发送消息:告诉worker完成注册RegisteredWorker sender ! RegisteredWorker(masterUrl, masterWebUiUrl) schedule() } else { val workerAddress = worker.actor.path.address logWarning("Worker registration failed. Attempted to re-register worker at same " + "address: " + workerAddress) sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " + workerAddress) } } } case RequestSubmitDriver(description) => { if (state != RecoveryState.ALIVE) { val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state." sender ! SubmitDriverResponse(false, None, msg) } else { logInfo("Driver submitted " + description.command.mainClass) val driver = createDriver(description) persistenceEngine.addDriver(driver) waitingDrivers += driver drivers.add(driver) schedule() // TODO: It might be good to instead have the submission client poll the master to determine // the current status of the driver. For now it's simply "fire and forget". sender ! SubmitDriverResponse(true, Some(driver.id), s"Driver successfully submitted as ${driver.id}") } } case RequestKillDriver(driverId) => { if (state != RecoveryState.ALIVE) { val msg = s"Can only kill drivers in ALIVE state. Current state: $state." sender ! KillDriverResponse(driverId, success = false, msg) } else { logInfo("Asked to kill driver " + driverId) val driver = drivers.find(_.id == driverId) driver match { case Some(d) => if (waitingDrivers.contains(d)) { waitingDrivers -= d self ! DriverStateChanged(driverId, DriverState.KILLED, None) } else { // We just notify the worker to kill the driver here. The final bookkeeping occurs // on the return path when the worker submits a state change back to the master // to notify it that the driver was successfully killed. d.worker.foreach { w => w.actor ! KillDriver(driverId) } } // TODO: It would be nice for this to be a synchronous response val msg = s"Kill request for $driverId submitted" logInfo(msg) sender ! KillDriverResponse(driverId, success = true, msg) case None => val msg = s"Driver $driverId has already finished or does not exist" logWarning(msg) sender ! KillDriverResponse(driverId, success = false, msg) } } } case RequestDriverStatus(driverId) => { (drivers ++ completedDrivers).find(_.id == driverId) match { case Some(driver) => sender ! DriverStatusResponse(found = true, Some(driver.state), driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception) case None => sender ! DriverStatusResponse(found = false, None, None, None, None) } } /** * 提交应用给Master,Master启动executor * * <br>(如果没有理解错误的话)description中的command应该是:val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",其余参数略) * 代码位置:类的 SparkDeploySchedulerBackend中的command * */ case RegisterApplication(description) => { if (state == RecoveryState.STANDBY) { // ignore, don't send response } else { logInfo("Registering app " + description.name) //TODO 把应用信息存到内存, 重点:sender应该是clientActor val app = createApplication(description, sender) //sender应该是clientActor registerApplication(app) logInfo("Registered app " + description.name + " with ID " + app.id) //持久化app,实现容错 persistenceEngine.addApplication(app) //回复appClient已经注册(这一块不是worker) sender ! RegisteredApplication(app.id, masterUrl) //TODO Master开始调度资源,其实就是把任务启动启动到哪些Worker上 schedule() } } //TODO appClient发送来的消息,通知Executor状态 case ExecutorStateChanged(appId, execId, state, message, exitStatus) => { val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId)) execOption match { case Some(exec) => { val appInfo = idToApp(appId) exec.state = state if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() } // exec.application.driver = driverClient exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus) // 完成状态包括:KILLED, FAILED, LOST, EXITED 注意:这里是完成,不是成功! if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and app logInfo(s"Removing executor ${exec.fullId} because it is $state") appInfo.removeExecutor(exec) //appInfo移除executor exec.worker.removeExecutor(exec) //worker移除executor val normalExit = exitStatus == Some(0) //判断是否正常推出 // Only retry certain number of times so we don't go into an infinite loop. if (!normalExit) { //异常退出 if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) { //当前重试次数是否小于最大重试次数MAX_NUM_RETRY10,如果小于重新调度 schedule() } else { //超过最大重启次数 val execs = appInfo.executors.values //获取当前app的所有executors if (!execs.exists(_.state == ExecutorState.RUNNING)) { //如果不存在运行的executor的话,直接removeApplication logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " + s"${appInfo.retryCount} times; removing it") removeApplication(appInfo, ApplicationState.FAILED) } } } } } //位置状态 case None => logWarning(s"Got status update for unknown executor $appId/$execId") } } /** * Worker发送来的消息,告诉Driver当前worker状态 * */ case DriverStateChanged(driverId, state, exception) => { state match { case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => removeDriver(driverId, state, exception) case _ => throw new Exception(s"Received unexpected state update for driver $driverId: $state") } } case Heartbeat(workerId) => { idToWorker.get(workerId) match { case Some(workerInfo) => workerInfo.lastHeartbeat = System.currentTimeMillis() case None => if (workers.map(_.id).contains(workerId)) { logWarning(s"Got heartbeat from unregistered worker $workerId." + " Asking it to re-register.") sender ! ReconnectWorker(masterUrl) } else { logWarning(s"Got heartbeat from unregistered worker $workerId." + " This worker was never registered, so ignoring the heartbeat.") } } } case MasterChangeAcknowledged(appId) => { idToApp.get(appId) match { case Some(app) => logInfo("Application has been re-registered: " + appId) app.state = ApplicationState.WAITING case None => logWarning("Master change ack from unknown app: " + appId) } if (canCompleteRecovery) { completeRecovery() } } case WorkerSchedulerStateResponse(workerId, executors, driverIds) => { idToWorker.get(workerId) match { case Some(worker) => logInfo("Worker has been re-registered: " + workerId) worker.state = WorkerState.ALIVE val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined) for (exec <- validExecutors) { val app = idToApp.get(exec.appId).get val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId)) worker.addExecutor(execInfo) execInfo.copyState(exec) } for (driverId <- driverIds) { drivers.find(_.id == driverId).foreach { driver => driver.worker = Some(worker) driver.state = DriverState.RUNNING worker.drivers(driverId) = driver } } case None => logWarning("Scheduler state from unknown worker: " + workerId) } if (canCompleteRecovery) { completeRecovery() } } case DisassociatedEvent(_, address, _) => { // The disconnected client could've been either a worker or an app; remove whichever it was logInfo(s"$address got disassociated, removing it.") addressToWorker.get(address).foreach(removeWorker) addressToApp.get(address).foreach(finishApplication) if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() } } case RequestMasterState => { sender ! MasterStateResponse( host, port, restServerBoundPort, workers.toArray, apps.toArray, completedApps.toArray, drivers.toArray, completedDrivers.toArray, state) } case CheckForWorkerTimeOut => { timeOutDeadWorkers() } case BoundPortsRequest => { sender ! BoundPortsResponse(port, webUi.boundPort, restServerBoundPort) } }

    补充:关于Master的参数配置在org.apache.spark.util.AkkaUtils$#doCreateActorSystem方法中完成!

    接下来看看Worker的启动:

    Worker启动脚本有:

    1:start-slave.sh   指定masterUrl  只能在本地节点启动worker

    2:start-slaves.sh  SSH到各个Worker节点启动,里面调用的是slaves.sh脚本

    java -cp /home/daxin/bigdata/spark/conf/:/home/daxin/bigdata/spark/jars/*:/home/daxin/bigdata/hadoop/etc/hadoop/ -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.worker.Worker --webui-port 8082 spark://node:7077

    同理也是执行Worker的main方法:

    /** * * spark启动worker脚本调用main方法执行启动worker * * @param argStrings */ def main(argStrings: Array[String]) { //完成配置信息 SignalLogger.register(log) val conf = new SparkConf val args = new WorkerArguments(argStrings, conf) //创建actorSystem val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, args.memory, args.masters, args.workDir) actorSystem.awaitTermination() } 同Master的main方法过程,接下来看看Worker的生命周期方法:

    /** * registered :Actor的声明周期方法 * 在registered中完成向Master的注册 * */ override def preStart() { assert(!registered) logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(host, port, cores, Utils.megabytesToString(memory))) logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}") logInfo("Spark home: " + sparkHome) createWorkDir() context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) shuffleService.startIfEnabled() webUi = new WorkerWebUI(this, workDir, webUiPort) webUi.bind() registerWithMaster()//完成向Master的注册 metricsSystem.registerSource(workerSource) metricsSystem.start() // Attach the worker metrics servlet handler to the web ui after the metrics system is started. metricsSystem.getServletHandlers.foreach(webUi.attachHandler) } 改天继续,时候太晚要回宿舍了。

    转载请注明原文地址: https://ju.6miu.com/read-6383.html

    最新回复(0)