diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
index c53448cdd8586..db6f513193476 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHMetricsApi.scala
@@ -152,7 +152,9 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
   override def genFileSourceScanTransformerMetricsUpdater(
       metrics: Map[String, SQLMetric]): MetricsUpdater = new FileSourceScanMetricsUpdater(metrics)
 
-  override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+  override def genFilterTransformerMetrics(
+      sparkContext: SparkContext,
+      extraMetric: Map[String, SQLMetric] = Map.empty): Map[String, SQLMetric] =
     Map(
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
@@ -163,7 +165,7 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"),
       "outputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for output"),
       "totalTime" -> SQLMetrics.createTimingMetric(sparkContext, "time")
-    )
+    ) ++ extraMetric
 
   override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
     new FilterMetricsUpdater(metrics)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
index 00cba4372891a..0818c4ac22ef5 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala
@@ -162,7 +162,9 @@ class VeloxMetricsApi extends MetricsApi with Logging {
   override def genFileSourceScanTransformerMetricsUpdater(
       metrics: Map[String, SQLMetric]): MetricsUpdater = new FileSourceScanMetricsUpdater(metrics)
 
-  override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
+  override def genFilterTransformerMetrics(
+      sparkContext: SparkContext,
+      extraMetric: Map[String, SQLMetric] = Map.empty): Map[String, SQLMetric] =
     Map(
       "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
@@ -173,7 +175,7 @@ class VeloxMetricsApi extends MetricsApi with Logging {
       "numMemoryAllocations" -> SQLMetrics.createMetric(
         sparkContext,
         "number of memory allocations")
-    )
+    ) ++ extraMetric
 
   override def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
     new FilterMetricsUpdater(metrics)
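Reviewer note: both backends now accept an `extraMetric` map that is appended to the built-in filter metrics via `++ extraMetric`, with a `Map.empty` default so existing call sites compile unchanged. Below is a minimal sketch of how a caller could thread an extra metric through the widened signature; the helper name and metric description are illustrative, while `genFilterTransformerMetrics`, `BackendsApiManager`, and the `"incrementMetric"` key come from this patch:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}

import org.apache.gluten.backendsapi.BackendsApiManager

// Hypothetical caller: register one extra SQLMetric under the key that
// FilterMetricsUpdater looks up. The backend appends it to its built-in
// filter metrics, so callers passing nothing keep the old behavior.
def filterMetricsWithIncrement(sc: SparkContext): Map[String, SQLMetric] = {
  val increment = SQLMetrics.createMetric(sc, "number of rows counted by IncrementMetric")
  BackendsApiManager.getMetricsApiInstance
    .genFilterTransformerMetrics(sc, Map("incrementMetric" -> increment))
}
```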
metrics("peakMemoryBytes") += operatorMetrics.peakMemoryBytes metrics("numMemoryAllocations") += operatorMetrics.numMemoryAllocations + if (metrics.contains("incrementMetric")) { + metrics("incrementMetric") += operatorMetrics.outputRows + } } } } diff --git a/gluten-substrait/pom.xml b/gluten-substrait/pom.xml index 77bb9f3c33e2b..cbb854bc68055 100644 --- a/gluten-substrait/pom.xml +++ b/gluten-substrait/pom.xml @@ -64,6 +64,11 @@ ${hadoop.version} provided + + io.delta + ${delta.package.name}_${scala.binary.version} + provided + org.apache.spark spark-core_${scala.binary.version} diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala index a96f27f5a8a33..dcd02da905f7d 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/MetricsApi.scala @@ -55,7 +55,9 @@ trait MetricsApi extends Serializable { def genFileSourceScanTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater - def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] + def genFilterTransformerMetrics( + sparkContext: SparkContext, + extraMetric: Map[String, SQLMetric]): Map[String, SQLMetric] def genFilterTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala index 8e87baf5381d9..5a90748aa7c47 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala @@ -29,8 +29,10 @@ import org.apache.gluten.substrait.rel.{RelBuilder, RelNode} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.delta.metric.IncrementMetric import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} +import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.utils.StructTypeFWD import org.apache.spark.sql.vectorized.ColumnarBatch @@ -43,9 +45,11 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP with PredicateHelper with Logging { + private var extraMetric: Map[String, SQLMetric] = Map.empty + // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks. @transient override lazy val metrics = - BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext) + BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext, extraMetric) // Split out all the IsNotNulls from condition. 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 8e87baf5381d9..5a90748aa7c47 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -29,8 +29,10 @@ import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.delta.metric.IncrementMetric
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
+import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.utils.StructTypeFWD
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -43,9 +45,11 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
   with PredicateHelper
   with Logging {
 
+  private var extraMetric: Map[String, SQLMetric] = Map.empty
+
   // Note: "metrics" is made transient to avoid sending driver-side metrics to tasks.
   @transient override lazy val metrics =
-    BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext)
+    BackendsApiManager.getMetricsApiInstance.genFilterTransformerMetrics(sparkContext, extraMetric)
 
   // Split out all the IsNotNulls from condition.
   private val (notNullPreds, otherPreds) = splitConjunctivePredicates(cond).partition {
@@ -73,9 +77,17 @@ abstract class FilterExecTransformerBase(val cond: Expression, val input: SparkP
       validation: Boolean): RelNode = {
     assert(condExpr != null)
     val args = context.registeredFunction
-    val condExprNode = ExpressionConverter
-      .replaceWithExpressionTransformer(condExpr, attributeSeq = originalInputAttributes)
-      .doTransform(args)
+    val condExprNode = condExpr match {
+      case IncrementMetric(child, metric) =>
+        extraMetric ++= Map("incrementMetric" -> metric)
+        ExpressionConverter
+          .replaceWithExpressionTransformer(child, attributeSeq = originalInputAttributes)
+          .doTransform(args)
+      case _ =>
+        ExpressionConverter
+          .replaceWithExpressionTransformer(condExpr, attributeSeq = originalInputAttributes)
+          .doTransform(args)
+    }
     if (!validation) {
       RelBuilder.makeFilterRel(input, condExprNode, context, operatorId)
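For context: Delta's `IncrementMetric` wraps a child expression and bumps its `SQLMetric` as rows are evaluated, which a native backend cannot do. The match above therefore strips the wrapper, offloads only the child condition, and stashes the metric in `extraMetric` so that `FilterMetricsUpdater` can drive it from the native operator's output row count instead. A minimal sketch of the unwrapping step in isolation, with an illustrative helper name:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.delta.metric.IncrementMetric
import org.apache.spark.sql.execution.metric.SQLMetric

// Illustrative helper: split a filter condition into the expression to
// offload natively and any metric to register under "incrementMetric".
def unwrapIncrement(cond: Expression): (Expression, Map[String, SQLMetric]) =
  cond match {
    case IncrementMetric(child, metric) => (child, Map("incrementMetric" -> metric))
    case other => (other, Map.empty)
  }
```

One ordering subtlety worth noting: `metrics` is a transient lazy val built from the mutable `extraMetric`, so the extra entry is only included if the condition is transformed (populating `extraMetric`) before `metrics` is first materialized.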