Spark Streaming direct Kafka connection: source code analysis (based on Spark 1.6)

2018-12-07 18:32 
val data = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, TopicsSet)

This creates a DirectKafkaInputDStream and adds it to the stream graph that ssc maintains:

ssc.graph.addInputStream(this)

Calling data.foreachRDD then creates an output stream:

private def foreachRDD(
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean): Unit = {
  new ForEachDStream(this,
    context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}

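Here the stream passes itself (data) in as the parent of the ForEachDStream, and register() adds the new ForEachDStream to the graph's output streams. This is how the lineage of each stream is preserved.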
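The parent link described above can be pictured with a small stand-alone sketch. The names used here (SimpleStream, SourceStream, ForEachStream, Graph) are hypothetical simplifications and not Spark classes; the sketch only illustrates how an output stream keeps a reference to its parent and registers itself with a graph.

```scala
import scala.collection.mutable.ArrayBuffer

object LineageSketch {
  // Hypothetical stand-in for DStream: produces an optional batch for a given time.
  trait SimpleStream[T] {
    def dependencies: List[SimpleStream[_]]
    def compute(time: Long): Option[Seq[T]]
  }

  // Hypothetical stand-in for the stream graph: it only remembers inputs and outputs.
  class Graph {
    val inputStreams = ArrayBuffer.empty[SimpleStream[_]]
    val outputStreams = ArrayBuffer.empty[SimpleStream[_]]
  }

  // An input stream has no parent and registers itself when it is constructed.
  class SourceStream(graph: Graph) extends SimpleStream[Int] {
    graph.inputStreams += this
    def dependencies: List[SimpleStream[_]] = Nil
    def compute(time: Long): Option[Seq[Int]] = Some(Seq(time.toInt, time.toInt + 1))
  }

  // An output stream keeps its parent, so the lineage can be walked backwards.
  class ForEachStream[T](val parent: SimpleStream[T], f: Seq[T] => Unit, graph: Graph)
      extends SimpleStream[Unit] {
    def dependencies: List[SimpleStream[_]] = List(parent)
    def compute(time: Long): Option[Seq[Unit]] = None
    def register(): this.type = { graph.outputStreams += this; this }
    // Asks the parent for the batch at `time` and applies the user function to it.
    def run(time: Long): Unit = parent.compute(time).foreach(f)
  }
}
```

In this sketch, walking `dependencies` from any output stream leads back to the input stream, which is the same idea as the lineage kept by the real streams.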

ssc.start() calls the start() method of JobScheduler:

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
The important parts are as follows.
First, it starts a thread (the event loop) that consumes job events:

private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}

The other two important calls are:

receiverTracker.start()
jobGenerator.start()

receiverTracker.start() exists for input streams that use a receiver. Its start code is:

def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

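A DirectKafkaInputDStream created by the direct approach extends InputDStream, not ReceiverInputDStream, so receiverInputStreams is empty and receiverTracker.start() does essentially nothing in direct mode.
The key call is therefore jobGenerator.start(), whose source is: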

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}

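As you can see, it starts an event-consuming thread and then checks whether a checkpoint is present. If there is none (a first-time start), it calls: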


/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

This starts the timer, which invokes its callback once per batch interval. How is that callback defined?

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

The timer periodically posts GenerateJobs(new Time(longTime)) to eventLoop. Now look at the run method of the event loop's thread:


override def run(): Unit = {
  try {
    while (!stopped.get) {
      val event = eventQueue.take()
      try {
        onReceive(event)
      } catch {
        case NonFatal(e) => {
          try {
            onError(e)
          } catch {
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
      }
    }
  } catch {
    case ie: InterruptedException => // exit even if eventQueue is not empty
    case NonFatal(e) => logError("Unexpected error in " + name, e)
  }
}
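The timer-plus-event-loop pattern shown above can be sketched without Spark. This is a minimal illustration under stated assumptions: MiniEventLoop and demo are hypothetical names, the loop is a single daemon thread draining a blocking queue, and a plain loop with Thread.sleep stands in for the recurring timer.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ArrayBuffer

object EventLoopSketch {
  sealed trait Event
  case class GenerateJobs(time: Long) extends Event

  // Hypothetical, simplified event loop: one daemon thread drains a blocking queue.
  class MiniEventLoop(name: String)(onReceive: Event => Unit) {
    private val queue = new LinkedBlockingQueue[Event]()
    private val stopped = new AtomicBoolean(false)
    private val thread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!stopped.get) onReceive(queue.take())
        } catch {
          case _: InterruptedException => // exit quietly when stopped
        }
      }
    }
    def start(): Unit = thread.start()
    def post(event: Event): Unit = queue.put(event)
    def stop(): Unit = { stopped.set(true); thread.interrupt() }
  }

  // Posts `batches` events spaced by `periodMs` (the role the recurring timer plays)
  // and returns the batch times in the order the loop handled them.
  def demo(batches: Int, periodMs: Long): List[Long] = {
    val handled = ArrayBuffer.empty[Long] // written only by the loop thread
    val done = new CountDownLatch(batches)
    val loop = new MiniEventLoop("JobGeneratorSketch")({
      case GenerateJobs(t) => handled += t; done.countDown()
    })
    loop.start()
    (1 to batches).foreach { i =>
      loop.post(GenerateJobs(i * periodMs))
      Thread.sleep(periodMs)
    }
    done.await(5, TimeUnit.SECONDS) // the latch makes the loop thread's writes visible
    loop.stop()
    handled.toList
  }
}
```

The point of the design is that the timer thread only enqueues events; all job generation happens on the single event-loop thread, so batches are handled one at a time and in order.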

For a GenerateJobs event, onReceive (through processEvent) ends up calling:


/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))  // a DoCheckpoint event is posted after every batch's jobs are generated
}

The key call is graph.generateJobs(time). Stepping into it:

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

Stepping further into outputStream.generateJob(time):



private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

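The picture is now clear. Every output operation creates its own output stream, which is stored in the graph variable of ssc. Generating a job for an output stream calls getOrCompute, which invokes compute to produce the RDD for the batch. Each stream's compute depends in turn on its parent's compute, so the chain ends at the compute method of DirectKafkaInputDStream. (In Spark 1.6, ForEachDStream overrides generateJob to call parent.getOrCompute(time) and wrap foreachFunc in the job, but the chain of parents is walked in the same way.)

How does that method produce the RDD?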
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
  val untilOffsets = clamp(latestLeaderOffsets(maxRetries)) 
  val rdd = KafkaRDD[K, V, U, T, R](
    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val offsetRanges = currentOffsets.map { case (tp, fo) =>
    val uo = untilOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo.offset)
  }
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
  Some(rdd)
}

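The first line,

val untilOffsets = clamp(latestLeaderOffsets(maxRetries))

computes the end offset of each batch's RDD. It fetches the latest leader offset of every partition, then combines it with the configured rate limit and the current offset to derive the until offset for that partition.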
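The clamping step can be illustrated with a simplified sketch. This is not the Spark source: partitions are modelled as (topic, partition) pairs, offsets are plain Long values, and maxMessagesPerPartition is a hypothetical parameter standing for the per-batch limit derived from the rate configuration.

```scala
object ClampSketch {
  // Hypothetical simplification: a partition is a (topic, partition) pair.
  type Partition = (String, Int)

  // For each partition, the until offset is the smaller of
  // "current offset + per-batch limit" and the latest offset on the leader.
  // Without a limit, the batch simply reads up to the latest offset.
  def clamp(
      currentOffsets: Map[Partition, Long],
      latestOffsets: Map[Partition, Long],
      maxMessagesPerPartition: Option[Long]): Map[Partition, Long] = {
    maxMessagesPerPartition match {
      case Some(limit) =>
        latestOffsets.map { case (tp, latest) =>
          tp -> math.min(currentOffsets(tp) + limit, latest)
        }
      case None => latestOffsets
    }
  }
}
```

For example, with a current offset of 100, a latest offset of 1000 and a limit of 200, the batch would cover offsets 100 to 300; a partition that is only 10 messages behind is read up to its latest offset.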

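This shows that the offsets are kept in the driver's memory (the currentOffsets field).
Once the RDD has been created and wrapped in a job, go back to generateJobs above. The next call is: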

jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

Stepping into it:


def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
Each job is wrapped in a JobHandler and handed to a thread pool. Now look at the run method of JobHandler:
def run() {
  try {
    val formattedTime = UIUtils.formatBatchTime(
      job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
    val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
    val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

    ssc.sc.setJobDescription(
      s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
    ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
    ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

    // We need to assign `eventLoop` to a temp variable. Otherwise, because
    // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
    // it's possible that when `post` is called, `eventLoop` happens to null.
    var _eventLoop = eventLoop
    if (_eventLoop != null) {
      _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
      // Disable checks for existing output directories in jobs launched by the streaming
      // scheduler, since we may need to write output to an existing directory during checkpoint
      // recovery; see SPARK-4835 for more details.
      PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
        job.run()
      }
      _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
      }
    } else {
      // JobScheduler has been stopped.
    }
  } finally {
    ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
    ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
  }
}

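It posts a JobStarted event to the queue maintained by JobScheduler, runs job.run(), and finally posts a JobCompleted event to the same queue.

job.run() executes the function wrapped in the job. Look back at how that function is defined: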
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

In other words, a job is simply the RDD for this batch interval plus the function to run on it.
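The idea that a job is just "data for a batch plus a deferred function" can be sketched as follows. MiniJob, generateJob and handle are hypothetical names used for illustration, and a Seq[Int] stands in for the RDD; the sketch only mirrors the ordering of events seen in JobHandler.run.

```scala
import scala.util.Try

object JobSketch {
  sealed trait SchedulerEvent
  case class JobStarted(time: Long) extends SchedulerEvent
  case class JobCompleted(time: Long, succeeded: Boolean) extends SchedulerEvent

  // Hypothetical stand-in for a job: a batch time plus a deferred function.
  class MiniJob(val time: Long, func: () => Unit) {
    def run(): Try[Unit] = Try(func())
  }

  // Builds the job without running anything; the batch is only captured by the closure.
  def generateJob(time: Long, batch: Seq[Int], foreachFunc: Seq[Int] => Unit): MiniJob =
    new MiniJob(time, () => foreachFunc(batch))

  // Same ordering as the handler above: post start event, run the job, post completion event.
  def handle(job: MiniJob, post: SchedulerEvent => Unit): Unit = {
    post(JobStarted(job.time))
    val result = job.run()
    post(JobCompleted(job.time, result.isSuccess))
  }
}
```

Nothing is computed when the job is generated; the work happens only when a handler thread calls run, which is why job generation on the event-loop thread stays cheap.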
