From 58907cfc5e3cf7d9d1fb15bd2ec0fbd4769548a0 Mon Sep 17 00:00:00 2001 From: Yang Zhang Date: Tue, 27 Aug 2024 15:46:07 +0800 Subject: [PATCH] [VL] Add write IO metrics for WriteFiles (#7011) --- .../apache/gluten/backendsapi/velox/VeloxMetricsApi.scala | 1 + .../org/apache/gluten/execution/VeloxMetricsSuite.scala | 1 + cpp/core/jni/JniWrapper.cc | 3 ++- cpp/core/utils/metrics.h | 1 + cpp/velox/compute/WholeStageResultIterator.cc | 2 ++ .../src/main/java/org/apache/gluten/metrics/Metrics.java | 5 ++++- .../main/java/org/apache/gluten/metrics/OperatorMetrics.java | 4 +++- .../main/scala/org/apache/gluten/metrics/MetricsUtil.scala | 2 ++ .../org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala | 1 + 9 files changed, 17 insertions(+), 3 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala index 88aa8c90da73..00cba4372891 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala @@ -341,6 +341,7 @@ class VeloxMetricsApi extends MetricsApi with Logging { "physicalWrittenBytes" -> SQLMetrics.createSizeMetric( sparkContext, "number of written bytes"), + "writeIONanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of write IO"), "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of write"), "numWrittenFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files") ) diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala index 468f26259219..0a3e4ebe2cd1 100644 --- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala +++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxMetricsSuite.scala @@ -195,6 +195,7 @@ class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa assert(write.isDefined) val metrics = write.get.metrics assert(metrics("physicalWrittenBytes").value > 0) + assert(metrics("writeIONanos").value > 0) assert(metrics("numWrittenFiles").value == 1) } } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 1662b200bf8a..53af168508ba 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -175,7 +175,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { metricsBuilderClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/metrics/Metrics;"); metricsBuilderConstructor = getMethodIdOrError( - env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V"); + env, metricsBuilderClass, "<init>", "([J[J[J[J[J[J[J[J[J[JJ[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J[J)V"); nativeColumnarToRowInfoClass = createGlobalClassReferenceOrError(env, "Lorg/apache/gluten/vectorized/NativeColumnarToRowInfo;"); @@ -496,6 +496,7 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchOutIter longArray[Metrics::kIoWaitTime], longArray[Metrics::kPreloadSplits], longArray[Metrics::kPhysicalWrittenBytes], + longArray[Metrics::kWriteIOTime], longArray[Metrics::kNumWrittenFiles]); JNI_METHOD_END(nullptr) diff --git a/cpp/core/utils/metrics.h b/cpp/core/utils/metrics.h index bda72b070f24..37f38102ea9a 100644 --- a/cpp/core/utils/metrics.h +++ b/cpp/core/utils/metrics.h @@ -76,6 +76,7 @@ struct Metrics { // Write metrics. kPhysicalWrittenBytes, + kWriteIOTime, kNumWrittenFiles, // The end of enum items. 
diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 59181aaea358..7ad8cb27501f 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -46,6 +46,7 @@ const std::string kRemainingFilterTime = "totalRemainingFilterTime"; const std::string kIoWaitTime = "ioWaitNanos"; const std::string kPreloadSplits = "readyPreloadedSplits"; const std::string kNumWrittenFiles = "numWrittenFiles"; +const std::string kWriteIOTime = "writeIOTime"; // others const std::string kHiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"; @@ -391,6 +392,7 @@ void WholeStageResultIterator::collectMetrics() { metrics_->get(Metrics::kNumWrittenFiles)[metricIndex] = runtimeMetric("sum", entry.second->customStats, kNumWrittenFiles); metrics_->get(Metrics::kPhysicalWrittenBytes)[metricIndex] = second->physicalWrittenBytes; + metrics_->get(Metrics::kWriteIOTime)[metricIndex] = runtimeMetric("sum", second->customStats, kWriteIOTime); metricIndex += 1; } diff --git a/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java b/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java index c4dcbb65a31f..8910b6be9a8c 100644 --- a/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java +++ b/gluten-data/src/main/java/org/apache/gluten/metrics/Metrics.java @@ -50,7 +50,7 @@ public class Metrics implements IMetrics { public long[] preloadSplits; public long[] physicalWrittenBytes; - + public long[] writeIOTime; public long[] numWrittenFiles; public SingleMetric singleMetric = new SingleMetric(); @@ -88,6 +88,7 @@ public Metrics( long[] ioWaitTime, long[] preloadSplits, long[] physicalWrittenBytes, + long[] writeIOTime, long[] numWrittenFiles) { this.inputRows = inputRows; this.inputVectors = inputVectors; @@ -120,6 +121,7 @@ public Metrics( this.ioWaitTime = ioWaitTime; this.preloadSplits = preloadSplits; this.physicalWrittenBytes = physicalWrittenBytes; + 
this.writeIOTime = writeIOTime; this.numWrittenFiles = numWrittenFiles; } @@ -159,6 +161,7 @@ public OperatorMetrics getOperatorMetrics(int index) { ioWaitTime[index], preloadSplits[index], physicalWrittenBytes[index], + writeIOTime[index], numWrittenFiles[index]); } diff --git a/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java b/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java index cad04987eb57..fd0be2113310 100644 --- a/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java +++ b/gluten-data/src/main/java/org/apache/gluten/metrics/OperatorMetrics.java @@ -48,7 +48,7 @@ public class OperatorMetrics implements IOperatorMetrics { public long preloadSplits; public long physicalWrittenBytes; - + public long writeIOTime; public long numWrittenFiles; /** Create an instance for operator metrics. */ @@ -83,6 +83,7 @@ public OperatorMetrics( long ioWaitTime, long preloadSplits, long physicalWrittenBytes, + long writeIOTime, long numWrittenFiles) { this.inputRows = inputRows; this.inputVectors = inputVectors; @@ -114,6 +115,7 @@ public OperatorMetrics( this.ioWaitTime = ioWaitTime; this.preloadSplits = preloadSplits; this.physicalWrittenBytes = physicalWrittenBytes; + this.writeIOTime = writeIOTime; this.numWrittenFiles = numWrittenFiles; } } diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala index 8eea58272f15..ae706380998f 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala @@ -100,6 +100,7 @@ object MetricsUtil extends Logging { val outputBytes = operatorMetrics.get(0).outputBytes val physicalWrittenBytes = operatorMetrics.get(0).physicalWrittenBytes + val writeIOTime = operatorMetrics.get(0).writeIOTime var cpuCount: Long = 0 var wallNanos: Long = 0 @@ -182,6 +183,7 @@ object MetricsUtil 
extends Logging { ioWaitTime, preloadSplits, physicalWrittenBytes, + writeIOTime, numWrittenFiles ) } diff --git a/gluten-data/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala b/gluten-data/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala index 30ed4aaa78cf..4a8969189503 100644 --- a/gluten-data/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala +++ b/gluten-data/src/main/scala/org/apache/gluten/metrics/WriteFilesMetricsUpdater.scala @@ -24,6 +24,7 @@ class WriteFilesMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metr if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] metrics("physicalWrittenBytes") += operatorMetrics.physicalWrittenBytes + metrics("writeIONanos") += operatorMetrics.writeIOTime metrics("wallNanos") += operatorMetrics.wallNanos metrics("numWrittenFiles") += operatorMetrics.numWrittenFiles }