Spark Streaming source code walkthrough


Let's work through the source code step by step. Please follow along and trace the code yourself; only by doing so will it really sink in.

First, let's look at which member variables are initialized when a StreamingContext is created.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Durations, StreamingContext}

    object StreamingWordCountSelfScala {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
        val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // collect data into a batch every 5 seconds
        val lines = ssc.socketTextStream("localhost", 9999)             // listen on local socket port 9999
        val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then reduce by key
        words.print()            // print the results
        ssc.start()              // start the streaming computation
        ssc.awaitTermination()
        ssc.stop(true)
      }
    }

    // A batch is produced every 5 seconds. This only defines the batch interval:
    // all data arriving within the same 5-second window belongs to the same batch.
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5))

    // StreamingContext.scala line 80
    def this(conf: SparkConf, batchDuration: Duration) = {
      this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
    }
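As an aside, Durations.seconds is just the Java-friendly factory; the Scala helpers in org.apache.spark.streaming build the same Duration value. A small illustration (not from the original article):

    import org.apache.spark.streaming.{Durations, Milliseconds, Seconds}

    // All three express the same 5-second batch interval.
    val d1 = Durations.seconds(5)
    val d2 = Seconds(5)
    val d3 = Milliseconds(5000)
    assert(d1 == d2 && d2 == d3)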

Let's look at StreamingContext.createNewSparkContext(conf).

It simply news up a SparkContext, exactly as an ordinary Spark Core program would. Spark Streaming is just an application running on top of Spark Core, only somewhat more involved.

    // StreamingContext.scala line 873
    private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
      new SparkContext(conf)
    }
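Because the streaming layer sits on a plain SparkContext, you can just as well hand it an existing one instead of letting it create its own; a minimal sketch of that alternative (the constructor taking a SparkContext exists, the surrounding setup is illustrative):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val sc = new SparkContext(new SparkConf().setAppName("ReuseExistingContext"))
    // Reuse the existing SparkContext; the sc_ parameter of the primary constructor is then non-null.
    val ssc = new StreamingContext(sc, Seconds(5))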

Back to the StreamingContext constructor. The snippet below lists the more important member variables that get initialized.

    // StreamingContext.scala line 49
    class StreamingContext private[streaming] (
        sc_ : SparkContext,
        cp_ : Checkpoint,
        batchDur_ : Duration
      ) extends Logging {
      ...
      // line 128
      if (sc_ == null && cp_ == null) {
        throw new Exception("Spark Streaming cannot be initialized with " +
          "both SparkContext and checkpoint as null")
      }

      private[streaming] val isCheckpointPresent = (cp_ != null)

      // line 136
      private[streaming] val sc: SparkContext = {
        if (sc_ != null) {
          sc_
        } else if (isCheckpointPresent) {
          // Build the SparkContext from the checkpoint's SparkConf.
          // One way or another, SparkContext is always the gateway.
          SparkContext.getOrCreate(cp_.createSparkConf())
        } else {
          throw new SparkException("Cannot create StreamingContext without a SparkContext")
        }
      }

      if (sc.conf.get("spark.master") == "local" || sc.conf.get("spark.master") == "local[1]") {
        // In local mode more than one thread is required: one thread is dedicated to receiving data,
        // so with a single thread there would be none left to process what is received.
        logWarning("spark.master should be set as local[n], n > 1 in local mode if you have receivers" +
          " to get data, otherwise Spark jobs will not get resources to process the received data.")
      }

      private[streaming] val conf = sc.conf
      private[streaming] val env = sc.env

      // line 155
      private[streaming] val graph: DStreamGraph = {
        if (isCheckpointPresent) { // if a checkpoint exists, restore the graph from it
          cp_.graph.setContext(this)
          cp_.graph.restoreCheckpointData()
          cp_.graph
        } else {
          require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
          val newGraph = new DStreamGraph() // otherwise create a fresh DStreamGraph
          newGraph.setBatchDuration(batchDur_)
          newGraph
        }
      }

      // An AtomicInteger -- where will it be used? We'll see shortly.
      private val nextInputStreamId = new AtomicInteger(0)

      // line 170
      private[streaming] var checkpointDir: String = {
        if (isCheckpointPresent) {
          sc.setCheckpointDir(cp_.checkpointDir)
          cp_.checkpointDir
        } else {
          null
        }
      }

      // line 179
      // A second duration appears here. If it is not set separately, it simply takes the
      // batchDuration value, i.e. 5 seconds in our example.
      private[streaming] val checkpointDuration: Duration = {
        if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
      }

      // line 183
      // Important: covered in detail separately below.
      private[streaming] val scheduler = new JobScheduler(this)

      private[streaming] val waiter = new ContextWaiter

      private[streaming] val progressListener = new StreamingJobProgressListener(this)

      private[streaming] val uiTab: Option[StreamingTab] =
        if (conf.getBoolean("spark.ui.enabled", true)) {
          Some(new StreamingTab(this))
        } else {
          None
        }

      /* Initializing a streamingSource to register metrics */
      private val streamingSource = new StreamingSource(this)

      private var state: StreamingContextState = INITIALIZED

      private val startSite = new AtomicReference[CallSite](null)

      private[streaming] def getStartSite(): CallSite = startSite.get()

      private var shutdownHookRef: AnyRef = _

      conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
      ...
    }
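The cp_ (checkpoint) branch above is exercised when an application is recovered through StreamingContext.getOrCreate: on the first run the creating function is invoked, on restart the context and its DStreamGraph are rebuilt from the checkpoint, which is exactly when cp_ != null inside the constructor. A minimal sketch of that usage; the checkpoint directory path is made up for illustration:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("StreamingWordCountSelfScala")
      val ssc = new StreamingContext(conf, Seconds(5))
      ssc.checkpoint("hdfs://master:9000/checkpoint")  // hypothetical checkpoint directory
      // ... define the DStream lineage here ...
      ssc
    }

    // First start: createContext() runs. Restart: the context is restored from the checkpoint.
    val ssc = StreamingContext.getOrCreate("hdfs://master:9000/checkpoint", createContext _)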

Two member variables deserve particular attention: DStreamGraph and JobScheduler.

Let's first take a closer look at DStreamGraph.

    // DStreamGraph.scala line 27
    // Note that it extends Serializable, which means instances can be serialized.
    final private[streaming] class DStreamGraph extends Serializable with Logging {

      // A mutable array of InputDStreams -- more on these later.
      private val inputStreams = new ArrayBuffer[InputDStream[_]]()
      private val outputStreams = new ArrayBuffer[DStream[_]]()

      var rememberDuration: Duration = null
      var checkpointInProgress = false

      var zeroTime: Time = null
      var startTime: Time = null
      var batchDuration: Duration = null

      // ...
      // other methods
    }
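How do these two buffers get filled, and where does the AtomicInteger nextInputStreamId from earlier come in? When an input DStream is constructed (for example by ssc.socketTextStream) it adds itself to the graph and draws its id from that counter, and an output operation such as print() ends up calling register(), which records the DStream as an output stream. The wiring below is a condensed paraphrase of InputDStream.scala and DStream.scala (type-class details such as ClassTag omitted), not a verbatim quote:

    // Every input DStream registers itself with the graph and takes a unique id
    // from StreamingContext's nextInputStreamId (an AtomicInteger).
    abstract class InputDStream[T](ssc_ : StreamingContext) extends DStream[T](ssc_) {
      ssc.graph.addInputStream(this)
      val id = ssc.getNewInputStreamId()
    }

    // Output operations (print, foreachRDD, saveAs...) call register(), which puts
    // the DStream into the graph's outputStreams buffer.
    private[streaming] def register(): DStream[T] = {
      ssc.graph.addOutputStream(this)
      this
    }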

Now let's look at the JobScheduler constructor.

    // JobScheduler.scala line 37
    /**
     * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
     * the jobs and runs them using a thread pool.
     *
     * The scaladoc already says it all: the JobGenerator produces the jobs, and they run in a thread pool.
     */
    private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging {

      // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
      // https://gist.github.com/AlainODea/1375759b8720a3f9f094
      private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] // keyed by batch time

      // number of concurrently running jobs, 1 by default
      private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)

      // jobs are executed on a daemon thread pool
      private val jobExecutor =
        ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

      // line 50
      // create the JobGenerator -- analyzed in detail below
      private val jobGenerator = new JobGenerator(this)
      val clock = jobGenerator.clock
      val listenerBus = new StreamingListenerBus()

      // These two are created only when scheduler starts.
      // eventLoop not being null means the scheduler has been started and not stopped
      var receiverTracker: ReceiverTracker = null
      // A tracker to track all the input stream information as well as processed record number
      var inputInfoTracker: InputInfoTracker = null

      private var eventLoop: EventLoop[JobSchedulerEvent] = null
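To see why jobSets is keyed by Time and why jobExecutor is a thread pool, here is the essence of how a generated batch is handed over; a condensed paraphrase of JobScheduler.submitJobSet (the listener-bus notification is omitted), not a verbatim quote:

    def submitJobSet(jobSet: JobSet) {
      if (jobSet.jobs.isEmpty) {
        logInfo("No jobs added for time " + jobSet.time)
      } else {
        // Remember all jobs of this batch under their batch time ...
        jobSets.put(jobSet.time, jobSet)
        // ... and hand each job to the thread pool. With the default
        // spark.streaming.concurrentJobs = 1, batches execute one at a time.
        jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
        logInfo("Added jobs for time " + jobSet.time)
      }
    }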

Next, let's analyze the JobGenerator.

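Its constructor is where the batch interval finally becomes a clock tick. The members below are abridged from JobGenerator.scala in Spark 1.6.x (the clock's fallback/error handling is left out), so treat it as a condensed guide rather than a verbatim quote:

    private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging {

      private val ssc = jobScheduler.ssc
      private val conf = ssc.conf
      private val graph = ssc.graph

      // The clock is pluggable via spark.streaming.clock (SystemClock by default;
      // a ManualClock can be injected for testing).
      val clock = {
        val clockClass = ssc.sc.conf.get("spark.streaming.clock", "org.apache.spark.util.SystemClock")
        Utils.classForName(clockClass).newInstance().asInstanceOf[Clock]
      }

      // The periodically triggered function mentioned in the recap below: every
      // batchDuration milliseconds the callback posts a GenerateJobs(time) event,
      // which eventually produces a JobSet for that batch.
      private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
        longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

      // eventLoop is created when the generator starts;
      // it not being null means the generator has been started and not stopped.
      private var eventLoop: EventLoop[JobGeneratorEvent] = null
      ...
    }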

Let's recap the whole StreamingContext instantiation process.

A SparkContext is created; the TaskScheduler, SchedulerBackend, DAGScheduler and so on created along the way belong to Spark Core.

The StreamingContext instantiates a DStreamGraph and a JobScheduler.

The JobScheduler instantiates a JobGenerator and a job-execution thread pool (jobExecutor) whose size defaults to 1.

The JobGenerator defines the periodically triggered function and passes it into the constructor of a RecurringTimer (see the sketch after this recap).

At this point the StreamingContext has been fully instantiated.
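To make the RecurringTimer point concrete: it is essentially a daemon thread that sleeps until the next multiple of the period and then invokes the callback. A toy re-implementation of that idea (not Spark's actual class):

    // Fires callback(time) once per periodMs, aligned to multiples of the period.
    class ToyRecurringTimer(periodMs: Long, callback: Long => Unit, name: String) {
      @volatile private var stopped = false

      private val thread = new Thread(name) {
        override def run(): Unit = {
          // align the first tick to the next multiple of the period
          var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
          try {
            while (!stopped) {
              val sleepMs = nextTime - System.currentTimeMillis()
              if (sleepMs > 0) Thread.sleep(sleepMs)
              callback(nextTime) // Spark's callback posts GenerateJobs(new Time(nextTime))
              nextTime += periodMs
            }
          } catch {
            case _: InterruptedException => // stop() was called while sleeping
          }
        }
      }
      thread.setDaemon(true)

      def start(): Unit = thread.start()
      def stop(): Unit = { stopped = true; thread.interrupt() }
    }

    // Usage: tick every 5 seconds, like a 5-second batch interval.
    new ToyRecurringTimer(5000, t => println(s"generate jobs for batch time $t"), "toy-timer").start()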

