SparkUI中显示stage skipped的原因【源码分析】

article/2025/9/20 8:11:16

SparkUI中显示stage skipped的原因【源码分析】

    • Spark Job的ResultStage的最后一个Task成功执行之后,DAGScheduler.handleTaskCompletion方法会发送SparkListenerJobEnd事件,源码如下:
    • JobProgressListener.onJobEnd方法负责处理SparkListenerJobEnd事件,代码如下:
    • StageInfo.submissionTime在Stage被分解成TaskSet,并且TaskSet被提交到TaskSetManager之前进行设置,源码如下:
  • 哪种Stage不会分解成TaskSet分解执行呢?
  • 结论:

在spark的首页ui上经常显示任务和Stage被skipped,如以下截图所式:
在这里插入图片描述

本文将阐述什么情况下Stage或者Task会显示为skipped,以及stage和task显示为skipped的时候是否spark application执行会出问题?

Spark Job的ResultStage的最后一个Task成功执行之后,DAGScheduler.handleTaskCompletion方法会发送SparkListenerJobEnd事件,源码如下:

private[scheduler] def handleTaskCompletion(event: CompletionEvent) {  val task = event.task  val stageId = task.stageId  val taskType = Utils.getFormattedClassName(task)  outputCommitCoordinator.taskCompleted(stageId, task.partitionId,  event.taskInfo.attempt, event.reason)  // The success case is dealt with separately below, since we need to compute accumulator  // updates before posting.  if (event.reason != Success) {  val attemptId = stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)  listenerBus.post(SparkListenerTaskEnd(stageId, attemptId, taskType, event.reason,  event.taskInfo, event.taskMetrics))  }  if (!stageIdToStage.contains(task.stageId)) {  // Skip all the actions if the stage has been cancelled.  return  }  val stage = stageIdToStage(task.stageId)  event.reason match {  case Success =>  listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,  event.reason, event.taskInfo, event.taskMetrics))  stage.pendingTasks -= task  task match {  case rt: ResultTask[_, _] =>  // Cast to ResultStage here because it's part of the ResultTask  // TODO Refactor this out to a function that accepts a ResultStage  val resultStage = stage.asInstanceOf[ResultStage]  resultStage.resultOfJob match {  case Some(job) =>  if (!job.finished(rt.outputId)) {  updateAccumulators(event)  job.finished(rt.outputId) = true  job.numFinished += 1  // If the whole job has finished, remove it  if (job.numFinished == job.numPartitions) {//ResultStage所有任务都执行完毕,发送SparkListenerJobEnd事件  markStageAsFinished(resultStage)  cleanupStateForJobAndIndependentStages(job)  listenerBus.post(  SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))  }  // taskSucceeded runs some user code that might throw an exception. Make sure  // we are resilient against that.  try {  job.listener.taskSucceeded(rt.outputId, event.result)  } catch {  case e: Exception =>  // TODO: Perhaps we want to mark the resultStage as failed?  job.listener.jobFailed(new SparkDriverExecutionException(e))  }  }  case None =>  logInfo("Ignoring result from " + rt + " because its job has finished")  }

JobProgressListener.onJobEnd方法负责处理SparkListenerJobEnd事件,代码如下:

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {  val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {  logWarning(s"Job completed for unknown job ${jobEnd.jobId}")  new JobUIData(jobId = jobEnd.jobId)  }  jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)  jobData.stageIds.foreach(pendingStages.remove)  jobEnd.jobResult match {  case JobSucceeded =>  completedJobs += jobData  trimJobsIfNecessary(completedJobs)  jobData.status = JobExecutionStatus.SUCCEEDED  numCompletedJobs += 1  case JobFailed(exception) =>  failedJobs += jobData  trimJobsIfNecessary(failedJobs)  jobData.status = JobExecutionStatus.FAILED  numFailedJobs += 1  }  for (stageId <- jobData.stageIds) {  stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>  jobsUsingStage.remove(jobEnd.jobId)  if (jobsUsingStage.isEmpty) {  stageIdToActiveJobIds.remove(stageId)  }  stageIdToInfo.get(stageId).foreach { stageInfo =>  if (stageInfo.submissionTime.isEmpty) {//Job的Stage没有提交执行,则这个Stage和它对应的Task会标记为skipped stage和skipped task进行统计  // if this stage is pending, it won't complete, so mark it as "skipped":  skippedStages += stageInfo  trimStagesIfNecessary(skippedStages)  jobData.numSkippedStages += 1  jobData.numSkippedTasks += stageInfo.numTasks  }  }  }  }  }

StageInfo.submissionTime在Stage被分解成TaskSet,并且TaskSet被提交到TaskSetManager之前进行设置,源码如下:

private def submitMissingTasks(stage: Stage, jobId: Int) {  logDebug("submitMissingTasks(" + stage + ")")  // Get our pending tasks and remember them in our pendingTasks entry  stage.pendingTasks.clear()  // First figure out the indexes of partition ids to compute.  //parititionsToCompute是一个List, 表示一个stage需要compute的所有分区的index  val partitionsToCompute: Seq[Int] = {  stage match {  case stage: ShuffleMapStage =>  (0 until stage.numPartitions).filter(id => stage.outputLocs(id).isEmpty)  case stage: ResultStage =>  val job = stage.resultOfJob.get  (0 until job.numPartitions).filter(id => !job.finished(id))  }  }  val properties = jobIdToActiveJob.get(stage.firstJobId).map(_.properties).orNull  runningStages += stage  // SparkListenerStageSubmitted should be posted before testing whether tasks are  // serializable. If tasks are not serializable, a SparkListenerStageCompleted event  // will be posted, which should always come after a corresponding SparkListenerStageSubmitted  // event.  stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size))  outputCommitCoordinator.stageStart(stage.id)  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))  // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.  // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast  // the serialized copy of the RDD and for each task we will deserialize it, which means each  // task gets a different copy of the RDD. This provides stronger isolation between tasks that  // might modify state of objects referenced in their closures. This is necessary in Hadoop  // where the JobConf/Configuration object is not thread-safe.  var taskBinary: Broadcast[Array[Byte]] = null  try {  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).  // For ResultTask, serialize and broadcast (rdd, func).  val taskBinaryBytes: Array[Byte] = stage match {  case stage: ShuffleMapStage =>  closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()  case stage: ResultStage =>  closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()  }  taskBinary = sc.broadcast(taskBinaryBytes)//将任务信息构造成广播变量,广播到每个Executor  } catch {  // In the case of a failure during serialization, abort the stage.  case e: NotSerializableException =>  abortStage(stage, "Task not serializable: " + e.toString)  runningStages -= stage  // Abort execution  return  case NonFatal(e) =>  abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}")  runningStages -= stage  return  }  //tasks是一个List,它表示一个stage每个task的描述,描述信息为:task所在stage id、task处理的partition、partition所在的主机地址和Executor id  val tasks: Seq[Task[_]] = try {  stage match {  case stage: ShuffleMapStage =>  partitionsToCompute.map { id =>  /* * 获取task所在的节点,数据所在的节点优先启动任务处理这些数据,在这里用到ShuffleMapStage. * */  val locs = getPreferredLocs(stage.rdd, id)  val part = stage.rdd.partitions(id)  new ShuffleMapTask(stage.id, taskBinary, part, locs)//taskBinary是广播变量  }  case stage: ResultStage =>  val job = stage.resultOfJob.get  partitionsToCompute.map { id =>  val p: Int = job.partitions(id)  val part = stage.rdd.partitions(p)  val locs = getPreferredLocs(stage.rdd, p)  new ResultTask(stage.id, taskBinary, part, locs, id)  }  }  } catch {  case NonFatal(e) =>  abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}")  runningStages -= stage  return  }  if (tasks.size > 0) {  logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")  stage.pendingTasks ++= tasks  logDebug("New pending tasks: " + stage.pendingTasks)  taskScheduler.submitTasks(  new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.firstJobId, properties))  stage.latestInfo.submissionTime = Some(clock.getTimeMillis())//设置StageInfo的submissionTime成员,表示这个TaskSet会被执行,不会被skipped  } else

Job的Stage没有分解成TaskSet提交执行,则这个Stage和它对应的Task会标记为skipped stage和skipped task进行统计显示。

哪种Stage不会分解成TaskSet分解执行呢?

Spark在提交Job的时候,会发送JobSubmitted事件,DAGScheduler.doOnReceive接收到JobSubmitted事件之后,会调用DAGScheduler.handleJobSubmitted方法处理任务提交。

DAGScheduler.handleJobSubmitted首先调用DAGScheduler.newResultStage方法创建最后一个Stage,DAGScheduler.newResultStage通过以下一系列函数调用最终会调用到DAGScheduler.registerShuffleDependencies,这个方法将这个RDD所有的祖宗Stage加入到DAGScheduler.jobIdToStageIds这个HashMap中。然后获取这个Job的每个Stage对应的StageInfo,转换成一个Seq,发送SparkListenerJobStart事件。
DAGScheduler.newResultStage->
DAGScheduler.getParentStagesAndId->
DAGScheduler.getParentStagesAndId->getParentStages
DAGScheduler.getParentStagesAndId->getShuffleMapStage
DAGScheduler.registerShuffleDependencies

DAGScheduler.registerShuffleDependencies首先调用DAGScheduler.getAncestorShuffleDependencies找到当前rdd所有祖宗的rdd依赖,包括父辈、爷爷辈,以致更高辈分的rdd依赖,然后调用DAGScheduler.newOrUsedShuffleStage创建每个祖宗rdd依赖对应的ShuffleMapStage,

private def registerShuffleDependencies(shuffleDep: ShuffleDependency[_, _, _], firstJobId: Int) {  val parentsWithNoMapStage = getAncestorShuffleDependencies(shuffleDep.rdd)//获取所有祖宗rdd依赖,包括父辈、爷爷辈等  while (parentsWithNoMapStage.nonEmpty) {  val currentShufDep = parentsWithNoMapStage.pop()  //根据ShuffleDependency和jobid生成Stage,由于是从栈里面弹出,所以最先添加的是Root stage,依次类推,最先添加的Stage shuffleId越小  val stage = newOrUsedShuffleStage(currentShufDep, firstJobId)  shuffleToMapStage(currentShufDep.shuffleId) = stage  }  }

DAGScheduler.newOrUsedShuffleStage会调用DAGScheduler.newShuffleMapStage创建stage。
DAGScheduler.newShuffleMapStage方法创建了stage之后,调用DAGScheduler.updateJobIdStageIdMaps方法将新创建的
stage.id加入到DAGScheduler.jobIdToStageIds中。源码如下:

private def updateJobIdStageIdMaps(jobId: Int, stage: Stage): Unit = {  def updateJobIdStageIdMapsList(stages: List[Stage]) {  if (stages.nonEmpty) {  val s = stages.head  s.jobIds += jobId  jobIdToStageIds.getOrElseUpdate(jobId, new HashSet[Int]()) += s.id//将stage id加入到jobIdToStageIds中  val parents: List[Stage] = getParentStages(s.rdd, jobId)  val parentsWithoutThisJobId = parents.filter { ! _.jobIds.contains(jobId) }  updateJobIdStageIdMapsList(parentsWithoutThisJobId ++ stages.tail)  }  }  updateJobIdStageIdMapsList(List(stage))  }

DAGScheduler.handleJobSubmitted源码如下:

private[scheduler] def handleJobSubmitted(jobId: Int,  finalRDD: RDD[_],  func: (TaskContext, Iterator[_]) => _,  partitions: Array[Int],  allowLocal: Boolean,  callSite: CallSite,  listener: JobListener,  properties: Properties) {  var finalStage: ResultStage = null  try {  // New stage creation may throw an exception if, for example, jobs are run on a  // HadoopRDD whose underlying HDFS files have been deleted.  finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)//创建ResultStage,在这个方法里面会将这个Job执行过程中,需要可能经历的Stage全部放入到  } catch {  case e: Exception =>  logWarning("Creating new stage failed due to exception - job: " + jobId, e)  listener.jobFailed(e)  return  }  if (finalStage != null) {  val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)  clearCacheLocs()  logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(  job.jobId, callSite.shortForm, partitions.length, allowLocal))  logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")  logInfo("Parents of final stage: " + finalStage.parents)  logInfo("Missing parents: " + getMissingParentStages(finalStage))  val shouldRunLocally =  localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1  val jobSubmissionTime = clock.getTimeMillis()  if (shouldRunLocally) {  // Compute very short actions like first() or take() with no parent stages locally.  listenerBus.post(  SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))  runLocally(job)  } else {  jobIdToActiveJob(jobId) = job  activeJobs += job  finalStage.resultOfJob = Some(job)  val stageIds = jobIdToStageIds(jobId).toArray//获取一个Job对应的所有的Stage id,Job的所有Stage在执行newResultStage的时候会创建,所以在这里能获取成功  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))//获取每个Stage对应的StageInfo  listenerBus.post(  SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))//发送Job启动事件SparkListenerJobStart  submitStage(finalStage)  }  }  submitWaitingStages()  }

结论:

JobProgressListener.onJobStart 负责接收处理 SparkListenerJobStart 事件。它会把DAGScheduler.handleJobSubmitted 方法创建的所有StageInfo信息放到 JobProgressListener.stageIdToInfo 这个HashMap中。

至此可以得出结论:JobProgressListener.onJobEnd方法中,处理的obProgressListener.stageIdToInfo信息是执行
DAGScheduler.handleJobSubmitted产生的。在Job对应的所有Stage分解成Task之前就已经产生了

文章可以知道,在将Stage分解成TaskSet的时候,如果一个RDD已经Cache到了BlockManager,则这个RDD对应的所有祖宗Stage都不会分解成TaskSet进行执行,所以这些祖宗Stage对应的StageInfo.submissionTime.isEmpty就会返回true,所以这些祖宗Stage和它们对应的Task就会在Spark ui上显示为skipped
Stage执行完成之后,会执行JobProgressListener.onStageCompleted将Stage信息保存到JobProgressListener.stageIdToInfo,源码如下:

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {  val stage = stageCompleted.stageInfo  stageIdToInfo(stage.stageId) = stage//保存Stage的信息,便于跟踪显示  val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {  logWarning("Stage completed for unknown stage " + stage.stageId)  new StageUIData  })

Stage对应的TaskSet中所有任务成功执行后,会将Stage对应的StageInfo反馈到JobProgressListener.stageIdToInfo,这样这些任务就不会显示为skipped了

任务出现skipped是正常的,之所以出现skipped是因为要计算的数据已经缓存到了内存,没有必要再重复计算。出现skipped对结果没有影响


http://chatgpt.dhexx.cn/article/emRf8BhA.shtml

相关文章

unprintable character xxxx skipped解决方案

在keil软件中输入源代码的时候 常常会出现上述错误 双击蓝色那一条文字 -----到34行处 可以发现注释中间的分号是在中文状态下 而在这里面我们只能用英文状态下的分号 所以将分号改为英文状态下的分号即可 若实在不行 就直接删除注释~~~

spark web ui中的skipped的含义

顾名思义&#xff0c;跳出的意思啦。 例如如图&#xff1a; skipped的stages代表是已经执行过了。所以不需要再执行了。 如何&#xff0c;你有一个 testRdd。然后先做 testRdd.Filter("xxx").map("xx")&#xff0c; 这个是transform 然后再分别做了count和…

svn的skipped,no versioned parent报错解决方法

http://blog.csdn.net/lanjinghai507/article/details/52327636 感谢分享 今天一大早&#xff0c;习惯性的用sublime_text,选择文件夹&#xff0c;然后删除该文件&#xff0c;再然后跟新&#xff0c;接着就出现skipped,no versioned parent的报错&#xff0c;如下图 skipped…

ntohs和htons的区别

虽然注册CSDN已经有好几年了&#xff0c;学习写程序也有两年的时间了。对于绝大数人来说&#xff0c;我还是得菜鸟。平时遇到什么问题也会来到这个平台搜索别人的解答。也从这个平台上获益很多。 今天在公司因为一个问题和同事就关于htons与ntohs有没有区别探讨了一下&#xff…

socket编程中的 htons()

文章目录 1.内存存储数据的方式1.1 数据字节序号1.2 小端字节序1.3 大端字节序 2. 转换顺序2.1 网络字节序2.2 主机字节序2.3 转换函数 在刚刚接触 socket 时&#xff0c;遇到了 htons() 函数&#xff0c;就直接懵逼了&#xff0c;这是什么东西&#xff0c;有什么用&#xff1f…

高并发解决方案——Redis(一)

简介 Redis作为重要的缓存数据库在高并发的解决方案中起着重要作用。为了系统的学习Redis&#xff0c;也为了秋招&#xff08;美团比较关注Redis 的掌握&#xff09;&#xff0c;计划编写该系列博客&#xff0c;也是为了整理知识点。 本篇主要介绍了Redis的基础知识与原理。之…

Jmeter超高并发解决方案

背景一亿用户量&#xff0c;平均每人每天10次的业务量&#xff0c;要求并发数在5000以上&#xff0c;峰值在5w到10w之间&#xff0c;QPS在25w以上 一、jmeter解决高并发的优化方案 1.1 优化监听&#xff08;GUI模式&#xff0c;尽量不考虑&#xff09; …

2023春招面试专题:高并发解决方案

如何理解高并发&#xff1f; 高并发意味着大流量&#xff0c;需要运用技术手段抵抗流量的冲击&#xff0c;这些手段好比操作流量&#xff0c;能让流量更平稳地被系统所处理&#xff0c;带给用户更好的体验。 我们常见的高并发场景有&#xff1a;淘宝的双11、春运时的抢票、微…

高并发解决方案之熔断处理

高并发解决方案之熔断处理 前言概念基本介绍三种状态熔断方式常用框架功能对比使用介绍 参考链接 前言 问题列表 跨系统、跨服务调用第三方接口时&#xff0c;第三方接口响应超时或者服务不可用&#xff0c;发生连锁故障进而导致雪崩效应。 举例说明 假设上游服务是A&#xff…

阿里云高并发解决方案

今天这篇帖子我会讲解一下金蝶财务软件K3_CLOUD高并发部署在阿里云上得解决方案,本篇博客同样适用于其他软件高并发上云部署 以我去年12月份的一个客户项目为例。 做过项目的朋友都知道,小客户比较关注的是成本,大客户关注的是价值和服务。高并发客户一般都会有专业的IT运…

高并发解决方案相关面试题

什么是DNS解析域名 DNS域名解析就是讲域名转化为不需要显示端口&#xff08;二级域名的端口一般为80&#xff09;的IP地址&#xff0c;域名解析的一般先去本地环境的host文件读取配置&#xff0c;解析成对应的IP地址&#xff0c;根据IP地址访问对应的服务器。若host文件未配置…

Token高并发解决方案

Token高并发解决方案 一&#xff1a;作为token使用的第三方 客户端模式使用token 可以采用单例模式或定义一个全局变量isRefresh 标志&#xff0c;加同步锁Synchronized来保证token过期的那个时间点&#xff0c;刷新token方法只被调用一次。 二&#xff1a;作为token服务器端…

大数据和高并发解决方案

一、网站应用背景 开发一个网站的应用程序&#xff0c;当用户规模比较小的时候&#xff0c;使用简单的&#xff1a;一台应用服务器一台数据库服务器一台文件服务器&#xff0c;这样的话完全可以解决一部分问题&#xff0c;也可以通过堆硬件的方式来提高网站应用的访问性能&…

电商中常见的高并发解决方案

目录 多级缓存 什么叫多级缓存 多级缓存的实现思路 Redis 缓存同步 MySql 数据 Nginx 限流 什么是限流 常见的限流算法之漏桶算法 nginx 限流的方式 控制速率 控制并发量&#xff08;连接数&#xff09; 在本文中&#xff0c;我们将以京东为例&#xff0c;了解电商中…

【数据科学】斯皮尔曼的等级相关系数(Spearman's coefficient)

在统计数据中&#xff0c;斯皮尔曼的等级相关系数或斯皮尔曼的rho&#xff0c;以查尔斯斯皮尔曼命名并经常用希腊字母表示或&#xff0c;是秩相关的非参数度量&#xff08;两个变量的排名之间的统计依赖性&#xff09;。它评估了使用单调函数描述两个变量之间关系的程度。 两个…

python 利用Scipy计算person 和spearman相关系数

python 利用Scipy计算person 和spearman相关系数 觉得有用的话,欢迎一起讨论相互学习~ 学习以下两位大佬的讲解 (Pearson)皮尔逊相关系数和spearman相关系数&#xff08;附python实现&#xff09; 相关性系数及其python实现 皮尔逊相关系数 下面是皮尔逊相关系数的计算公式…

Python+pandas计算数据相关系数(person、Kendall、spearman)

pandas中DataFrame对象corr()方法的用法&#xff0c;该方法用来计算DataFrame对象中所有列之间的相关系数&#xff08;包括pearson相关系数、Kendall Tau相关系数和spearman秩相关&#xff09;。 pandas相关系数-DataFrame.corr()参数详解 DataFrame.corr(methodpearson, min_p…

相关性Correlations 皮尔逊相关系数(pearson)和斯皮尔曼等级相关系数(spearman)

相关性Correlations Correlations&#xff0c;相关度量&#xff0c;目前Spark支持两种相关性系数&#xff1a;皮尔逊相关系数&#xff08;pearson&#xff09;和斯皮尔曼等级相关系数&#xff08;spearman&#xff09;。相关系数是用以反映变量之间相关关系密切程度的统计指标。…

spearman学习

特征提取&#xff0c;预测都弄完了&#xff0c;现在要检查一下预测的效果。 spearman秩相关系数是度量两个变量之间的统计相关性的指标&#xff0c;用来评估当用单调函数来描述两个变量之间的关系有多好。在没有重复数据的情况下&#xff0c;如果一个变量是另外一个变量的严格…

R语言中进行Spearman等级相关分析

摘要 使用Spearman等级相关性测试两个等级变量或一个等级变量和一个测量变量之间的关联。 如果您担心非正态性&#xff0c;也可以对两个测量变量使用Spearman等级相关性而不是线性回归/相关性&#xff0c;但这通常不是必须的。 宏伟的军舰鸟&#xff08;军舰鸟magnificens&…