Commit
update metric
dcoliversun committed Sep 5, 2024
1 parent 314045e commit 3b7e69b
Showing 7 changed files with 70 additions and 44 deletions.
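
In short: the extra metrics produced by Delta's IncrementMetric expressions are no longer merged into an operator's SQLMetric map when the metrics are created; they now travel as a Seq[(String, SQLMetric)] into the MetricsUpdater factories. Condensed from the MetricsApi trait hunk at the end of this diff (the project variants change the same way):

// Before
def genFilterTransformerMetrics(
    sparkContext: SparkContext,
    extraMetric: Map[String, SQLMetric] = Map.empty): Map[String, SQLMetric]
def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater

// After
def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
def genFilterTransformerMetricsUpdater(
    metrics: Map[String, SQLMetric],
    extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater
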
@@ -152,9 +152,7 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
override def genFileSourceScanTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater = new FileSourceScanMetricsUpdater(metrics)

override def genFilterTransformerMetrics(
sparkContext: SparkContext,
extraMetric: Map[String, SQLMetric] = Map.empty): Map[String, SQLMetric] =
override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
@@ -165,9 +163,11 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
"inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
"outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"),
"totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time")
) ++ extraMetric
)

override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
override def genFilterTransformerMetricsUpdater(
metrics: Map[String, SQLMetric],
extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
new FilterMetricsUpdater(metrics)

override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
@@ -184,7 +184,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
)

override def genProjectTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater = new ProjectMetricsUpdater(metrics)
metrics: Map[String, SQLMetric],
extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
new ProjectMetricsUpdater(metrics)

override def genHashAggregateTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
@@ -162,9 +162,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
override def genFileSourceScanTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater = new FileSourceScanMetricsUpdater(metrics)

override def genFilterTransformerMetrics(
sparkContext: SparkContext,
extraMetric: Map[String, SQLMetric] = Map.empty): Map[String, SQLMetric] =
override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
@@ -175,10 +173,12 @@ class VeloxMetricsApi extends MetricsApi with Logging {
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
) ++ extraMetric
)

override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
new FilterMetricsUpdater(metrics)
override def genFilterTransformerMetricsUpdater(
metrics: Map[String, SQLMetric],
extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
new FilterMetricsUpdater(metrics, extraMetrics)

override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
@@ -194,7 +194,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
)

override def genProjectTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater = new ProjectMetricsUpdater(metrics)
metrics: Map[String, SQLMetric],
extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
new ProjectMetricsUpdater(metrics, extraMetrics)

override def genHashAggregateTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
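
Note the asymmetry between the two backends above: both CHMetricsApi and VeloxMetricsApi now take the extraMetrics parameter in their updater factories, but only the Velox implementation forwards it; the ClickHouse one still builds its updaters from the metrics map alone (presumably its FilterMetricsUpdater/ProjectMetricsUpdater are separate classes from the Velox ones changed below), so extra metrics are effectively dropped on that backend. Side by side, from the hunks above:

// CHMetricsApi: extraMetrics accepted but not forwarded
new FilterMetricsUpdater(metrics)

// VeloxMetricsApi: extraMetrics forwarded to the updater
new FilterMetricsUpdater(metrics, extraMetrics)
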
@@ -18,7 +18,10 @@ package org.apache.gluten.metrics

import org.apache.spark.sql.execution.metric.SQLMetric

class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
class FilterMetricsUpdater(
val metrics: Map[String, SQLMetric],
val extraMetrics: Seq[(String, SQLMetric)])
extends MetricsUpdater {

override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
@@ -30,8 +33,12 @@ class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
metrics("wallNanos") += operatorMetrics.wallNanos
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
if (metrics.contains("incrementMetric")) {
metrics("incrementMetric") += operatorMetrics.outputRows
extraMetrics.foreach {
case (name, metric) =>
name match {
case "increment_metric" => metric += operatorMetrics.outputRows
case _ => // do nothing
}
}
}
}
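
The behavioural change in this file: the old code looked for a literal "incrementMetric" key in the main metrics map, while the old DeltaFilterExecTransformer registered the metric under metric.id.toString, so the two appear never to have matched; the new code instead walks the extraMetrics sequence and dispatches on the metric's name. A minimal, self-contained sketch of that dispatch, using stand-in types rather than Spark's SQLMetric and Gluten's IOperatorMetrics (both stand-ins are assumptions, shown only to illustrate the pattern):

object ExtraMetricsDispatchSketch extends App {
  // Stand-in for org.apache.spark.sql.execution.metric.SQLMetric.
  final class Counter(var value: Long = 0L) {
    def +=(v: Long): Unit = value += v
  }

  // Stand-in for the native operator metrics the updater reads from.
  final case class OpMetrics(outputRows: Long)

  // The pattern introduced by this commit: extra metrics travel as
  // (name, metric) pairs and are matched by name.
  def updateExtraMetrics(extraMetrics: Seq[(String, Counter)], op: OpMetrics): Unit =
    extraMetrics.foreach {
      case ("increment_metric", metric) => metric += op.outputRows
      case _                            => // unknown names are ignored
    }

  // Usage: an "increment_metric" entry is bumped by the operator's row count.
  val numUpdatedRows = new Counter()
  updateExtraMetrics(Seq("increment_metric" -> numUpdatedRows), OpMetrics(outputRows = 42))
  assert(numUpdatedRows.value == 42)
}
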
@@ -18,7 +18,10 @@ package org.apache.gluten.metrics

import org.apache.spark.sql.execution.metric.SQLMetric

class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
class ProjectMetricsUpdater(
val metrics: Map[String, SQLMetric],
val extraMetrics: Seq[(String, SQLMetric)])
extends MetricsUpdater {

override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = {
if (opMetrics != null) {
@@ -30,6 +33,13 @@ class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
metrics("wallNanos") += operatorMetrics.wallNanos
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
extraMetrics.foreach {
case (name, metric) =>
name match {
case "increment_metric" => metric += operatorMetrics.outputRows
case _ => // do nothing
}
}
}
}
}
@@ -34,14 +34,16 @@ import scala.collection.JavaConverters._
case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
extends FilterExecTransformerBase(condition, child) {

private var extraMetric: Map[String, SQLMetric] = Map.empty
private var extraMetrics: Seq[(String, SQLMetric)] = Seq.empty

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@transient override lazy val metrics =
BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext, extraMetric)
BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext)

override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(metrics)
BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetricsUpdater(
metrics,
extraMetrics)

override def getRelNode(
context: SubstraitContext,
@@ -54,7 +56,7 @@ case class DeltaFilterExecTransformer(condition: Expression, child: SparkPlan)
val args = context.registeredFunction
val condExprNode = condExpr match {
case IncrementMetric(child, metric) =>
extraMetric ++= Map(metric.id.toString -> metric)
extraMetrics :+= (condExpr.prettyName, metric)
ExpressionConverter
.replaceWithExpressionTransformer(child, attributeSeq = originalInputAttributes)
.doTransform(args)
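
Two details tie this file to the updaters above. The name half of each pair is condExpr.prettyName, which for Delta's IncrementMetric is assumed to be "increment_metric" (not visible in this diff, but it is the only name the Velox updaters react to); the metric half is the SQLMetric carried by the expression. And extraMetrics is a plain var holding an immutable Seq, so :+= simply rebuilds and reassigns it; whatever has accumulated by the time metricsUpdater() runs is what reaches genFilterTransformerMetricsUpdater. A tiny stand-alone illustration of that accumulation (Long ids stand in for SQLMetric):

object ExtraMetricsAccumulationSketch extends App {
  // (name, metric) pairs; a Long id stands in for SQLMetric here.
  var extraMetrics: Seq[(String, Long)] = Seq.empty

  // Equivalent to `extraMetrics :+= (condExpr.prettyName, metric)` in the hunk above.
  extraMetrics :+= ("increment_metric" -> 1L)
  extraMetrics :+= ("increment_metric" -> 2L)

  // These accumulated pairs are what metricsUpdater() forwards to
  // genFilterTransformerMetricsUpdater(metrics, extraMetrics).
  assert(extraMetrics == Seq("increment_metric" -> 1L, "increment_metric" -> 2L))
}
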
@@ -20,30 +20,32 @@ import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.expression.{ConverterUtils, ExpressionConverter, ExpressionTransformer}
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.`type`.TypeBuilder
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.extensions.ExtensionBuilder
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CaseWhen, Expression, NamedExpression, NullIntolerant, PredicateHelper, SortOrder}
import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.{ExplainUtils, OrderPreservingNodeShim, PartitioningPreservingNodeShim, SparkPlan}
import org.apache.spark.sql.execution.metric.SQLMetric

import scala.collection.JavaConverters._
import scala.collection.mutable

case class DeltaProjectExecTransformer private (projectList: Seq[NamedExpression], child: SparkPlan)
extends UnaryTransformSupport
with OrderPreservingNodeShim
with PartitioningPreservingNodeShim
with PredicateHelper
with Logging {
with OrderPreservingNodeShim
with PartitioningPreservingNodeShim
with PredicateHelper
with Logging {

private var extraMetric: Map[String, SQLMetric] = Map.empty
private var extraMetrics = mutable.Seq.empty[(String, SQLMetric)]

// Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
@transient override lazy val metrics =
BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetrics(sparkContext)
BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetrics(sparkContext)

override protected def doValidateInternal(): ValidationResult = {
val substraitContext = new SubstraitContext
@@ -61,7 +63,9 @@ case class DeltaProjectExecTransformer private (projectList: Seq[NamedExpression], child: SparkPlan)
}

override def metricsUpdater(): MetricsUpdater =
BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater(metrics)
BackendsApiManager.getMetricsApiInstance.genProjectTransformerMetricsUpdater(
metrics,
extraMetrics)

override def doTransform(context: SubstraitContext): TransformContext = {
val childCtx = child.asInstanceOf[TransformSupport].transform(context)
@@ -135,24 +139,21 @@ case class DeltaProjectExecTransformer private (projectList: Seq[NamedExpression], child: SparkPlan)
case alias: Alias =>
alias.child match {
case IncrementMetric(child, metric) =>
extraMetric ++= Map(metric.id.toString -> metric)
extraMetrics :+= (alias.child.prettyName, metric)
Alias(child = child, name = alias.name)()

case CaseWhen(branches, elseValue) =>
val newBranches = branches.map {
case (expr1, expr2: IncrementMetric) =>
val incrementMetric = expr2
extraMetric ++= Map(incrementMetric.metric.id.toString -> incrementMetric.metric)
(expr1, incrementMetric.child)
extraMetrics :+= (expr2.prettyName, expr2.metric)
(expr1, expr2.child)
case other => other
}

val newElseValue = elseValue match {
case Some(IncrementMetric(child: IncrementMetric, metric)) =>
extraMetric ++= Map(
metric.id.toString -> metric,
child.metric.id.toString -> child.metric
)
extraMetrics :+= (child.prettyName, metric)
extraMetrics :+= (child.prettyName, child.metric)
Some(child.child)
case _ => elseValue
}
@@ -168,4 +169,4 @@ case class DeltaProjectExecTransformer private (projectList: Seq[NamedExpression], child: SparkPlan)
case other => other
}
}
}
}
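
The trickiest part of this file is the CaseWhen handling: an IncrementMetric can sit directly under an Alias, appear as a branch value, or be doubly nested in the else value, and in every case the wrapper is removed from the expression tree while its metric is appended to extraMetrics. A compact, self-contained sketch of the same idea over stand-in expression types (not the Catalyst classes; the metric names below are invented for illustration):

object IncrementMetricRewriteSketch extends App {
  // Minimal stand-ins for the Catalyst expressions involved.
  sealed trait Expr
  final case class Value(name: String) extends Expr
  final case class Increment(child: Expr, metric: String) extends Expr
  final case class When(branches: Seq[(Expr, Expr)], elseValue: Option[Expr]) extends Expr

  var extraMetrics: Seq[(String, String)] = Seq.empty

  // A generalized, recursive version of the rewrite in this file: strip Increment
  // wrappers, collect their metrics, and keep the unwrapped children.
  // (The actual code matches only the specific shapes it expects.)
  def strip(e: Expr): Expr = e match {
    case Increment(child, metric) =>
      extraMetrics :+= ("increment_metric" -> metric)
      strip(child)
    case When(branches, elseValue) =>
      When(branches.map { case (c, v) => (c, strip(v)) }, elseValue.map(strip))
    case other => other
  }

  // Nesting shapes taken from the hunk above.
  val expr = When(
    branches = Seq(Value("cond") -> Increment(Value("updated"), "numUpdatedRows")),
    elseValue = Some(Increment(Increment(Value("copied"), "numCopiedRows"), "numTouchedRows")))

  println(strip(expr))   // When(List((Value(cond),Value(updated))),Some(Value(copied)))
  println(extraMetrics)  // the collected (name, metric) pairs, in traversal order
}
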
@@ -55,15 +55,17 @@ trait MetricsApi extends Serializable {

def genFileSourceScanTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater

def genFilterTransformerMetrics(
sparkContext: SparkContext,
extraMetric: Map[String, SQLMetric] = Map.empty): Map[String, SQLMetric]
def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]

def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater
def genFilterTransformerMetricsUpdater(
metrics: Map[String, SQLMetric],
extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater

def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]

def genProjectTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater
def genProjectTransformerMetricsUpdater(
metrics: Map[String, SQLMetric],
extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater

def genHashAggregateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]

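
Both updater factories give extraMetrics a default of Seq.empty, so existing call sites that only pass the metrics map keep compiling; only operators that actually collect extra metrics (the Delta transformers above) need the two-argument form. A minimal illustration of that calling convention, with stand-ins for the Gluten types:

object MetricsApiCallingConventionSketch extends App {
  // Stand-ins for SQLMetric and MetricsUpdater, just to show the signatures.
  final class Metric
  trait Updater

  // Shape of the new factory method from the MetricsApi trait above.
  def genFilterTransformerMetricsUpdater(
      metrics: Map[String, Metric],
      extraMetrics: Seq[(String, Metric)] = Seq.empty): Updater =
    new Updater {
      override def toString = s"Updater(${metrics.size} metrics, ${extraMetrics.size} extra)"
    }

  val metrics = Map("numOutputRows" -> new Metric)

  // Old-style call sites still compile thanks to the default argument...
  println(genFilterTransformerMetricsUpdater(metrics))
  // ...while the Delta transformers pass the pairs they collected.
  println(genFilterTransformerMetricsUpdater(metrics, Seq("increment_metric" -> new Metric)))
}
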
