diff --git a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala index bf6b0c0074dc8..df7ef7e23409e 100644 --- a/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala +++ b/backends-clickhouse/src/main/delta-32/org/apache/spark/sql/execution/FileDeltaColumnarWrite.scala @@ -137,7 +137,8 @@ case class FileDeltaColumnarWrite( // stats.map(row => x.apply(row).getString(0)).foreach(println) // process stats val commitInfo = DeltaFileCommitInfo(committer) - val basicNativeStat = NativeBasicWriteTaskStatsTracker(description, basicWriteJobStatsTracker) + val basicNativeStat = + NativeBasicWriteTaskStatsTracker(description.path, basicWriteJobStatsTracker) val basicNativeStats = Seq(commitInfo, basicNativeStat) NativeStatCompute(stats)(basicNativeStats, nativeDeltaStats) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala index 1342e250430ee..427db0aad2b53 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/CHColumnarWrite.scala @@ -198,12 +198,12 @@ case class NativeStatCompute(rows: Seq[InternalRow]) { } case class NativeBasicWriteTaskStatsTracker( - description: WriteJobDescription, + writeDir: String, basicWriteJobStatsTracker: WriteTaskStatsTracker) extends (NativeFileWriteResult => Unit) { private var numWrittenRows: Long = 0 override def apply(stat: NativeFileWriteResult): Unit = { - val absolutePath = s"${description.path}/${stat.relativePath}" + val absolutePath = s"$writeDir/${stat.relativePath}" if (stat.partition_id != "__NO_PARTITION_ID__") { basicWriteJobStatsTracker.newPartition(new GenericInternalRow(Array[Any](stat.partition_id))) } @@ -248,6 +248,8 @@ case class HadoopMapReduceCommitProtocolWrite( extends CHColumnarWrite[HadoopMapReduceCommitProtocol] with Logging { + private var stageDir: String = _ + private lazy val adapter: HadoopMapReduceAdapter = HadoopMapReduceAdapter(committer) /** @@ -257,11 +259,12 @@ case class HadoopMapReduceCommitProtocolWrite( override def doSetupNativeTask(): Unit = { val (writePath, writeFilePattern) = adapter.getTaskAttemptTempPathAndFilePattern(taskAttemptContext, description) - logDebug(s"Native staging write path: $writePath and file pattern: $writeFilePattern") + stageDir = writePath + logDebug(s"Native staging write path: $stageDir and file pattern: $writeFilePattern") val settings = Map( - RuntimeSettings.TASK_WRITE_TMP_DIR.key -> writePath, + RuntimeSettings.TASK_WRITE_TMP_DIR.key -> stageDir, RuntimeSettings.TASK_WRITE_FILENAME_PATTERN.key -> writeFilePattern) NativeExpressionEvaluator.updateQueryRuntimeSettings(settings) } @@ -272,7 +275,7 @@ case class HadoopMapReduceCommitProtocolWrite( None } else { val commitInfo = FileCommitInfo(description) - val basicNativeStat = NativeBasicWriteTaskStatsTracker(description, basicWriteJobStatsTracker) + val basicNativeStat = NativeBasicWriteTaskStatsTracker(stageDir, basicWriteJobStatsTracker) val basicNativeStats = Seq(commitInfo, basicNativeStat) NativeStatCompute(stats)(basicNativeStats) val (partitions, addedAbsPathFiles) = commitInfo.result