Spark Streaming Receiver启动过程分析

By | 2019年2月13日

—————☼—————☼—————☼—————☼—————☼—————
Spark Streaming概述
Spark Streaming 初始化过程
Spark Streaming Receiver启动过程分析
Spark Streaming 数据准备阶段分析(Receiver方式)
Spark Streaming 数据计算阶段分析
SparkStreaming Backpressure分析
Spark Streaming Executor DynamicAllocation 机制分析

—————☼—————☼—————☼—————☼—————☼—————

Receiver是数据准备阶段的一个主要组件,其负载接入外部数据,其生命周期由ReceiverTracker负责管理。

Receiver的启动

1. Receiver抽取与Executor准备

“Spark Streaming 初始化过程”中提到 JobScheduler在启动时会创建和启动ReceiverTracker.
在ReceiverTracker创建时,其会从DStreamGraph中抽取出ReceiverInputStream,以便在启动Receiver时从中抽取出Receiver,然后一一启动。

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()

在ReceiverTracker启动时,其主要做如下两件事:

  • 创建ReceiverTrackerEndpoint,用于接收Receiver的信息
  • 启动Receiver.

ReceiverTracker的Start方法如下所示:

  /** Start the endpoint and receiver execution thread. */
  def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

其中 launchReceivers()方法用于启动Receiver, 其代码如下:

 /**
   * Get the receivers from the ReceiverInputDStreams, distributes them to the
   * worker nodes as a parallel collection, and runs them.
   */
  private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map { nis =>
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    }

    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    // 发送启动指令
    endpoint.send(StartAllReceivers(receivers)) 
  }

此方法的主要操作有:

  • 从ReceiverInputStreams中抽取Receiver, 并将streamId做为Receiver的id.
  • 执行runDummySparkJob,此方法是执行一个简单的SparkJob,目的是为确保应用申请的Executor的最小份额得以满足,最小份额由参数“spark.cores.max” 和 “spark.scheduler.minRegisteredResourcesRatio” 共同决定,默认为申请的所有Executor。当应用已获得的Executor数量小于最小份额时,Job将阻塞并等待Executor注册,直到满足其运行需要的最小限额。
    runDummySparkJob的代码如下:
  /**
   * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
   * receivers to be scheduled on the same node.
   *
   * TODO Should poll the executor number and wait for executors according to
   * "spark.scheduler.minRegisteredResourcesRatio" and
   * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
   */
  private def runDummySparkJob(): Unit = {
    if (!ssc.sparkContext.isLocal) {
      ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
    }
    assert(getExecutors.nonEmpty)
  }

程序逻辑非常简单,目的是使其在不消耗过多资源的情况下,可以保证在调度Recevier时,已有大量的Executor注册完成,从而使Recevier调度时尽量均匀的调度至不同的Executor 。

  • 向ReceiverTrackerEndpoint发送启动所有executor指令(StartAllReceivers)

在ReceiverTrackerEndpoint收到StartAllReceivers指令后,其将

  • 调度Receiver: 为Receiver设置执行位置信息
  • 启动Receiver

其实现逻辑如下:

case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }

2. Receiver 调度

Receiver调度工作由ReceiverSchedulingPolicy进行,对Receiver的调度工作主要可以分为如下两个阶段:

  • 全局调度阶段
    此阶段发生在首次调度Receiver时,此阶段会保证receivers尽量均匀的分散在Executors中。调度过程中会为每一个Receiver指定启动的位置信息(location)
  • 局部调度阶段
    此阶段发生在Receiver重启时,仅需启动失败Receiver

全局调度阶段是必然会发生的,因此将以这种情况为例对Receiver调度进行详细说明。其调度过程如下:

  • 获取所有executor的主要地址信息
  • 创建numReceiversOnExecutor用于记录每个Executor分配的Receiver数目
  • 创建scheduledLocations用于记录用户指定偏好位置的Receiver
  • 调度指定preferredLocation信息的Receiver. 遍历Receivers, 为用户指定的preferredLocation的主机中选择启动Receiver数 最少的Executor做为当前Receiver启动位置,并更新记录scheduledLocations 和numReceiversOnExecutor。
  • 调度未指定preferredLocation信息的Receiver.
    将Executor依照分配的Receiver数目从小到大排序,为Receiver分配一个Executor.
  • 若还有剩余Executor, 将这些Executor 加入到拥有最少候选对象的Receiver列表中。

