Let's walk through the source code step by step. Please do follow along and trace the code yourself; only by actually doing it will you really understand it.
First, let's see which member variables are initialized when a StreamingContext is created.
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // collect data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, then reduce
    words.print() // print the results
    ssc.start() // start the streaming computation
    ssc.awaitTermination()
    ssc.stop(true)
  }
}
// Collect data every 5 seconds. This only defines the batch interval:
// all data produced within the same 5-second window belongs to the same batch.
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))

// StreamingContext.scala line 80
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
Let's look at StreamingContext.createNewSparkContext(conf).
It simply news up a SparkContext, exactly as an ordinary program written on Spark Core would. Spark Streaming is just an application on top of Spark Core, only a bit more complicated.
// StreamingContext.scala line 873
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}
Back to the StreamingContext constructor. The snippet below shows the initialization of several key member variables.
// StreamingContext.scala line 49
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {
  ...
  // line 128
  if (sc_ == null && cp_ == null) {
    throw new Exception("Spark Streaming cannot be initialized with " +
      "both SparkContext and checkpoint as null")
  }

  private[streaming] val isCheckpointPresent = (cp_ != null)

  // line 136
  private[streaming] val sc: SparkContext = {
    if (sc_ != null) {
      sc_
    } else if (isCheckpointPresent) {
      // create the SparkContext from the checkpoint's SparkConf;
      // everything goes through SparkContext, the gateway
      SparkContext.getOrCreate(cp_.createSparkConf())
    } else {
      throw new SparkException("Cannot create StreamingContext without a SparkContext")
    }
  }

  if (sc.conf.get("spark.master") == "local" || sc.conf.get("spark.master") == "local[1]") {
    // In local mode more than one thread is required: one thread is dedicated to receiving data,
    // so with only a single thread there would be none left to process the received data.
    logWarning("spark.master should be set as local[n], n > 1 in local mode if you have receivers" +
      " to get data, otherwise Spark jobs will not get resources to process the received data.")
  }

  private[streaming] val conf = sc.conf

  private[streaming] val env = sc.env

  // line 155
  private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      // if a checkpoint is present, restore the graph from it
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph() // otherwise create a new DStreamGraph
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }

  // an AtomicInteger; where will it be used? We will see later.
  private val nextInputStreamId = new AtomicInteger(0)

  // line 170
  private[streaming] var checkpointDir: String = {
    if (isCheckpointPresent) {
      sc.setCheckpointDir(cp_.checkpointDir)
      cp_.checkpointDir
    } else {
      null
    }
  }

  // line 179
  // another duration appears here; if not set separately it takes the value of batchDuration,
  // 5 seconds in our example
  private[streaming] val checkpointDuration: Duration = {
    if (isCheckpointPresent) cp_.checkpointDuration else graph.batchDuration
  }

  // line 183
  // important; explained separately in detail later
  private[streaming] val scheduler = new JobScheduler(this)

  private[streaming] val waiter = new ContextWaiter

  private[streaming] val progressListener = new StreamingJobProgressListener(this)

  private[streaming] val uiTab: Option[StreamingTab] =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(new StreamingTab(this))
    } else {
      None
    }

  /* Initializing a streamingSource to register metrics */
  private val streamingSource = new StreamingSource(this)

  private var state: StreamingContextState = INITIALIZED

  private val startSite = new AtomicReference[CallSite](null)

  private[streaming] def getStartSite(): CallSite = startSite.get()

  private var shutdownHookRef: AnyRef = _

  conf.getOption("spark.streaming.checkpoint.directory").foreach(checkpoint)
  ...
}
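As the warning above points out, in local mode the receiver occupies one thread, so you need at least two threads in total. A minimal sketch of the corresponding configuration (our example submits to spark://master:7077, so this only applies if you switch to local mode):

val sparkConf = new SparkConf()
  .setMaster("local[2]") // one thread for the receiver, at least one more to process the received data
  .setAppName("StreamingWordCountSelfScala")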
Two member variables deserve special attention: DStreamGraph and JobScheduler.
Let's first take a closer look at DStreamGraph.
// DStreamGraph.scala line 27
final private[streaming] class DStreamGraph extends Serializable with Logging {
  // note that it extends Serializable, which means it can be serialized

  // a mutable array of InputDStream; we will analyze it later
  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  var rememberDuration: Duration = null
  var checkpointInProgress = false

  var zeroTime: Time = null
  var startTime: Time = null
  var batchDuration: Duration = null

  // ... other methods are defined here
}
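As a preview of two questions raised above (where nextInputStreamId is used, and how inputStreams gets filled): roughly speaking, in the Spark 1.x source every InputDStream registers itself with the graph when it is constructed and gets its id from the StreamingContext. The sketch below is paraphrased, so treat the exact signatures as approximate:

// InputDStream.scala (paraphrased sketch): run in the InputDStream constructor
ssc.graph.addInputStream(this)      // register this input stream with the graph
val id = ssc.getNewInputStreamId()  // in StreamingContext: nextInputStreamId.getAndIncrement()

// DStreamGraph.scala (paraphrased sketch)
def addInputStream(inputStream: InputDStream[_]) {
  this.synchronized {
    inputStream.setGraph(this)
    inputStreams += inputStream
  }
}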
Next, let's look at how JobScheduler is constructed.
// JobScheduler.scala line 37
/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 *
 * The comment says it clearly: jobs are generated by the JobGenerator and run in a thread pool.
 */
private[streaming] class JobScheduler(val ssc: StreamingContext) extends Logging {

  // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
  // https://gist.github.com/AlainODea/1375759b8720a3f9f094
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] // keyed by batch time

  // the number of concurrent jobs defaults to 1
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  // jobs are executed on a thread pool
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")

  // line 50
  // create the JobGenerator; analyzed in detail later
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()

  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null

  private var eventLoop: EventLoop[JobSchedulerEvent] = null
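To preview how jobSets and jobExecutor fit together later on, here is a sketch of JobScheduler.submitJobSet, paraphrased from the Spark 1.x source (details may differ slightly): each generated JobSet is stored keyed by its batch time, and every Job in it is handed to the thread pool.

// JobScheduler.scala (paraphrased sketch)
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)                                      // keyed by batch time
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))  // run each job on the pool
    logInfo("Added jobs for time " + jobSet.time)
  }
}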
Now let's analyze JobGenerator.
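The key members of JobGenerator, paraphrased from the Spark 1.x JobGenerator.scala (some members are omitted and exact details may differ slightly):

// JobGenerator.scala (paraphrased sketch, not the verbatim source)
private[streaming] class JobGenerator(jobScheduler: JobScheduler) extends Logging {

  private val ssc = jobScheduler.ssc
  private val graph = ssc.graph

  // the clock that drives the timer: a SystemClock by default,
  // replaceable via spark.streaming.clock (this is how tests inject a manual clock)
  val clock: Clock = ...

  // the recurring timer: every batchDuration it invokes the function passed in here,
  // which posts a GenerateJobs(time) event to the generator's event loop
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  // created only when the generator starts; not being null means started and not stopped
  private var eventLoop: EventLoop[JobGeneratorEvent] = null
  ...
}

Note that nothing runs yet at this point: the timer and the event loop are only started when ssc.start() is called.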
Let's recap the whole instantiation process of StreamingContext:
A SparkContext is created; the TaskScheduler, SchedulerBackend, DAGScheduler and so on created inside it belong to Spark Core.
The StreamingContext instantiates a DStreamGraph and a JobScheduler.
The JobScheduler instantiates a JobGenerator and a job-execution thread pool (jobExecutor) whose default size is 1.
The JobGenerator defines the function that is triggered periodically and passes it into the constructor of RecurringTimer (a recurring timer); see the simplified sketch after this recap.
At this point, the StreamingContext has been fully instantiated.
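To make the RecurringTimer mechanism concrete, here is a minimal, self-contained sketch of the idea (an illustration written for this article, not Spark's actual RecurringTimer class): a thread wakes up once per period and hands the batch time to a callback, just as the real timer hands each batch time to the function that posts GenerateJobs.

// Minimal illustration of the recurring-timer idea (hypothetical demo code, not Spark source)
object RecurringTimerDemo {
  def main(args: Array[String]): Unit = {
    val periodMs = 5000L // stand-in for the 5-second batchDuration
    val callback: Long => Unit = time => println(s"generate jobs for batch time $time")

    val timerThread = new Thread("recurring-timer-demo") {
      override def run(): Unit = {
        // align the first firing to the next period boundary
        var nextTime = (System.currentTimeMillis() / periodMs + 1) * periodMs
        try {
          while (true) {
            val sleepMs = nextTime - System.currentTimeMillis()
            if (sleepMs > 0) Thread.sleep(sleepMs)
            callback(nextTime) // in Spark this posts GenerateJobs(new Time(nextTime))
            nextTime += periodMs
          }
        } catch {
          case _: InterruptedException => // stop quietly when interrupted
        }
      }
    }
    timerThread.start()
    Thread.sleep(3 * periodMs) // let it fire a few times, then stop
    timerThread.interrupt()
  }
}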
