From e707e354b1ac1a1891e93c8727531ad4b0b395b4 Mon Sep 17 00:00:00 2001 From: xmy Date: Tue, 12 Dec 2023 16:08:42 +0800 Subject: [PATCH] [GLUTEN-4012][CH] Add metrics for GenerateExecTransformer --- .../backendsapi/clickhouse/CHMetricsApi.scala | 14 ++++- .../metrics/GenerateMetricsUpdater.scala | 51 +++++++++++++++++++ .../glutenproject/metrics/MetricsUtil.scala | 4 +- .../GlutenClickHouseTPCHMetricsSuite.scala | 20 +++++++- .../execution/GenerateExecTransformer.scala | 4 +- 5 files changed, 86 insertions(+), 7 deletions(-) create mode 100644 backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala index f42c44aa0c403..1b9f17ead8c4e 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala @@ -348,8 +348,18 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdater(metrics) override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = - Map.empty + Map( + "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), + "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), + "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), + "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), + "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), + "outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"), + "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "total time") + ) override def genGenerateTransformerMetricsUpdater( - metrics: Map[String, SQLMetric]): MetricsUpdater = NoopMetricsUpdater + metrics: Map[String, SQLMetric]): MetricsUpdater = new GenerateMetricsUpdater(metrics) } diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala new file mode 100644 index 0000000000000..166ba5d7ee83d --- /dev/null +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.metrics + +import org.apache.spark.sql.execution.metric.SQLMetric + +class GenerateMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater { + + override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { + if (opMetrics != null) { + val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] + if (!operatorMetrics.metricsList.isEmpty) { + val metricsData = operatorMetrics.metricsList.get(0) + metrics("totalTime") += (metricsData.time / 1000L).toLong + metrics("inputWaitTime") += (metricsData.inputWaitTime / 1000L).toLong + metrics("outputWaitTime") += (metricsData.outputWaitTime / 1000L).toLong + metrics("outputVectors") += metricsData.outputVectors + + MetricsUtil.updateExtraTimeMetric( + metricsData, + metrics("extraTime"), + metrics("outputRows"), + metrics("outputBytes"), + metrics("inputRows"), + metrics("inputBytes"), + GenerateMetricsUpdater.INCLUDING_PROCESSORS, + GenerateMetricsUpdater.CH_PLAN_NODE_NAME + ) + } + } + } +} + +object GenerateMetricsUpdater { + val INCLUDING_PROCESSORS = Array("ArrayJoinTransform") + val CH_PLAN_NODE_NAME = Array("ArrayJoinTransform") +} diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/MetricsUtil.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/MetricsUtil.scala index 425fd126bfd3e..d72f68c3c0b6b 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/MetricsUtil.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/MetricsUtil.scala @@ -155,7 +155,7 @@ object MetricsUtil extends Logging { aggParamsMap.getOrDefault(operatorIdx, null)) mutNode.updater.updateNativeMetrics(operatorMetrics) - var newOperatorIdx: java.lang.Long = operatorIdx - 1 + var newOperatorIdx: JLong = operatorIdx - 1 mutNode.children.foreach { child => @@ -183,7 +183,7 @@ object MetricsUtil extends Logging { }) } - /** Update extral time metric by the processors */ + /** Update extra time metric by the processors */ def updateExtraTimeMetric( metricData: MetricsData, extraTime: SQLMetric, diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala index cf10087e24704..b493531776ef3 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala @@ -16,7 +16,7 @@ */ package io.glutenproject.execution.metrics -import io.glutenproject.execution.{BasicScanExecTransformer, ColumnarNativeIterator, FileSourceScanExecTransformer, FilterExecTransformerBase, GlutenClickHouseTPCHAbstractSuite, HashAggregateExecBaseTransformer, ProjectExecTransformer, WholeStageTransformer} +import io.glutenproject.execution.{BasicScanExecTransformer, ColumnarNativeIterator, FileSourceScanExecTransformer, FilterExecTransformerBase, GenerateExecTransformer, GlutenClickHouseTPCHAbstractSuite, HashAggregateExecBaseTransformer, ProjectExecTransformer, WholeStageTransformer} import io.glutenproject.extension.GlutenPlan import io.glutenproject.vectorized.GeneralInIterator @@ -78,6 +78,24 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite } } + test("test Generate metrics") { + val sql = + """ + |select n_nationkey, a from nation lateral view explode(split(n_comment, ' ')) as a + |order by n_nationkey, a + |""".stripMargin + runQueryAndCompare(sql) { + df => + val plans = df.queryExecution.executedPlan.collect { + case generate: GenerateExecTransformer => generate + } + assert(plans.size == 1) + assert(plans.head.metrics("inputRows").value == 25) + assert(plans.head.metrics("outputRows").value == 266) + assert(plans.head.metrics("outputVectors").value == 1) + } + } + test("Check the metrics values") { withSQLConf(("spark.gluten.sql.columnar.sort", "false")) { runTPCHQuery(1, noFallBack = false) { diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala index da9313108bd02..6f83f97576f75 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/GenerateExecTransformer.scala @@ -18,7 +18,7 @@ package io.glutenproject.execution import io.glutenproject.backendsapi.BackendsApiManager import io.glutenproject.exception.GlutenException -import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer} +import io.glutenproject.expression.{ConverterUtils, ExpressionConverter} import io.glutenproject.extension.ValidationResult import io.glutenproject.metrics.MetricsUpdater import io.glutenproject.substrait.`type`.TypeBuilder @@ -69,7 +69,7 @@ case class GenerateExecTransformer( val operatorId = context.nextOperatorId(this.nodeName) val generatorExpr = ExpressionConverter.replaceWithExpressionTransformer(generator, child.output) - val generatorNode = generatorExpr.asInstanceOf[ExpressionTransformer].doTransform(args) + val generatorNode = generatorExpr.doTransform(args) val childOutputNodes = new java.util.ArrayList[ExpressionNode] for (target <- requiredChildOutput) { val found = child.output.zipWithIndex.filter(_._1.name == target.name)