Hive on Spark源码分析(一)—— SparkTask Hive on Spark源码分析(二)—— SparkSession与HiveSparkClient Hive on Spark源码分析(三)—— SparkClilent与SparkClientImpl(上) Hive on Spark源码分析(四)—— SparkClilent与SparkClientImpl(下) Hive on Spark源码分析(五)—— RemoteDriver Hive on Spark源码分析(六)—— RemoteSparkJobMonitor与JobHandle
在Hive on Spark源码分析(一)—— SparkTask中已经介绍到,SparkSession是HOS任务提交的起点。在HOS的代码实现中,SparkSession是一个接口,具体的实现类是SparkSessionImpl,因此我们直接进入到SparkSessionImpl内部进行分析。
首先是初始化:
//通过makeSessionId,随机生成一个id就可以初始化一个sparkSession public SparkSessionImpl() { sessionId = makeSessionId(); } public static String makeSessionId() { return UUID.randomUUID().toString(); } 初始化后需要打开session。其实所谓的打开一个session,其实就是实例化一个hiveSparkClient对象。创建过程基于简单工厂模式,如果获取到的spark.master参数的值为"local",则创建LocalHiveSparkClient实例 ,否则创建RemoteHiveSparkClient实例: @Override public void open(HiveConf conf) throws HiveException { this.conf = conf; isOpen = true; try { hiveSparkClient = HiveSparkClientFactory.createHiveSparkClient(conf); } catch (Throwable e) { throw new HiveException("Failed to create spark client.", e); } } 实例化 hiveSparkClient对象后,则可以通过 hiveSparkClient提交session需要提交的任务,具体实现在submit方法中。首先判断session是否已经打开,如果已经成功打开则调用 hiveSparkClient的execute方法进行任务提交: @Override public SparkJobRef submit(DriverContext driverContext, SparkWork sparkWork) throws Exception { Preconditions.checkState(isOpen, "Session is not open. Can\'t submit jobs."); return hiveSparkClient.execute(driverContext, sparkWork); } 由于H iveSparkClient是一个接口,它有两个实现类,分别是 LocalHiveSparkClient和 RemoteHiveSparkClient。 前面我们已经提到,其实例根据提交参数的不同,会分别创建不同的子类实例 。因为local模式通常仅用于实验环境,因此这里我们主要分析一下 RemoteHiveSparkClient的execute方法是如何提交SparkSession的任务的。 RemoteHiveSparkClient的execute方法首先判断是否提交模式为yarn-cluster或yarn-client,并且remoteClient(SparkClient实例)还没准备好,是的话需要创建一个SparkClient。否则直接调用submit方法将任务提交。 @Override public SparkJobRef execute(final DriverContext driverContext, final SparkWork sparkWork) throws Exception { if (hiveConf.get("spark.master").startsWith("yarn-") && !remoteClient.isActive()) { // Re-create the remote client if not active any more close(); createRemoteClient(); } try { return submit(driverContext, sparkWork); } catch (Throwable cause) { throw new Exception("Failed to submit Spark work, please retry later", cause); } } SparkClient的创建同样是使用工厂模式,但是SparkClient的实现其实只有SparkClientImpl。并且如果开启了动态资源分配,会对executor进行数目进行配置: private void createRemoteClient() throws Exception { remoteClient = SparkClientFactory.createClient(conf, hiveConf); if (HiveConf.getBoolVar(hiveConf, ConfVars.HIVE_PREWARM_ENABLED) && hiveConf.get("spark.master").startsWith("yarn-")) { int minExecutors = getExecutorsToWarm(); if (minExecutors <= 0) { return; } LOG.info("Prewarm Spark executors. The minimum number of executors to warm is " + minExecutors); // Spend at most MAX_PREWARM_TIME to wait for executors to come up. int curExecutors = 0; long ts = System.currentTimeMillis(); do { try { curExecutors = getExecutorCount(MAX_PREWARM_TIME, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { // let\'s don\'t fail on future timeout since we have a timeout for pre-warm LOG.warn("Timed out getting executor count.", e); } if (curExecutors >= minExecutors) { LOG.info("Finished prewarming Spark executors. The current number of executors is " + curExecutors); return; } Thread.sleep(500); // sleep half a second } while (System.currentTimeMillis() - ts < MAX_PREWARM_TIME); LOG.info("Timeout (" + MAX_PREWARM_TIME / 1000 + "s) occurred while prewarming executors. " + "The current number of executors is " + curExecutors); } } 下面来看一下submit方法的实现: private SparkJobRef submit(final DriverContext driverContext, final SparkWork sparkWork) throws Exception { final Context ctx = driverContext.getCtx(); final HiveConf hiveConf = (HiveConf) ctx.getConf(); //添加各种jar包和配置文件 refreshLocalResources(sparkWork, hiveConf); final JobConf jobConf = new JobConf(hiveConf); // Create temporary scratch dir final Path emptyScratchDir = ctx.getMRTmpPath(); FileSystem fs = emptyScratchDir.getFileSystem(jobConf); fs.mkdirs(emptyScratchDir); //序列化配置、临时目录路径和任务 byte[] jobConfBytes = KryoSerializer.serializeJobConf(jobConf); byte[] scratchDirBytes = KryoSerializer.serialize(emptyScratchDir); byte[] sparkWorkBytes = KryoSerializer.serialize(sparkWork); //创建JobStatusJob,包含环境配置,临时目录和具体的提交的任务 JobStatusJob job = new JobStatusJob(jobConfBytes, scratchDirBytes, sparkWorkBytes); //这一步调用SparkClientImpl向RemoteDriver提交job,job包含原来提交过来的SparkWork,还包含了jobConf和临时目录(emptyScratchDir) //过程: SparkClientImpl.submit => ClientProtocol.submit => JobHandle<Serializable> jobHandle = remoteClient.submit(job); //这里sparkClientTimtout是hive-site.xml中配置的hive.spark.job.monitor.timeout //remoteClient是SparkClientImpl //jobHanle是submit后返回的JobHandleImpl,是跟踪任务的关键 RemoteSparkJobStatus sparkJobStatus = new RemoteSparkJobStatus(remoteClient, jobHandle, sparkClientTimtout); return new RemoteSparkJobRef(hiveConf, jobHandle, sparkJobStatus); } RemoteHiveSparkClient中还有一些其他简单方法这里就不做具体分析了。另外这个类里面还有一个非常重要的call方法,是最终生成执行计划,得到RDD并最终执行的方法,后面会在RemoteDriver类的分析中进行具体分析。 现在任务的提交又传递给了remoteClient,是一个SparkClient类型的实例,因此下一篇我们将会具体来分析SparkClient的实现类,SparkClientImpl。
