Commit f30028d
[Bug Fix] Fix "WARN org.apache.spark.sql.execution.datasources.BasicW…
Browse files Browse the repository at this point in the history
…riteTaskStatsTracker: Expected x files, but only saw 0."
  • Loading branch information
baibaichen committed Dec 15, 2024
1 parent 937caff commit f30028d
Showing 2 changed files with 10 additions and 6 deletions.
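
Background on the failure (a reconstruction from the diff, not part of the commit message): Spark's BasicWriteTaskStatsTracker counts a submitted file only once it can stat it on the filesystem, and it logs this warning when it finds fewer files than were submitted. The native writer reports paths relative to its write directory, so resolving them against the wrong base directory makes every lookup miss and the tracker report zero files. A minimal Scala sketch of that counting behavior, with simplified, illustrative names (SketchStatsTracker is not Spark's class):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // Illustrative sketch: files are counted on newFile(), but only "seen" when
    // the path actually resolves on the filesystem, so a wrong base directory
    // produces "Expected x files, but only saw 0."
    class SketchStatsTracker {
      private var numSubmittedFiles = 0
      private var numFiles = 0

      def newFile(filePath: String): Unit = numSubmittedFiles += 1

      def closeFile(filePath: String): Unit = {
        val path = new Path(filePath)
        val fs = path.getFileSystem(new Configuration())
        if (fs.exists(path)) numFiles += 1 // missing staging files are never counted
      }

      def finalStats(): Unit =
        if (numFiles != numSubmittedFiles)
          println(s"WARN Expected $numSubmittedFiles files, but only saw $numFiles.")
    }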
File 1 of 2

@@ -137,7 +137,8 @@ case class FileDeltaColumnarWrite(
     // stats.map(row => x.apply(row).getString(0)).foreach(println)
     // process stats
     val commitInfo = DeltaFileCommitInfo(committer)
-    val basicNativeStat = NativeBasicWriteTaskStatsTracker(description, basicWriteJobStatsTracker)
+    val basicNativeStat =
+      NativeBasicWriteTaskStatsTracker(description.path, basicWriteJobStatsTracker)
     val basicNativeStats = Seq(commitInfo, basicNativeStat)
     NativeStatCompute(stats)(basicNativeStats, nativeDeltaStats)
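
A hedged reading of why the Delta path keeps description.path: Delta's commit protocol writes data files directly under the table root, so the table path is the directory the native writer's relativePath values resolve against; only the tracker's parameter narrows from the full WriteJobDescription to the one string it needs. Illustrated with hypothetical values:

    // Hypothetical values: when data files land directly under the table root,
    // description.path is the right base for the native writer's relative paths.
    val writeDir = "/warehouse/delta_table"       // description.path (example)
    val relativePath = "part-00000-uuid.parquet"  // reported by the native writer
    val absolutePath = s"$writeDir/$relativePath" // exists when stats are taken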
File 2 of 2

@@ -198,12 +198,12 @@ case class NativeStatCompute(rows: Seq[InternalRow]) {
 }
 
 case class NativeBasicWriteTaskStatsTracker(
-    description: WriteJobDescription,
+    writeDir: String,
     basicWriteJobStatsTracker: WriteTaskStatsTracker)
   extends (NativeFileWriteResult => Unit) {
   private var numWrittenRows: Long = 0
   override def apply(stat: NativeFileWriteResult): Unit = {
-    val absolutePath = s"${description.path}/${stat.relativePath}"
+    val absolutePath = s"$writeDir/${stat.relativePath}"
     if (stat.partition_id != "__NO_PARTITION_ID__") {
       basicWriteJobStatsTracker.newPartition(new GenericInternalRow(Array[Any](stat.partition_id)))
     }
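
Since the tracker extends (NativeFileWriteResult => Unit), each native write result is simply applied to it. A self-contained sketch of the path resolution and partition handling above, using a simplified stand-in type (FakeWriteResult is illustrative, not the repo's case class):

    // Simplified stand-in for the repo's NativeFileWriteResult.
    final case class FakeWriteResult(relativePath: String, partition_id: String)

    def process(writeDir: String, stat: FakeWriteResult): String = {
      val absolutePath = s"$writeDir/${stat.relativePath}"
      if (stat.partition_id != "__NO_PARTITION_ID__") {
        // partitioned file: report the partition before the file itself
        println(s"newPartition(${stat.partition_id})")
      }
      println(s"newFile($absolutePath)")
      absolutePath
    }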
@@ -248,6 +248,8 @@ case class HadoopMapReduceCommitProtocolWrite(
   extends CHColumnarWrite[HadoopMapReduceCommitProtocol]
   with Logging {
 
+  private var stageDir: String = _
+
   private lazy val adapter: HadoopMapReduceAdapter = HadoopMapReduceAdapter(committer)
 
   /**
@@ -257,11 +259,12 @@ case class HadoopMapReduceCommitProtocolWrite(
   override def doSetupNativeTask(): Unit = {
     val (writePath, writeFilePattern) =
       adapter.getTaskAttemptTempPathAndFilePattern(taskAttemptContext, description)
-    logDebug(s"Native staging write path: $writePath and file pattern: $writeFilePattern")
+    stageDir = writePath
+    logDebug(s"Native staging write path: $stageDir and file pattern: $writeFilePattern")
 
     val settings =
       Map(
-        RuntimeSettings.TASK_WRITE_TMP_DIR.key -> writePath,
+        RuntimeSettings.TASK_WRITE_TMP_DIR.key -> stageDir,
         RuntimeSettings.TASK_WRITE_FILENAME_PATTERN.key -> writeFilePattern)
     NativeExpressionEvaluator.updateQueryRuntimeSettings(settings)
   }
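
The crux of the fix is that the staging directory chosen in doSetupNativeTask must also be the base for stats at commit time. A condensed lifecycle sketch (names follow the diff; bodies are simplified):

    // Condensed sketch: the stageDir handed to the native writer at task setup
    // is remembered and reused to resolve file stats at commit time.
    object StageDirLifecycle {
      private var stageDir: String = _

      def doSetupNativeTask(writePath: String): Unit = {
        stageDir = writePath // where the native writer will actually put files
        // ... push stageDir to the native side via runtime settings ...
      }

      def resolveStats(relativePaths: Seq[String]): Seq[String] =
        relativePaths.map(p => s"$stageDir/$p") // resolve against the staging dir
    }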
@@ -272,7 +275,7 @@ case class HadoopMapReduceCommitProtocolWrite(
       None
     } else {
       val commitInfo = FileCommitInfo(description)
-      val basicNativeStat = NativeBasicWriteTaskStatsTracker(description, basicWriteJobStatsTracker)
+      val basicNativeStat = NativeBasicWriteTaskStatsTracker(stageDir, basicWriteJobStatsTracker)
       val basicNativeStats = Seq(commitInfo, basicNativeStat)
       NativeStatCompute(stats)(basicNativeStats)
       val (partitions, addedAbsPathFiles) = commitInfo.result
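
Net effect of the last hunk, with hypothetical paths: under HadoopMapReduceCommitProtocol the files still sit in the task-attempt staging directory when stats are computed, so only the staging-dir resolution finds them (the _temporary layout below is just an example of a Hadoop committer's staging path):

    object BeforeAfter extends App {
      val tablePath = "/warehouse/t1" // description.path (example)
      val stageDir = s"$tablePath/_temporary/0/_temporary/attempt_0" // example staging dir
      val rel = "part-00000-uuid.parquet"
      println(s"before: $tablePath/$rel") // not there yet at stats time -> "saw 0"
      println(s"after:  $stageDir/$rel")  // where the native writer actually wrote
    }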
