[GLUTEN-7110][VL][DELTA] support IncrementMetric in gluten
dcoliversun committed Sep 4, 2024
1 parent b5366ef commit e1f90c0
Showing 6 changed files with 35 additions and 9 deletions.
@@ -152,7 +152,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
   override def genFileSourceScanTransformerMetricsUpdater(
       metrics: Map[String, SQLMetric]): MetricsUpdater = new FileSourceScanMetricsUpdater(metrics)
 
-  override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+  override def genFilterTransformerMetrics(
+      sparkContext: SparkContext,
+      extraMetric: Map[String, SQLMetric] = Map.empty): Map[String, SQLMetric] =
     Map(
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
@@ -163,7 +165,7 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
       "outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"),
       "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time")
-    )
+    ) ++ extraMetric
 
   override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
     new FilterMetricsUpdater(metrics)
@@ -162,7 +162,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
   override def genFileSourceScanTransformerMetricsUpdater(
       metrics: Map[String, SQLMetric]): MetricsUpdater = new FileSourceScanMetricsUpdater(metrics)
 
-  override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+  override def genFilterTransformerMetrics(
+      sparkContext: SparkContext,
+      extraMetric: Map[String, SQLMetric] = Map.empty): Map[String, SQLMetric] =
     Map(
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
@@ -173,7 +175,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "numMemoryAllocations" -> SQLMetrics.createMetric(
         sparkContext,
         "number of memory allocations")
-    )
+    ) ++ extraMetric
 
   override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
     new FilterMetricsUpdater(metrics)
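Both backends now accept an optional extraMetric map and splice it into the metric map they build, so existing callers are unaffected by the default. A minimal driver-side sketch of the new call shape follows; the SparkContext handle sc, the metric description string, and the direct construction of VeloxMetricsApi are illustrative only (in Gluten the instance is resolved through BackendsApiManager.getMetricsApiInstance):

import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.gluten.backendsapi.velox.VeloxMetricsApi // assumed package path

val sc: SparkContext = SparkContext.getOrCreate()

// An extra metric of the kind Delta registers via IncrementMetric.
val extra: Map[String, SQLMetric] = Map(
  "incrementMetric" -> SQLMetrics.createMetric(sc, "number of rows seen by IncrementMetric"))

// Callers without extra metrics are untouched (the parameter defaults to
// Map.empty); Delta code paths pass their metric through.
val metrics: Map[String, SQLMetric] =
  new VeloxMetricsApi().genFilterTransformerMetrics(sc, extra)
assert(metrics.contains("incrementMetric") && metrics.contains("numOutputRows"))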
5 changes: 5 additions & 0 deletions gluten-core/pom.xml
@@ -58,6 +58,11 @@
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>${delta.package.name}_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>
       <artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -30,6 +30,9 @@ class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpdater {
       metrics("wallNanos") += operatorMetrics.wallNanos
       metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes
       metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations
+      if (metrics.contains("incrementMetric")) {
+        metrics("incrementMetric") += operatorMetrics.outputRows
+      }
     }
   }
 }
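The updater treats the new entry as optional: "incrementMetric" is bumped by the operator's output row count only when the entry was registered in the metric map. A self-contained sketch of that contract; FakeOperatorMetrics is a stand-in for Gluten's native operator-metrics struct, not the real type:

import org.apache.spark.sql.execution.metric.SQLMetric

// Stand-in for the native metrics struct the updater actually receives.
final case class FakeOperatorMetrics(outputRows: Long, wallNanos: Long)

def updateFilterMetrics(metrics: Map[String, SQLMetric], om: FakeOperatorMetrics): Unit = {
  metrics("numOutputRows") += om.outputRows
  metrics("wallNanos") += om.wallNanos
  // Present only when the filter condition carried an IncrementMetric wrapper;
  // every row that leaves the native filter then counts toward it as well.
  if (metrics.contains("incrementMetric")) {
    metrics("incrementMetric") += om.outputRows
  }
}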
@@ -55,7 +55,9 @@ trait MetricsApi extends Serializable {
 
   def genFileSourceScanTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater
 
-  def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric]
+  def genFilterTransformerMetrics(
+      sparkContext: SparkContext,
+      extraMetric: Map[String, SQLMetric]): Map[String, SQLMetric]
 
   def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater
@@ -29,8 +29,10 @@ import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.delta.metric.IncrementMetric
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.StructTypeFWD
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -43,9 +45,11 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkPlan)
   with PredicateHelper
   with Logging {
 
+  private var extraMetric: Map[String, SQLMetric] = Map.empty
+
   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
   @transient override lazy val metrics =
-    BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext)
+    BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext, extraMetric)
 
   // Split out all the IsNotNulls from condition.
   private val (notNullPreds, otherPreds) = splitConjunctivePredicates(cond).partition {
@@ -73,9 +77,17 @@
       validation: Boolean): RelNode = {
     assert(condExpr != null)
     val args = context.registeredFunction
-    val condExprNode = ExpressionConverter
-      .replaceWithExpressionTransformer(condExpr, attributeSeq = originalInputAttributes)
-      .doTransform(args)
+    val condExprNode = condExpr match {
+      case IncrementMetric(child, metric) =>
+        extraMetric ++= Map("incrementMetric" -> metric)
+        ExpressionConverter
+          .replaceWithExpressionTransformer(child, attributeSeq = originalInputAttributes)
+          .doTransform(args)
+      case _ =>
+        ExpressionConverter
+          .replaceWithExpressionTransformer(condExpr, attributeSeq = originalInputAttributes)
+          .doTransform(args)
+    }
 
     if (!validation) {
       RelBuilder.makeFilterRel(input, condExprNode, context, operatorId)
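Delta wraps a filter predicate in IncrementMetric(child, metric), an expression that bumps a SQLMetric when evaluated and otherwise passes through its child. The native backends cannot evaluate that wrapper, so the transformer strips it, offloads only the child predicate to Substrait, and routes the counting through FilterMetricsUpdater above, which adds the operator's output row count. A simplified sketch of the unwrap step; unwrapCondition and registerExtra are illustrative helpers, not Gluten APIs:

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.execution.metric.SQLMetric

def unwrapCondition(
    cond: Expression,
    registerExtra: (String, SQLMetric) => Unit): Expression = cond match {
  case IncrementMetric(child, metric) =>
    // The metric itself is tracked on the JVM side by the metrics updater.
    registerExtra("incrementMetric", metric)
    child
  case other => other
}

// Example wiring inside the transformer (conceptually):
// val toOffload = unwrapCondition(cond, (k, m) => extraMetric ++= Map(k -> m))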
