fixup
zhztheplayer committed Jun 14, 2024
1 parent 1f36523 commit c2a3380
Showing 1 changed file with 23 additions and 16 deletions.
@@ -18,13 +18,9 @@ package org.apache.spark.sql

import org.apache.spark.{SparkContext, Success, TaskKilled}
import org.apache.spark.executor.ExecutorMetrics
-import org.apache.spark.scheduler.{
-SparkListener,
-SparkListenerExecutorMetricsUpdate,
-SparkListenerTaskEnd,
-SparkListenerTaskStart
-}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.sql.KillTaskListener.INIT_WAIT_TIME_MS
+import org.apache.spark.sql.catalyst.QueryPlanningTracker

import com.google.common.base.Preconditions
import org.apache.commons.lang3.RandomUtils
@@ -50,7 +46,8 @@ object SparkQueryRunner
"ProcessTreePythonVMemory",
"ProcessTreePythonRSSMemory",
"ProcessTreeOtherVMemory",
"ProcessTreeOtherRSSMemory")
"ProcessTreeOtherRSSMemory"
)

def runQuery(
spark: SparkSession,
@@ -82,25 +79,33 @@

println(s"Executing SQL query from resource path $queryPath...")
try {
+val tracker = new QueryPlanningTracker
val sql = resourceToString(queryPath)
val prev = System.nanoTime()
val df = spark.sql(sql)
-val rows = df.collect()
+val rows = QueryPlanningTracker.withTracker(tracker) {
+df.collect()
+}
if (explain) {
df.explain(extended = true)
}
-val planMillis =
-df.queryExecution.tracker.phases.values.map(p => p.endTimeMs - p.startTimeMs).sum
+val sparkTracker = df.queryExecution.tracker
+val sparkRulesMillis =
+sparkTracker.rules.map(_._2.totalTimeNs).sum / 1000000L
+val otherRulesMillis =
+tracker.rules.map(_._2.totalTimeNs).sum / 1000000L
+val planMillis = sparkRulesMillis + otherRulesMillis
val totalMillis = (System.nanoTime() - prev) / 1000000L
val collectedMetrics = metrics.map(name => (name, em.getMetricValue(name))).toMap
RunResult(rows, planMillis, totalMillis - planMillis, collectedMetrics)
} finally {
sc.removeSparkListener(metricsListener)
-killTaskListener.foreach(l => {
-sc.removeSparkListener(l)
-println(s"Successful kill rate ${"%.2f%%"
-.format(100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}")
-})
+killTaskListener.foreach(
+l => {
+sc.removeSparkListener(l)
+println(s"Successful kill rate ${"%.2f%%"
+.format(100 * l.successfulKillRate())} during execution of app: ${sc.applicationId}")
+})
sc.setJobDescription(null)
}
}
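
The timing change above splits planning time between two trackers: rule timings recorded on the DataFrame's own queryExecution.tracker, plus rule timings recorded while collect() runs under an explicitly supplied QueryPlanningTracker. A minimal self-contained sketch of that pattern follows; the local session, the literal query, and the object name are illustrative assumptions, not part of this change.

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.QueryPlanningTracker

// Sketch: sum rule times from the query's built-in tracker and from a tracker
// installed as the thread-local planning tracker while the result is collected.
object PlanTimingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("plan-timing").getOrCreate()
    val tracker = new QueryPlanningTracker
    val df = spark.sql("SELECT 1 AS id")
    // withTracker makes `tracker` the thread-local planning tracker for the duration of collect().
    val rows = QueryPlanningTracker.withTracker(tracker) {
      df.collect()
    }
    val sparkRulesMillis = df.queryExecution.tracker.rules.map(_._2.totalTimeNs).sum / 1000000L
    val otherRulesMillis = tracker.rules.map(_._2.totalTimeNs).sum / 1000000L
    println(s"rows=${rows.length}, planMillis=${sparkRulesMillis + otherRulesMillis}")
    spark.stop()
  }
}
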
@@ -166,7 +171,8 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener {
val total = Math.min(
stageKillMaxWaitTimeLookup.computeIfAbsent(taskStart.stageId, _ => Long.MaxValue),
stageKillWaitTimeLookup
-.computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS))
+.computeIfAbsent(taskStart.stageId, _ => INIT_WAIT_TIME_MS)
+)
val elapsed = System.currentTimeMillis() - startMs
val remaining = total - elapsed
if (remaining <= 0L) {
@@ -180,6 +186,7 @@ class KillTaskListener(val sc: SparkContext) extends SparkListener {
}
throw new IllegalStateException()
}
+
val elapsed = wait()

// We have 50% chance to kill the task. FIXME make it configurable?
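The last hunk ends just before the kill decision; per the comment above, each started task is killed with 50% probability once its wait elapses. A simplified, illustrative sketch of such a coin-flip kill, assuming the kill is issued through SparkContext.killTaskAttempt (the class name and reason string are made up for the example):

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskStart}

import org.apache.commons.lang3.RandomUtils

// Illustrative listener: on each task start, flip a coin and, on success, ask
// the scheduler to kill that task attempt. The listener in this change also
// waits a per-stage time budget before deciding.
class CoinFlipKillListener(sc: SparkContext) extends SparkListener {
  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
    if (RandomUtils.nextBoolean()) {
      // Best effort: the task may already have finished by the time the request arrives.
      sc.killTaskAttempt(
        taskStart.taskInfo.taskId,
        interruptThread = true,
        reason = "injected kill for testing")
    }
  }
}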
