[GLUTEN-4012][CH] Add metrics for GenerateExecTransformer
exmy committed Dec 12, 2023
1 parent 9f11335 commit e18a49e
Showing 4 changed files with 77 additions and 18 deletions.
@@ -348,8 +348,18 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       metrics: Map[String, SQLMetric]): MetricsUpdater = new HashJoinMetricsUpdater(metrics)
 
   override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
-    Map.empty
+    Map(
+      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
+      "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
+      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+      "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
+      "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"),
+      "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
+      "outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"),
+      "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "total time")
+    )
 
   override def genGenerateTransformerMetricsUpdater(
-      metrics: Map[String, SQLMetric]): MetricsUpdater = NoopMetricsUpdater
+      metrics: Map[String, SQLMetric]): MetricsUpdater = new GenerateMetricsUpdater(metrics)
 }
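Note on the new map: these entries are ordinary Spark SQLMetric accumulators, created with the same SQLMetrics helpers used elsewhere in the backend and rendered per operator in the SQL UI. A minimal standalone sketch (plain Spark, outside Gluten; the object name and the metric values are illustrative only) of how such a map behaves:

    // Standalone sketch (plain Spark, no Gluten): the metric map above is built from
    // ordinary SQLMetric accumulators; an updater bumps them with `+=` and the SQL UI
    // shows the accumulated value per operator. Values below are illustrative only.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.metric.SQLMetrics

    object MetricsSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("metrics-sketch").getOrCreate()
        val sc = spark.sparkContext

        // Same constructors as in the hunk above: counter, size, and timing metrics.
        val demoMetrics = Map(
          "outputRows" -> SQLMetrics.createMetric(sc, "number of output rows"),
          "outputBytes" -> SQLMetrics.createSizeMetric(sc, "number of output bytes"),
          "totalTime" -> SQLMetrics.createTimingMetric(sc, "total time")
        )

        // What GenerateMetricsUpdater does with native values, in miniature.
        demoMetrics("outputRows") += 42L
        demoMetrics("outputBytes") += 4096L
        demoMetrics("totalTime") += 7L // timing metrics are kept in milliseconds

        demoMetrics.foreach { case (name, m) => println(s"$name = ${m.value}") }
        spark.stop()
      }
    }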
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.glutenproject.metrics
+
+import org.apache.spark.sql.execution.metric.SQLMetric
+
+class GenerateMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
+
+  override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
+    if (opMetrics != null) {
+      val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics]
+      if (!operatorMetrics.metricsList.isEmpty) {
+        val metricsData = operatorMetrics.metricsList.get(0)
+        metrics("totalTime") += (metricsData.time / 1000L).toLong
+        metrics("inputWaitTime") += (metricsData.inputWaitTime / 1000L).toLong
+        metrics("outputWaitTime") += (metricsData.outputWaitTime / 1000L).toLong
+        metrics("outputVectors") += metricsData.outputVectors
+
+        MetricsUtil.updateExtraTimeMetric(
+          metricsData,
+          metrics("extraTime"),
+          metrics("outputRows"),
+          metrics("outputBytes"),
+          metrics("inputRows"),
+          metrics("inputBytes"),
+          GenerateMetricsUpdater.INCLUDING_PROCESSORS,
+          GenerateMetricsUpdater.CH_PLAN_NODE_NAME
+        )
+      }
+    }
+  }
+}
+
+object GenerateMetricsUpdater {
+  val INCLUDING_PROCESSORS = Array("ArrayJoinTransform")
+  val CH_PLAN_NODE_NAME = Array("ArrayJoinTransform")
+}
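The two arrays at the bottom drive MetricsUtil.updateExtraTimeMetric: they name the ClickHouse processor (ArrayJoinTransform, CH's array-join/explode step) whose native stats should be attributed to this operator. A conceptual, REPL-style sketch of that whitelist-style aggregation — the ProcessorStat shape and the other processor names are invented for illustration, not Gluten's real MetricsData types:

    // Conceptual sketch only: folds per-processor native stats into operator-level
    // totals using a whitelist like INCLUDING_PROCESSORS. ProcessorStat and the
    // sample processor names are invented for illustration.
    final case class ProcessorStat(name: String, timeMs: Long, outputRows: Long)

    def sumWhitelisted(stats: Seq[ProcessorStat], including: Array[String]): (Long, Long) = {
      val picked = stats.filter(s => including.contains(s.name))
      (picked.map(_.timeMs).sum, picked.map(_.outputRows).sum)
    }

    val stats = Seq(
      ProcessorStat("SourceFromJavaIter", 1, 100), // upstream input, ignored here
      ProcessorStat("ArrayJoinTransform", 5, 300), // the explode/generate step itself
      ProcessorStat("ExpressionTransform", 2, 300) // downstream projection, ignored here
    )
    val (extraTimeMs, generatedRows) = sumWhitelisted(stats, Array("ArrayJoinTransform"))
    // extraTimeMs == 5, generatedRows == 300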
@@ -155,22 +155,20 @@ object MetricsUtil extends Logging {
         aggParamsMap.getOrDefault(operatorIdx, null))
       mutNode.updater.updateNativeMetrics(operatorMetrics)
 
-      var newOperatorIdx: java.lang.Long = operatorIdx - 1
+      var newOperatorIdx: JLong = operatorIdx - 1
 
       mutNode.children.foreach {
         child =>
-          if (child.updater != NoopMetricsUpdater) {
-            val result = updateTransformerMetricsInternal(
-              child,
-              relMap,
-              newOperatorIdx,
-              metrics,
-              curMetricsIdx,
-              joinParamsMap,
-              aggParamsMap)
-            newOperatorIdx = result._1
-            curMetricsIdx = result._2
-          }
+          val result = updateTransformerMetricsInternal(
+            child,
+            relMap,
+            newOperatorIdx,
+            metrics,
+            curMetricsIdx,
+            joinParamsMap,
+            aggParamsMap)
+          newOperatorIdx = result._1
+          curMetricsIdx = result._2
       }
       (newOperatorIdx, curMetricsIdx)
     }
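The change in this hunk removes the NoopMetricsUpdater guard, so the traversal now recurses into every child; presumably this keeps the running operator index aligned with the native operator list even when a child itself has nothing to record. A standalone sketch of that traversal contract (generic tree, not Gluten's MetricsUpdaterTree):

    // Standalone sketch: the operator index counts down from the parent, and every
    // child must consume its slice of the range even if it reports nothing, or the
    // metrics of later siblings would be read from the wrong slot.
    final case class PlanNode(name: String, reportsMetrics: Boolean, children: Seq[PlanNode])

    def walk(node: PlanNode, operatorIdx: Long, nativeMetrics: Map[Long, Long]): Long = {
      if (node.reportsMetrics) {
        println(s"${node.name} <- operator #$operatorIdx = ${nativeMetrics.getOrElse(operatorIdx, 0L)}")
      }
      // Recurse into *all* children, mirroring the foreach above.
      node.children.foldLeft(operatorIdx - 1)((idx, child) => walk(child, idx, nativeMetrics))
    }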
@@ -183,7 +181,7 @@
       })
   }
 
-  /** Update extral time metric by the processors */
+  /** Update extra time metric by the processors */
   def updateExtraTimeMetric(
       metricData: MetricsData,
       extraTime: SQLMetric,
@@ -18,7 +18,7 @@ package io.glutenproject.execution
 
 import io.glutenproject.backendsapi.BackendsApiManager
 import io.glutenproject.exception.GlutenException
-import io.glutenproject.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
+import io.glutenproject.expression.{ConverterUtils, ExpressionConverter}
 import io.glutenproject.extension.ValidationResult
 import io.glutenproject.metrics.MetricsUpdater
 import io.glutenproject.substrait.`type`.TypeBuilder
@@ -69,7 +69,7 @@ case class GenerateExecTransformer(
     val operatorId = context.nextOperatorId(this.nodeName)
     val generatorExpr =
       ExpressionConverter.replaceWithExpressionTransformer(generator, child.output)
-    val generatorNode = generatorExpr.asInstanceOf[ExpressionTransformer].doTransform(args)
+    val generatorNode = generatorExpr.doTransform(args)
     val childOutputNodes = new java.util.ArrayList[ExpressionNode]
     for (target <- requiredChildOutput) {
       val found = child.output.zipWithIndex.filter(_._1.name == target.name)
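With the cast gone and the metrics wired up, any query that plans a Generate node exercises this path. A minimal example (assuming a SparkSession `spark` already running with the Gluten ClickHouse backend enabled) whose Generate operator now reports rows, bytes, and timings in the SQL tab instead of nothing:

    // explode() plans a Generate node, which GenerateExecTransformer offloads to the
    // CH backend; after this commit its node in the Spark SQL UI carries the new metrics.
    import org.apache.spark.sql.functions.{col, explode}

    val df = spark.range(3).selectExpr("id", "array(id, id * 10) AS xs")
    val exploded = df.select(col("id"), explode(col("xs")).as("x"))
    exploded.collect() // run it, then open the SQL tab and inspect the Generate operator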
