[SPARK-47833][SQL][CORE] Supply caller stacktrace for checkAndGlobPathIfNecessary AnalysisException

### What changes were proposed in this pull request?

SPARK-29089 parallelized `checkAndGlobPathIfNecessary` by leveraging a ForkJoinPool, but it also introduced a side effect: if something goes wrong, the reported error message loses the caller-side stack trace.

For example, I hit the following error on a Spark job and had no idea what happened without the caller stack trace.

```
2024-04-12 14:31:21 CST ApplicationMaster INFO - Final app status: FAILED, exitCode: 15, (reason: User class threw exception: org.apache.spark.sql.AnalysisException: Path does not exist: hdfs://xyz-cluster/user/abc/hive_db/tmp.db/tmp_lskkh_1
	at org.apache.spark.sql.errors.QueryCompilationErrors$.dataPathNotExistError(QueryCompilationErrors.scala:1011)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4(DataSource.scala:785)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$4$adapted(DataSource.scala:782)
	at org.apache.spark.util.ThreadUtils$.$anonfun$parmap$2(ThreadUtils.scala:372)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
)
```
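
To illustrate the mechanism, here is a minimal sketch (not the actual Spark code path; `LostCallerTrace` and `checkPathsInParallel` are hypothetical names): an exception constructed on a ForkJoinPool worker thread only captures that worker's frames, so nothing in the reported trace points back at the caller.

```scala
import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object LostCallerTrace {
  // Hypothetical stand-in for checkAndGlobPathIfNecessary: the path check runs
  // on a work-stealing (ForkJoinPool-backed) thread, so the exception is
  // constructed, and its stack trace captured, on that worker thread.
  def checkPathsInParallel(): Unit = {
    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutorService(Executors.newWorkStealingPool())
    val check = Future { throw new RuntimeException("Path does not exist: ...") }
    // Await rethrows the worker's exception in the caller thread, but the
    // exception still only carries the worker-side frames.
    Await.result(check, Duration.Inf)
  }

  def main(args: Array[String]): Unit = {
    try checkPathsInParallel()
    catch {
      // Prints Future/ForkJoinPool internals, but no frame mentioning
      // checkPathsInParallel() or main() -- the gap this PR addresses.
      case e: Throwable => e.printStackTrace()
    }
  }
}
```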

### Why are the changes needed?

Improves the error message by preserving the caller-side stack trace.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A new UT is added; the differences between the exception stack traces are shown below.

Raw stack trace:
```
java.lang.RuntimeException: Error occurred on Thread-9
        at org.apache.spark.util.ThreadUtilsSuite$$anon$3.internalMethod(ThreadUtilsSuite.scala:141)
        at org.apache.spark.util.ThreadUtilsSuite$$anon$3.run(ThreadUtilsSuite.scala:138)
```

Enhanced stack trace:
```
java.lang.RuntimeException: Error occurred on Thread-9
        at org.apache.spark.util.ThreadUtilsSuite$$anon$3.internalMethod(ThreadUtilsSuite.scala:141)
        at org.apache.spark.util.ThreadUtilsSuite$$anon$3.run(ThreadUtilsSuite.scala:138)
        at ... run in separate thread: Thread-9 ... ()
        at org.apache.spark.util.ThreadUtilsSuite.$anonfun$new$16(ThreadUtilsSuite.scala:151)
        at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
        at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
        at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
        at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
        at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
        at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
        at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
        (... other scalatest callsites)
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46028 from pan3793/SPARK-47833.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
pan3793 authored and yaooqinn committed Apr 19, 2024
1 parent 06b12fc commit 2bf4346
Showing 3 changed files with 80 additions and 25 deletions.
62 changes: 41 additions & 21 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -231,7 +231,7 @@ private[spark] object ThreadUtils {
/**
* Run a piece of code in a new thread and return the result. Exception in the new thread is
* thrown in the caller thread with an adjusted stack trace that removes references to this
* method for clarity. The exception stack traces will be like the following
* method for clarity. The exception stack traces will be like the following:
*
* SomeException: exception-message
* at CallerClass.body-method (sourcefile.scala)
@@ -261,31 +261,51 @@

exception match {
case Some(realException) =>
// Remove the part of the stack that shows method calls into this helper method
// This means drop everything from the top until the stack element
// ThreadUtils.runInNewThread(), and then drop that as well (hence the `drop(1)`).
val baseStackTrace = Thread.currentThread().getStackTrace().dropWhile(
! _.getClassName.contains(this.getClass.getSimpleName)).drop(1)

// Remove the part of the new thread stack that shows methods call from this helper method
val extraStackTrace = realException.getStackTrace.takeWhile(
! _.getClassName.contains(this.getClass.getSimpleName))

// Combine the two stack traces, with a place holder just specifying that there
// was a helper method used, without any further details of the helper
val placeHolderStackElem = new StackTraceElement(
s"... run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")} ..",
" ", "", -1)
val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace

// Update the stack trace and rethrow the exception in the caller thread
realException.setStackTrace(finalStackTrace)
throw realException
throw wrapCallerStacktrace(realException, dropStacks = 2)
case None =>
result
}
}

/**
* Adjust the exception's stack trace to append the caller-side thread stack trace.
* The exception stack traces will be like the following:
*
* SomeException: exception-message
* at CallerClass.body-method (sourcefile.scala)
* at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
* at CallerClass.caller-method (sourcefile.scala)
* ...
*/
def wrapCallerStacktrace[T <: Throwable](
realException: T,
combineMessage: String =
s"run in separate thread using ${ThreadUtils.getClass.getName.stripSuffix("$")}",
dropStacks: Int = 1): T = {
require(dropStacks >= 0, "dropStacks must be zero or positive")
val simpleName = this.getClass.getSimpleName
// Remove the part of the stack that shows method calls into this helper method.
// This means drop everything from the top until the first ThreadUtils stack element,
// then drop `dropStacks` more frames (by default just the
// ThreadUtils.wrapCallerStacktrace() frame itself); a larger dropStacks lets the
// caller drop additional intermediate frames.
val baseStackTrace = Thread.currentThread().getStackTrace
.dropWhile(!_.getClassName.contains(simpleName))
.drop(dropStacks)

// Remove the part of the new thread stack that shows calls made from this helper method
val extraStackTrace = realException.getStackTrace
.takeWhile(!_.getClassName.contains(simpleName))

// Combine the two stack traces, with a placeholder just specifying that there
// was a helper method used, without any further details of the helper
val placeHolderStackElem = new StackTraceElement(s"... $combineMessage ..", " ", "", -1)
val finalStackTrace = extraStackTrace ++ Seq(placeHolderStackElem) ++ baseStackTrace

// Update the stack trace and rethrow the exception in the caller thread
realException.setStackTrace(finalStackTrace)
realException
}

/**
* Construct a new ForkJoinPool with a specified max parallelism and name prefix.
*/
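
As a usage illustration (not part of this patch), here is a minimal sketch of how Spark-internal code could call the new helper; `SplitLoader`, `loadSplits`, and the failing future are hypothetical:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

import org.apache.spark.util.ThreadUtils

// Hypothetical caller; note ThreadUtils is private[spark], so real usage has
// to live under the org.apache.spark package.
object SplitLoader {
  def loadSplits(): Seq[String] = {
    val work = Future[Seq[String]] {
      // Failure happens on a global-ExecutionContext worker thread.
      throw new IllegalStateException("failure on a worker thread")
    }
    try {
      Await.result(work, Duration.Inf)
    } catch {
      // Rethrow with the caller-side frames appended after the placeholder
      // frame ("... run in separate thread using org.apache.spark.util.ThreadUtils ..."),
      // so the trace shows both where the work failed and who requested it.
      case e: Throwable => throw ThreadUtils.wrapCallerStacktrace(e)
    }
  }
}
```
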
39 changes: 37 additions & 2 deletions core/src/test/scala/org/apache/spark/util/ThreadUtilsSuite.scala
@@ -119,11 +119,46 @@ class ThreadUtilsSuite extends SparkFunSuite {
runInNewThread("thread-name") { throw new IllegalArgumentException(uniqueExceptionMessage) }
}
assert(exception.getMessage === uniqueExceptionMessage)
assert(exception.getStackTrace.mkString("\n").contains(
val stacktrace = exception.getStackTrace.mkString("\n")
assert(stacktrace.contains(
"... run in separate thread using org.apache.spark.util.ThreadUtils ..."),
"stack trace does not contain expected place holder"
)
assert(exception.getStackTrace.mkString("\n").contains("ThreadUtils.scala") === false,
assert(!stacktrace.contains("ThreadUtils.scala"),
"stack trace contains unexpected references to ThreadUtils"
)
}

test("SPARK-47833: wrapCallerStacktrace") {
var runnerThreadName: String = null
var exception: Throwable = null
val t = new Thread() {
override def run(): Unit = {
runnerThreadName = Thread.currentThread().getName
internalMethod()
}
private def internalMethod(): Unit = {
throw new RuntimeException(s"Error occurred on $runnerThreadName")
}
}
t.setDaemon(true)
t.setUncaughtExceptionHandler { case (_, e) => exception = e }
t.start()
t.join()

ThreadUtils.wrapCallerStacktrace(exception, s"run in separate thread: $runnerThreadName")

val stacktrace = exception.getStackTrace.mkString("\n")
assert(stacktrace.contains("internalMethod"),
"stack trace does not contain real exception stack trace"
)
assert(stacktrace.contains(s"... run in separate thread: $runnerThreadName ..."),
"stack trace does not contain expected place holder"
)
assert(stacktrace.contains("org.scalatest.Suite.run"),
"stack trace does not contain caller stack trace"
)
assert(!stacktrace.contains("ThreadUtils.scala"),
"stack trace contains unexpected references to ThreadUtils"
)
}
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -786,7 +786,7 @@ object DataSource extends Logging {
globResult
}.flatten
} catch {
case e: SparkException => throw e.getCause
case e: SparkException => throw ThreadUtils.wrapCallerStacktrace(e.getCause)
}

if (checkFilesExist) {
@@ -798,7 +798,7 @@
}
}
} catch {
case e: SparkException => throw e.getCause
case e: SparkException => throw ThreadUtils.wrapCallerStacktrace(e.getCause)
}
}

