Skip to content

Commit

Permalink
[GLUTEN-4012][CH] Add metrics for GenerateExecTransformer (#4014)
Browse files Browse the repository at this point in the history
[CH] Add metrics for GenerateExecTransformer
  • Loading branch information
exmy authored Dec 26, 2023
1 parent 6335ad0 commit 57f4df1
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,18 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdater(metrics)

override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map.empty
Map(
"outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
"outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"),
"totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "total time")
)

override def genGenerateTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater = NoopMetricsUpdater
metrics: Map[String, SQLMetric]): MetricsUpdater = new GenerateMetricsUpdater(metrics)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.glutenproject.metrics

import org.apache.spark.sql.execution.metric.SQLMetric

class GenerateMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {

override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
if (!operatorMetrics.metricsList.isEmpty) {
val metricsData = operatorMetrics.metricsList.get(0)
metrics("totalTime") += (metricsData.time / 1000L).toLong
metrics("inputWaitTime") += (metricsData.inputWaitTime / 1000L).toLong
metrics("outputWaitTime") += (metricsData.outputWaitTime / 1000L).toLong
metrics("outputVectors") += metricsData.outputVectors

MetricsUtil.updateExtraTimeMetric(
metricsData,
metrics("extraTime"),
metrics("outputRows"),
metrics("outputBytes"),
metrics("inputRows"),
metrics("inputBytes"),
GenerateMetricsUpdater.INCLUDING_PROCESSORS,
GenerateMetricsUpdater.CH_PLAN_NODE_NAME
)
}
}
}
}

object GenerateMetricsUpdater {
val INCLUDING_PROCESSORS = Array("ArrayJoinTransform")
val CH_PLAN_NODE_NAME = Array("ArrayJoinTransform")
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ object MetricsUtil extends Logging {
aggParamsMap.getOrDefault(operatorIdx, null))
mutNode.updater.updateNativeMetrics(operatorMetrics)

var newOperatorIdx: java.lang.Long = operatorIdx - 1
var newOperatorIdx: JLong = operatorIdx - 1

mutNode.children.foreach {
child =>
Expand Down Expand Up @@ -183,7 +183,7 @@ object MetricsUtil extends Logging {
})
}

/** Update extral time metric by the processors */
/** Update extra time metric by the processors */
def updateExtraTimeMetric(
metricData: MetricsData,
extraTime: SQLMetric,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package io.glutenproject.execution.metrics

import io.glutenproject.execution.{BasicScanExecTransformer, ColumnarNativeIterator, FileSourceScanExecTransformer, FilterExecTransformerBase, GlutenClickHouseTPCHAbstractSuite, HashAggregateExecBaseTransformer, ProjectExecTransformer, WholeStageTransformer}
import io.glutenproject.execution.{BasicScanExecTransformer, ColumnarNativeIterator, FileSourceScanExecTransformer, FilterExecTransformerBase, GenerateExecTransformer, GlutenClickHouseTPCHAbstractSuite, HashAggregateExecBaseTransformer, ProjectExecTransformer, WholeStageTransformer}
import io.glutenproject.extension.GlutenPlan
import io.glutenproject.vectorized.GeneralInIterator

Expand Down Expand Up @@ -78,6 +78,24 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
}
}

test("test Generate metrics") {
val sql =
"""
|select n_nationkey, a from nation lateral view explode(split(n_comment, ' ')) as a
|order by n_nationkey, a
|""".stripMargin
runQueryAndCompare(sql) {
df =>
val plans = df.queryExecution.executedPlan.collect {
case generate: GenerateExecTransformer => generate
}
assert(plans.size == 1)
assert(plans.head.metrics("inputRows").value == 25)
assert(plans.head.metrics("outputRows").value == 266)
assert(plans.head.metrics("outputVectors").value == 1)
}
}

test("Check the metrics values") {
withSQLConf(("spark.gluten.sql.columnar.sort", "false")) {
runTPCHQuery(1, noFallBack = false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package io.glutenproject.execution

import io.glutenproject.backendsapi.BackendsApiManager
import io.glutenproject.exception.GlutenException
import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
import io.glutenproject.expression.{ConverterUtils, ExpressionConverter}
import io.glutenproject.extension.ValidationResult
import io.glutenproject.metrics.MetricsUpdater
import io.glutenproject.substrait.`type`.TypeBuilder
Expand Down Expand Up @@ -67,7 +67,7 @@ case class GenerateExecTransformer(
val operatorId = context.nextOperatorId(this.nodeName)
val generatorExpr =
ExpressionConverter.replaceWithExpressionTransformer(generator, child.output)
val generatorNode = generatorExpr.asInstanceOf[ExpressionTransformer].doTransform(args)
val generatorNode = generatorExpr.doTransform(args)
val childOutputNodes = new java.util.ArrayList[ExpressionNode]
for (target <- requiredChildOutput) {
val found = child.output.zipWithIndex.filter(_._1.name == target.name)
Expand Down

0 comments on commit 57f4df1

Please sign in to comment.