From f31cc821d178db5f4c1be2244bbafb5db5d80c3c Mon Sep 17 00:00:00 2001
From: Xiduo You
Date: Tue, 5 Dec 2023 19:28:37 +0800
Subject: [PATCH] [VL] Add sort merge join metrics (#3920)

---
 .../backendsapi/velox/MetricsApiImpl.scala    |  47 ++++---
 .../execution/VeloxMetricsSuite.scala         | 129 ++++++++++++++++++
 .../SortMergeJoinExecTransformer.scala        |   6 +-
 ...Updater.scala => JoinMetricsUpdater.scala} |  93 ++++++++++---
 .../glutenproject/metrics/MetricsUtil.scala   |   9 ++
 .../metrics/SortMergeJoinMetricsUpdater.scala |  26 ----
 6 files changed, 242 insertions(+), 68 deletions(-)
 create mode 100644 backends-velox/src/test/scala/io/glutenproject/execution/VeloxMetricsSuite.scala
 rename gluten-data/src/main/scala/io/glutenproject/metrics/{HashJoinMetricsUpdater.scala => JoinMetricsUpdater.scala} (69%)
 delete mode 100644 gluten-data/src/main/scala/io/glutenproject/metrics/SortMergeJoinMetricsUpdater.scala

diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
index eca07afb311ed..55e08d117f5be 100644
--- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
+++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala
@@ -352,12 +352,32 @@ class MetricsApiImpl extends MetricsApi with Logging {
       sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-      "numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "number of output batches"),
-      "prepareTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to prepare left list"),
-      "processTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to process"),
-      "joinTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to merge join"),
-      "totaltimeSortmergejoin" -> SQLMetrics
-        .createTimingMetric(sparkContext, "totaltime sortmergejoin")
+      "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
+      "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
+      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of merge join"),
+      "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
+      "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
+      "numMemoryAllocations" -> SQLMetrics.createMetric(
+        sparkContext,
+        "number of memory allocations"),
+      "streamPreProjectionCpuCount" -> SQLMetrics.createMetric(
+        sparkContext,
+        "stream preProject cpu wall time count"),
+      "streamPreProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "totaltime of stream preProjection"),
+      "bufferPreProjectionCpuCount" -> SQLMetrics.createMetric(
+        sparkContext,
+        "buffer preProject cpu wall time count"),
+      "bufferPreProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "totaltime of buffer preProjection"),
+      "postProjectionCpuCount" -> SQLMetrics.createMetric(
+        sparkContext,
+        "postProject cpu wall time count"),
+      "postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
+        sparkContext,
+        "totaltime of postProjection")
     )
 
   override def genSortMergeJoinTransformerMetricsUpdater(
@@ -476,20 +496,13 @@ class MetricsApiImpl extends MetricsApi with Logging {
       "postProjectionWallNanos" -> SQLMetrics.createNanoTimingMetric(
         sparkContext,
         "totaltime of postProjection"),
-      "postProjectionOutputRows" -> SQLMetrics.createMetric(
-        sparkContext,
-        "number of postProjection output rows"),
-      "postProjectionOutputVectors" -> SQLMetrics.createMetric(
-        sparkContext,
-        "number of postProjection output vectors"),
-      "finalOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of final output rows"),
-      "finalOutputVectors" -> SQLMetrics.createMetric(
-        sparkContext,
-        "number of final output vectors")
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
+      "numOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes")
     )
 
   override def genHashJoinTransformerMetricsUpdater(
-      metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdaterImpl(metrics)
+      metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdater(metrics)
 
   override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/VeloxMetricsSuite.scala b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxMetricsSuite.scala
new file mode 100644
index 0000000000000..0f24e93057f9d
--- /dev/null
+++ b/backends-velox/src/test/scala/io/glutenproject/execution/VeloxMetricsSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.execution
+
+import io.glutenproject.GlutenConfig
+
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.internal.SQLConf
+
+class VeloxMetricsSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
+  override protected val backend: String = "velox"
+  override protected val resourcePath: String = "/tpch-data-parquet-velox"
+  override protected val fileFormat: String = "parquet"
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+
+    spark
+      .range(100)
+      .selectExpr("id as c1", "id % 3 as c2")
+      .write
+      .format("parquet")
+      .saveAsTable("metrics_t1")
+
+    spark
+      .range(200)
+      .selectExpr("id as c1", "id % 3 as c2")
+      .write
+      .format("parquet")
+      .saveAsTable("metrics_t2")
+  }
+
+  override protected def afterAll(): Unit = {
+    spark.sql("drop table metrics_t1")
+    spark.sql("drop table metrics_t2")
+
+    super.afterAll()
+  }
+
+  test("test sort merge join metrics") {
+    withSQLConf(
+      GlutenConfig.COLUMNAR_FPRCE_SHUFFLED_HASH_JOIN_ENABLED.key -> "false",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      // without preproject
+      runQueryAndCompare(
+        "SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 = metrics_t2.c1"
+      ) {
+        df =>
+          val smj = find(df.queryExecution.executedPlan) {
+            case _: SortMergeJoinExecTransformer => true
+            case _ => false
+          }
+          assert(smj.isDefined)
+          val metrics = smj.get.metrics
+          assert(metrics("numOutputRows").value == 100)
+          assert(metrics("numOutputVectors").value > 0)
+          assert(metrics("numOutputBytes").value > 0)
+      }
+
+      // with preproject
+      runQueryAndCompare(
+        "SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 + 1 = metrics_t2.c1 + 1"
+      ) {
+        df =>
+          val smj = find(df.queryExecution.executedPlan) {
+            case _: SortMergeJoinExecTransformer => true
+            case _ => false
+          }
+          assert(smj.isDefined)
+          val metrics = smj.get.metrics
+          assert(metrics("numOutputRows").value == 100)
+          assert(metrics("numOutputVectors").value > 0)
+          assert(metrics("streamPreProjectionCpuCount").value > 0)
+          assert(metrics("bufferPreProjectionCpuCount").value > 0)
+      }
+    }
+  }
+
+  test("test shuffle hash join metrics") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      // without preproject
+      runQueryAndCompare(
+        "SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 = metrics_t2.c1"
+      ) {
+        df =>
+          val shj = find(df.queryExecution.executedPlan) {
+            case _: ShuffledHashJoinExecTransformer => true
+            case _ => false
+          }
+          assert(shj.isDefined)
+          val metrics = shj.get.metrics
+          assert(metrics("numOutputRows").value == 100)
+          assert(metrics("numOutputVectors").value > 0)
+          assert(metrics("numOutputBytes").value > 0)
+      }
+
+      // with preproject
+      runQueryAndCompare(
+        "SELECT * FROM metrics_t1 join metrics_t2 on metrics_t1.c1 + 1 = metrics_t2.c1 + 1"
+      ) {
+        df =>
+          val shj = find(df.queryExecution.executedPlan) {
+            case _: ShuffledHashJoinExecTransformer => true
+            case _ => false
+          }
+          assert(shj.isDefined)
+          val metrics = shj.get.metrics
+          assert(metrics("numOutputRows").value == 100)
+          assert(metrics("numOutputVectors").value > 0)
+          assert(metrics("streamPreProjectionCpuCount").value > 0)
+          assert(metrics("buildPreProjectionCpuCount").value > 0)
+      }
+    }
+  }
+}
diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala
index 7dcb338a8dadc..ac1ae44bef5b0 100644
--- a/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala
+++ b/gluten-core/src/main/scala/io/glutenproject/execution/SortMergeJoinExecTransformer.scala
@@ -49,6 +49,9 @@ case class SortMergeJoinExecTransformer(
   @transient override lazy val metrics =
     BackendsApiManager.getMetricsApiInstance.genSortMergeJoinTransformerMetrics(sparkContext)
 
+  override def metricsUpdater(): MetricsUpdater =
+    BackendsApiManager.getMetricsApiInstance.genSortMergeJoinTransformerMetricsUpdater(metrics)
+
   val (bufferedKeys, streamedKeys, bufferedPlan, streamedPlan) =
     (rightKeys, leftKeys, right, left)
 
@@ -170,9 +173,6 @@ case class SortMergeJoinExecTransformer(
     getColumnarInputRDDs(streamedPlan) ++ getColumnarInputRDDs(bufferedPlan)
   }
 
-  override def metricsUpdater(): MetricsUpdater =
-    BackendsApiManager.getMetricsApiInstance.genSortMergeJoinTransformerMetricsUpdater(metrics)
-
   def genJoinParameters(): Any = {
     val (isSMJ, isNullAwareAntiJoin) = (1, 0)
     // Start with "JoinParameters:"
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/JoinMetricsUpdater.scala
similarity index 69%
rename from gluten-data/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala
rename to gluten-data/src/main/scala/io/glutenproject/metrics/JoinMetricsUpdater.scala
index 7194a4b55c5d0..24b142cda99e3 100644
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala
+++ b/gluten-data/src/main/scala/io/glutenproject/metrics/JoinMetricsUpdater.scala
@@ -21,15 +21,45 @@ import io.glutenproject.substrait.JoinParams
 
 import org.apache.spark.sql.execution.metric.SQLMetric
 
-trait HashJoinMetricsUpdater extends MetricsUpdater {
+import java.util
+
+trait JoinMetricsUpdater extends MetricsUpdater {
   def updateJoinMetrics(
       joinMetrics: java.util.ArrayList[OperatorMetrics],
       singleMetrics: SingleMetric,
       joinParams: JoinParams): Unit
 }
 
-class HashJoinMetricsUpdaterImpl(val metrics: Map[String, SQLMetric])
-  extends HashJoinMetricsUpdater {
+abstract class JoinMetricsUpdaterBase(val metrics: Map[String, SQLMetric])
+  extends JoinMetricsUpdater {
+  val postProjectionCpuCount: SQLMetric = metrics("postProjectionCpuCount")
+  val postProjectionWallNanos: SQLMetric = metrics("postProjectionWallNanos")
+  val numOutputRows: SQLMetric = metrics("numOutputRows")
+  val numOutputVectors: SQLMetric = metrics("numOutputVectors")
+  val numOutputBytes: SQLMetric = metrics("numOutputBytes")
+
+  final override def updateJoinMetrics(
+      joinMetrics: util.ArrayList[OperatorMetrics],
+      singleMetrics: SingleMetric,
+      joinParams: JoinParams): Unit = {
+    assert(joinParams.postProjectionNeeded)
+    val postProjectMetrics = joinMetrics.remove(0)
+    postProjectionCpuCount += postProjectMetrics.cpuCount
+    postProjectionWallNanos += postProjectMetrics.wallNanos
+    numOutputRows += postProjectMetrics.outputRows
+    numOutputVectors += postProjectMetrics.outputVectors
+    numOutputBytes += postProjectMetrics.outputBytes
+
+    updateJoinMetricsInternal(joinMetrics, joinParams)
+  }
+
+  protected def updateJoinMetricsInternal(
+      joinMetrics: util.ArrayList[OperatorMetrics],
+      joinParams: JoinParams): Unit
+}
+
+class HashJoinMetricsUpdater(override val metrics: Map[String, SQLMetric])
+  extends JoinMetricsUpdaterBase(metrics) {
   val hashBuildInputRows: SQLMetric = metrics("hashBuildInputRows")
   val hashBuildOutputRows: SQLMetric = metrics("hashBuildOutputRows")
   val hashBuildOutputVectors: SQLMetric = metrics("hashBuildOutputVectors")
metrics("hashBuildOutputVectors") @@ -71,28 +101,10 @@ class HashJoinMetricsUpdaterImpl(val metrics: Map[String, SQLMetric]) val buildPreProjectionCpuCount: SQLMetric = metrics("buildPreProjectionCpuCount") val buildPreProjectionWallNanos: SQLMetric = metrics("buildPreProjectionWallNanos") - val postProjectionCpuCount: SQLMetric = metrics("postProjectionCpuCount") - val postProjectionWallNanos: SQLMetric = metrics("postProjectionWallNanos") - val postProjectionOutputRows: SQLMetric = metrics("postProjectionOutputRows") - val postProjectionOutputVectors: SQLMetric = metrics("postProjectionOutputVectors") - - val finalOutputRows: SQLMetric = metrics("finalOutputRows") - val finalOutputVectors: SQLMetric = metrics("finalOutputVectors") - - override def updateJoinMetrics( + override protected def updateJoinMetricsInternal( joinMetrics: java.util.ArrayList[OperatorMetrics], - singleMetrics: SingleMetric, joinParams: JoinParams): Unit = { var idx = 0 - if (joinParams.postProjectionNeeded) { - val postProjectMetrics = joinMetrics.get(idx) - postProjectionCpuCount += postProjectMetrics.cpuCount - postProjectionWallNanos += postProjectMetrics.wallNanos - postProjectionOutputRows += postProjectMetrics.outputRows - postProjectionOutputVectors += postProjectMetrics.outputVectors - idx += 1 - } - // HashProbe val hashProbeMetrics = joinMetrics.get(idx) hashProbeInputRows += hashProbeMetrics.inputRows @@ -140,3 +152,40 @@ class HashJoinMetricsUpdaterImpl(val metrics: Map[String, SQLMetric]) } } } + +class SortMergeJoinMetricsUpdater(override val metrics: Map[String, SQLMetric]) + extends JoinMetricsUpdaterBase(metrics) { + val cpuCount: SQLMetric = metrics("cpuCount") + val wallNanos: SQLMetric = metrics("wallNanos") + val peakMemoryBytes: SQLMetric = metrics("peakMemoryBytes") + val numMemoryAllocations: SQLMetric = metrics("numMemoryAllocations") + + val streamPreProjectionCpuCount: SQLMetric = metrics("streamPreProjectionCpuCount") + val streamPreProjectionWallNanos: SQLMetric = metrics("streamPreProjectionWallNanos") + val bufferPreProjectionCpuCount: SQLMetric = metrics("bufferPreProjectionCpuCount") + val bufferPreProjectionWallNanos: SQLMetric = metrics("bufferPreProjectionWallNanos") + + override protected def updateJoinMetricsInternal( + joinMetrics: util.ArrayList[OperatorMetrics], + joinParams: JoinParams): Unit = { + var idx = 0 + val smjMetrics = joinMetrics.get(0) + cpuCount += smjMetrics.cpuCount + wallNanos += smjMetrics.wallNanos + peakMemoryBytes += smjMetrics.peakMemoryBytes + numMemoryAllocations += smjMetrics.numMemoryAllocations + idx += 1 + + if (joinParams.buildPreProjectionNeeded) { + bufferPreProjectionCpuCount += joinMetrics.get(idx).cpuCount + bufferPreProjectionWallNanos += joinMetrics.get(idx).wallNanos + idx += 1 + } + + if (joinParams.streamPreProjectionNeeded) { + streamPreProjectionCpuCount += joinMetrics.get(idx).cpuCount + streamPreProjectionWallNanos += joinMetrics.get(idx).wallNanos + idx += 1 + } + } +} diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/MetricsUtil.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/MetricsUtil.scala index 170d5239975fd..68ab464769782 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/MetricsUtil.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/MetricsUtil.scala @@ -50,6 +50,10 @@ object MetricsUtil extends Logging { MetricsUpdaterTree( j.metricsUpdater(), Seq(treeifyMetricsUpdaters(j.buildPlan), treeifyMetricsUpdaters(j.streamedPlan))) + case smj: SortMergeJoinExecTransformer => + 
+        MetricsUpdaterTree(
+          smj.metricsUpdater(),
+          Seq(treeifyMetricsUpdaters(smj.bufferedPlan), treeifyMetricsUpdaters(smj.streamedPlan)))
       case t: TransformSupport =>
         MetricsUpdaterTree(t.metricsUpdater(), t.children.map(treeifyMetricsUpdaters))
       case _ =>
@@ -196,6 +200,11 @@ object MetricsUtil extends Logging {
             operatorMetrics,
             metrics.getSingleMetrics,
             joinParamsMap.get(operatorIdx))
+        case smj: SortMergeJoinMetricsUpdater =>
+          smj.updateJoinMetrics(
+            operatorMetrics,
+            metrics.getSingleMetrics,
+            joinParamsMap.get(operatorIdx))
         case hau: HashAggregateMetricsUpdater =>
           hau.updateAggregationMetrics(operatorMetrics, aggParamsMap.get(operatorIdx))
         case lu: LimitMetricsUpdater =>
diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/SortMergeJoinMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/SortMergeJoinMetricsUpdater.scala
deleted file mode 100644
index 8fd5edaf5f0f4..0000000000000
--- a/gluten-data/src/main/scala/io/glutenproject/metrics/SortMergeJoinMetricsUpdater.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package io.glutenproject.metrics
-
-import org.apache.spark.sql.execution.metric.SQLMetric
-
-class SortMergeJoinMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
-
-  override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
-    if (opMetrics != null) {}
-  }
-}
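Note on how the refactored updaters consume the native metrics list: `JoinMetricsUpdaterBase.updateJoinMetrics` always pops the post-projection entry off the head of the list via `remove(0)` (hence the `assert(joinParams.postProjectionNeeded)`), then hands the remainder to the operator-specific `updateJoinMetricsInternal`. For sort merge join, the remaining entries arrive ordered as: the MergeJoin operator itself, then the optional buffered-side pre-projection, then the optional streamed-side pre-projection. Below is a minimal, self-contained Scala sketch of that consumption order; `OperatorMetricsStub` and `JoinParamsStub` are hypothetical stand-ins for Gluten's `OperatorMetrics` and `JoinParams`, which carry more fields than shown here.

import java.util

// Hypothetical stand-ins (assumption: the real classes expose at least these fields).
case class OperatorMetricsStub(cpuCount: Long, wallNanos: Long)
case class JoinParamsStub(
    postProjectionNeeded: Boolean,
    buildPreProjectionNeeded: Boolean,
    streamPreProjectionNeeded: Boolean)

object SmjMetricsOrder {
  // Mirrors the walk in SortMergeJoinMetricsUpdater.updateJoinMetricsInternal:
  // once the base class has consumed the post-projection entry, index 0 holds
  // the MergeJoin operator, followed by the optional buffered-side and
  // streamed-side pre-projection entries, in that order.
  def consume(joinMetrics: util.ArrayList[OperatorMetricsStub], joinParams: JoinParamsStub): Unit = {
    assert(joinParams.postProjectionNeeded)
    val postProject = joinMetrics.remove(0) // done by JoinMetricsUpdaterBase
    println(s"post-projection: cpu=${postProject.cpuCount}")

    var idx = 0
    val smj = joinMetrics.get(idx) // the MergeJoin operator itself
    idx += 1
    println(s"merge join: cpu=${smj.cpuCount}, wall=${smj.wallNanos}ns")

    if (joinParams.buildPreProjectionNeeded) { // buffered side is checked first
      println(s"buffer pre-projection: cpu=${joinMetrics.get(idx).cpuCount}")
      idx += 1
    }
    if (joinParams.streamPreProjectionNeeded) {
      println(s"stream pre-projection: cpu=${joinMetrics.get(idx).cpuCount}")
      idx += 1
    }
  }
}

Under these assumptions, a list built as [postProject, mergeJoin, bufferPreProject, streamPreProject] drains in exactly that order, matching the `idx` walk in the new `SortMergeJoinMetricsUpdater` and the per-metric assertions in VeloxMetricsSuite.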