From d97b06b85165a39b279020324b2a39071f8c10eb Mon Sep 17 00:00:00 2001
From: SteNicholas
Date: Fri, 2 Aug 2024 19:45:27 +0800
Subject: [PATCH] [GLUTEN-6695][CH] Introduce shuffleWallTime in CHMetricsApi to calculate the overall shuffle write time

---
 .../apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala | 1 +
 .../spark/shuffle/CHCelebornColumnarShuffleWriter.scala     | 4 ++++
 2 files changed, 5 insertions(+)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index 85b298fa48354..0ff53e1c58178 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -222,6 +222,7 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       "spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to spill"),
       "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to compress"),
       "prepareTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to prepare"),
+      "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle wall time"),
       "avgReadBatchNumRows" -> SQLMetrics
         .createAverageMetric(sparkContext, "avg read batch num rows"),
       "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index ae22a08908193..c7d7957c15b6b 100644
--- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -63,7 +63,9 @@ class CHCelebornColumnarShuffleWriter[K, V](
       } else {
         initShuffleWriter(cb)
         val col = cb.column(0).asInstanceOf[CHColumnVector]
+        val startTime = System.nanoTime()
         jniWrapper.split(nativeShuffleWriter, col.getBlockAddress)
+        dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
         dep.metrics("numInputRows").add(cb.numRows)
         dep.metrics("inputBatches").add(1)
         // This metric is important, AQE use it to decide if EliminateLimit
@@ -77,8 +79,10 @@
       return
     }
 
+    val startTime = System.nanoTime()
     splitResult = jniWrapper.stop(nativeShuffleWriter)
+    dep.metrics("shuffleWallTime").add(System.nanoTime() - startTime)
     dep.metrics("splitTime").add(splitResult.getSplitTime)
     dep.metrics("IOTime").add(splitResult.getDiskWriteTime)
     dep.metrics("serializeTime").add(splitResult.getSerializationTime)
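
Below is a minimal, Spark-free sketch of the pattern this patch applies: bracket the native split()/stop() calls with System.nanoTime() and accumulate the elapsed nanoseconds into a single "shuffleWallTime" metric, so the metric reports the overall wall-clock time spent in native shuffle writing. All names in the sketch (ShuffleWallTimeSketch, metrics, nativeSplit, nativeStop, timed) are hypothetical stand-ins for dep.metrics, jniWrapper.split and jniWrapper.stop; none of them is a Gluten or Spark API.

// Minimal, Spark-free sketch of the wall-time accumulation pattern in the patch.
// Every name here is a hypothetical stand-in, not a Gluten or Spark API.
object ShuffleWallTimeSketch {

  // Stand-in for the SQLMetric accumulators keyed by metric name (e.g. dep.metrics).
  val metrics: scala.collection.mutable.Map[String, Long] =
    scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)

  // Hypothetical native calls; the real code calls jniWrapper.split / jniWrapper.stop.
  def nativeSplit(blockAddress: Long): Unit = Thread.sleep(1)
  def nativeStop(): Unit = Thread.sleep(1)

  // Runs `body`, adds the elapsed nanoseconds to the named metric, and returns the result.
  // This mirrors the System.nanoTime() bracketing the patch adds around split() and stop().
  def timed[T](metric: String)(body: => T): T = {
    val start = System.nanoTime()
    try body
    finally metrics(metric) += System.nanoTime() - start
  }

  def main(args: Array[String]): Unit = {
    // Per-batch split call, as in the writer's else branch.
    timed("shuffleWallTime")(nativeSplit(0L))
    // Final stop call, as in the stop path of the write.
    timed("shuffleWallTime")(nativeStop())
    println(s"shuffleWallTime (ns): ${metrics("shuffleWallTime")}")
  }
}

Accumulating both the per-batch split calls and the final stop call into the same metric is what makes shuffleWallTime a wall-clock total for the write path, complementing the native-side timings (splitTime, IOTime, serializeTime) that are reported separately from splitResult.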