至此, Receiver与与Executor的关联联系建立完毕。
调度的实现代码如下所示:

 /**
   * Try our best to schedule receivers with evenly distributed. However, if the
   * `preferredLocation`s of receivers are not even, we may not be able to schedule them evenly
   * because we have to respect them.
   *
   * Here is the approach to schedule executors:
   * <ol>
   *   <li>First, schedule all the receivers with preferred locations (hosts), evenly among the
   *       executors running on those host.</li>
   *   <li>Then, schedule all other receivers evenly among all the executors such that overall
   *       distribution over all the receivers is even.</li>
   * </ol>
   *
   * This method is called when we start to launch receivers at the first time.
   *
   * @return a map for receivers and their scheduled locations
   */
  def scheduleReceivers(
      receivers: Seq[Receiver[_]],
      executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
    if (receivers.isEmpty) {
      return Map.empty
    }

    if (executors.isEmpty) {
      return receivers.map(_.streamId -> Seq.empty).toMap
    }

    val hostToExecutors = executors.groupBy(_.host)
    val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
    val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
    // Set the initial value to 0
    executors.foreach(e => numReceiversOnExecutor(e) = 0)

    // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
    // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
    for (i <- 0 until receivers.length) {
      // Note: preferredLocation is host but executors are host_executorId
      receivers(i).preferredLocation.foreach { host =>
        hostToExecutors.get(host) match {
          case Some(executorsOnHost) =>
            // preferredLocation is a known host. Select an executor that has the least receivers in
            // this host
            val leastScheduledExecutor =
              executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
            scheduledLocations(i) += leastScheduledExecutor
            numReceiversOnExecutor(leastScheduledExecutor) =
              numReceiversOnExecutor(leastScheduledExecutor) + 1
          case None =>
            // preferredLocation is an unknown host.
            // Note: There are two cases:
            // 1. This executor is not up. But it may be up later.
            // 2. This executor is dead, or it's not a host in the cluster.
            // Currently, simply add host to the scheduled executors.

            // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
            // this case
            scheduledLocations(i) += TaskLocation(host)
        }
      }
    }

    // For those receivers that don't have preferredLocation, make sure we assign at least one
    // executor to them.
    for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
      // Select the executor that has the least receivers
      val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
      scheduledLocationsForOneReceiver += leastScheduledExecutor
      numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
    }

    // Assign idle executors to receivers that have less executors
    val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
    for (executor <- idleExecutors) {
      // Assign an idle executor to the receiver that has least candidate executors.
      val leastScheduledExecutors = scheduledLocations.minBy(_.size)
      leastScheduledExecutors += executor
    }

    receivers.map(_.streamId).zip(scheduledLocations).toMap
  }

此实现,存在一个问题,如果Receiver设置了preferredLocation且preferredLocation所对应的主机存在此应用的Executor的情况下,也不一定保证Receiver调度至此Executor.

3. Receiver 启动

在为Receiver设置完启动位置之后,将调用startReceiver方法启动Receiver, 启动过程如下:

  • 依据preferredLocation将Receiver包装成RDD
 // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
  • 以SparkJob的形式提交作业, Receiver作为Task 以线程方式执行
 val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(ThreadUtils.sameThread)
  • Task执行, 执行的startReceiverFunc方法,该方法会创建并启动ReceiverSupervisorImpl(Job及Task调度过程此处不再详细说明,同批处理)
    // Function to start the receiver on the worker node
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

其中ReceiverSupervisorImpl 提供了处理Receiver接收数据的所有必要的方法。并且它还创建了BlockGenerator,用于对Receiver接收的数据流进行切片操作。
其ReceiverSupervisorImpl的Start方法实现如下:

/** Start the supervisor */
 def start() {
   onStart()
   startReceiver()
 }

其中onStart() 会创建BlockGenerator并启动。
startReceiver()方法,首先会向ReceiverTracker注册Receiver信息,并验证Receiver是否合法。若合法,则调用Receiver的onStart方法进行数据接收,其实现逻辑如下:

 /** Start receiver */
  def startReceiver(): Unit = synchronized {
    try {
      if (onReceiverStart()) {
        logInfo(s"Starting receiver $streamId")
        receiverState = Started
        receiver.onStart()
        logInfo(s"Called receiver $streamId onStart")
      } else {
        // The driver refused us
        stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
      }
    } catch {
      case NonFatal(t) =>
        stop("Error starting receiver " + streamId, Some(t))
    }
  }

下面以WordCount中的SocketInputDStream中的SocketReceiver为例进行说明,其onStart方法实现如下:

def onStart() {

    logInfo(s"Connecting to $host:$port")
    try {
      socket = new Socket(host, port)
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    }
    logInfo(s"Connected to $host:$port")

    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

 /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    try {
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next())
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }
}

通过上述实现可知,其将通过socket方式进行数据接收。
Receiver启动流程至此结束,Receiver启动之后会接收源源不断的数据流并对数据分片,副本分发工作,为计算阶段做准备,接下来将进行数据准备环节的分析。

发表评论