From cfa9afa58ce17ff001282a82f632100d941a0322 Mon Sep 17 00:00:00 2001
From: Chungmin Lee
Date: Mon, 11 Mar 2024 02:21:58 -0700
Subject: [PATCH] [GLUTEN-4835][CORE] Match metric names with Spark (#4834)

---
 .../backendsapi/clickhouse/CHMetricsApi.scala | 52 +++++++++----------
 .../metrics/BatchScanMetricsUpdater.scala     |  6 +--
 .../metrics/ExpandMetricsUpdater.scala        |  4 +-
 .../FileSourceScanMetricsUpdater.scala        |  6 +--
 .../metrics/FilterMetricsUpdater.scala        |  4 +-
 .../metrics/GenerateMetricsUpdater.scala      |  4 +-
 .../metrics/HashAggregateMetricsUpdater.scala |  4 +-
 .../metrics/HashJoinMetricsUpdater.scala      |  4 +-
 .../metrics/HiveTableScanMetricsUpdater.scala |  4 +-
 .../metrics/InputIteratorMetricsUpdater.scala |  4 +-
 .../metrics/ProjectMetricsUpdater.scala       |  4 +-
 .../metrics/SortMetricsUpdater.scala          |  4 +-
 .../metrics/WindowMetricsUpdater.scala        |  4 +-
 .../GlutenClickHouseTPCHBucketSuite.scala     | 18 +++----
 ...seTPCHColumnarShuffleParquetAQESuite.scala | 44 ++++++++--------
 ...utenClickHouseTPCHParquetBucketSuite.scala | 18 +++----
 .../GlutenClickHouseTPCDSMetricsSuite.scala   | 12 ++---
 .../GlutenClickHouseTPCHMetricsSuite.scala    | 36 ++++++-------
 .../benchmarks/CHParquetReadBenchmark.scala   |  2 +-
 .../backendsapi/velox/MetricsApiImpl.scala    | 20 ++++---
 .../execution/BasicScanExecTransformer.scala  |  2 +-
 .../metrics/BatchScanMetricsUpdater.scala     |  4 +-
 .../metrics/ExpandMetricsUpdater.scala        |  2 +-
 .../FileSourceScanMetricsUpdater.scala        |  2 +-
 .../metrics/FilterMetricsUpdater.scala        |  2 +-
 .../metrics/HiveTableScanMetricsUpdater.scala |  2 +-
 .../metrics/InputIteratorMetricsUpdater.scala |  4 +-
 .../metrics/LimitMetricsUpdater.scala         |  2 +-
 .../metrics/ProjectMetricsUpdater.scala       |  2 +-
 .../metrics/SortMetricsUpdater.scala          |  2 +-
 .../metrics/WindowMetricsUpdater.scala        |  2 +-
 .../spark/sql/GlutenSQLQuerySuite.scala       | 26 ++++++++++
 32 files changed, 165 insertions(+), 141 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
index 838612036af2..488686e93f73 100644
--- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
+++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHMetricsApi.scala
@@ -44,8 +44,8 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
       "iterReadTime" -> SQLMetrics.createTimingMetric(
         sparkContext,
         "time of reading from iterator"),
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
-      "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
       "fillingRightJoinSideTime" -> SQLMetrics.createTimingMetric(
         sparkContext,
         "filling right join side time")
@@ -59,12 +59,12 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil {
   override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
     Map(
-      "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
+      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
       "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
       "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
       "rawInputRows" -> SQLMetrics.createMetric(sparkContext,
"number of raw input rows"), "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"), - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"), @@ -79,12 +79,12 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { override def genHiveTableScanTransformerMetrics( sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"), "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"), - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"), @@ -107,12 +107,12 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { override def genFileSourceScanTransformerMetrics( sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"), "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"), - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"), @@ -133,10 +133,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), 
"extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), @@ -149,10 +149,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), @@ -166,10 +166,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { override def genHashAggregateTransformerMetrics( sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), @@ -187,10 +187,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { override def genExpandTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), @@ -233,10 +233,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { override def genWindowTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, 
"number of output bytes"), - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), @@ -263,10 +263,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { override def genLimitTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), @@ -279,10 +279,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { override def genSortTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), @@ -319,10 +319,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { override def genHashJoinTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), @@ -358,10 +358,10 @@ class CHMetricsApi extends MetricsApi with Logging with LogLevelUtil { } override def genGenerateTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> 
SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "extraTime" -> SQLMetrics.createTimingMetric(sparkContext, "extra operators time"), "inputWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "time of waiting for data"), diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala index 5cd44a508a6c..d173f171502d 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala @@ -23,10 +23,10 @@ class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]) extends MetricsUpdater { val scanTime: SQLMetric = metrics("scanTime") - val outputRows: SQLMetric = metrics("outputRows") + val outputRows: SQLMetric = metrics("numOutputRows") val outputVectors: SQLMetric = metrics("outputVectors") val outputBytes: SQLMetric = metrics("outputBytes") - val inputRows: SQLMetric = metrics("inputRows") + val inputRows: SQLMetric = metrics("numInputRows") val inputBytes: SQLMetric = metrics("inputBytes") val extraTime: SQLMetric = metrics("extraTime") val inputWaitTime: SQLMetric = metrics("inputWaitTime") @@ -34,7 +34,7 @@ class BatchScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric]) override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { // inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value) - // inputMetrics.bridgeIncRecordsRead(metrics("inputRows").value) + // inputMetrics.bridgeIncRecordsRead(metrics("numInputRows").value) } override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala index 0aa62e875a4f..d43cbcbc6d9f 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala @@ -33,9 +33,9 @@ class ExpandMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU MetricsUtil.updateExtraTimeMetric( metricsData, metrics("extraTime"), - metrics("outputRows"), + metrics("numOutputRows"), metrics("outputBytes"), - metrics("inputRows"), + metrics("numInputRows"), metrics("inputBytes"), ExpandMetricsUpdater.INCLUDING_PROCESSORS, ExpandMetricsUpdater.CH_PLAN_NODE_NAME diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala index 7985efbf0ee6..1c6da8dad26a 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala @@ -27,10 +27,10 @@ class 
FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric extends MetricsUpdater { val scanTime: SQLMetric = metrics("scanTime") - val outputRows: SQLMetric = metrics("outputRows") + val outputRows: SQLMetric = metrics("numOutputRows") val outputVectors: SQLMetric = metrics("outputVectors") val outputBytes: SQLMetric = metrics("outputBytes") - val inputRows: SQLMetric = metrics("inputRows") + val inputRows: SQLMetric = metrics("numInputRows") val inputBytes: SQLMetric = metrics("inputBytes") val extraTime: SQLMetric = metrics("extraTime") val inputWaitTime: SQLMetric = metrics("inputWaitTime") @@ -38,7 +38,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric override def updateInputMetrics(inputMetrics: InputMetricsWrapper): Unit = { // inputMetrics.bridgeIncBytesRead(metrics("inputBytes").value) - // inputMetrics.bridgeIncRecordsRead(metrics("inputRows").value) + // inputMetrics.bridgeIncRecordsRead(metrics("numInputRows").value) } override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala index 1a2b77f35330..b44a9e3820ca 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala @@ -33,9 +33,9 @@ class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU MetricsUtil.updateExtraTimeMetric( metricsData, metrics("extraTime"), - metrics("outputRows"), + metrics("numOutputRows"), metrics("outputBytes"), - metrics("inputRows"), + metrics("numInputRows"), metrics("inputBytes"), FilterMetricsUpdater.INCLUDING_PROCESSORS, FilterMetricsUpdater.INCLUDING_PROCESSORS diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala index 166ba5d7ee83..5f2e3f63c0ef 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/GenerateMetricsUpdater.scala @@ -33,9 +33,9 @@ class GenerateMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metric MetricsUtil.updateExtraTimeMetric( metricsData, metrics("extraTime"), - metrics("outputRows"), + metrics("numOutputRows"), metrics("outputBytes"), - metrics("inputRows"), + metrics("numInputRows"), metrics("inputBytes"), GenerateMetricsUpdater.INCLUDING_PROCESSORS, GenerateMetricsUpdater.CH_PLAN_NODE_NAME diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala index ac55b8d17f09..8f6e07a9469f 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashAggregateMetricsUpdater.scala @@ -44,9 +44,9 @@ class HashAggregateMetricsUpdater(val metrics: Map[String, SQLMetric]) MetricsUtil.updateExtraTimeMetric( aggMetricsData, metrics("extraTime"), - metrics("outputRows"), + metrics("numOutputRows"), metrics("outputBytes"), - metrics("inputRows"), + metrics("numInputRows"), metrics("inputBytes"), HashAggregateMetricsUpdater.INCLUDING_PROCESSORS, 
HashAggregateMetricsUpdater.CH_PLAN_NODE_NAME diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala index 522069aef71b..180bcc03482b 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HashJoinMetricsUpdater.scala @@ -83,9 +83,9 @@ class HashJoinMetricsUpdater(val metrics: Map[String, SQLMetric]) metrics("extraTime") += (processor.time / 1000L).toLong } if (HashJoinMetricsUpdater.CH_PLAN_NODE_NAME.contains(processor.name)) { - metrics("outputRows") += processor.outputRows + metrics("numOutputRows") += processor.outputRows metrics("outputBytes") += processor.outputBytes - metrics("inputRows") += processor.inputRows + metrics("numInputRows") += processor.inputRows metrics("inputBytes") += processor.inputBytes } }) diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala index 4e3682561839..89c3198da403 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala @@ -23,10 +23,10 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric] extends MetricsUpdater { val scanTime: SQLMetric = metrics("scanTime") - val outputRows: SQLMetric = metrics("outputRows") + val outputRows: SQLMetric = metrics("numOutputRows") val outputVectors: SQLMetric = metrics("outputVectors") val outputBytes: SQLMetric = metrics("outputBytes") - val inputRows: SQLMetric = metrics("inputRows") + val inputRows: SQLMetric = metrics("numInputRows") val inputBytes: SQLMetric = metrics("inputBytes") val extraTime: SQLMetric = metrics("extraTime") val inputWaitTime: SQLMetric = metrics("inputWaitTime") diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala index e608cf1e3c39..3a3477659754 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala @@ -33,8 +33,8 @@ case class InputIteratorMetricsUpdater(metrics: Map[String, SQLMetric]) extends InputIteratorMetricsUpdater.CH_PLAN_NODE_NAME .exists(processor.name.startsWith(_)) ) { - metrics("inputRows") += processor.inputRows - metrics("outputRows") += processor.outputRows + metrics("numInputRows") += processor.inputRows + metrics("numOutputRows") += processor.outputRows } if (processor.name.equalsIgnoreCase("FillingRightJoinSide")) { metrics("fillingRightJoinSideTime") += (processor.time / 1000L).toLong diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala index bfdcdfd024b5..4bd445590e81 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala @@ -33,9 +33,9 @@ class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metrics 
MetricsUtil.updateExtraTimeMetric( metricsData, metrics("extraTime"), - metrics("outputRows"), + metrics("numOutputRows"), metrics("outputBytes"), - metrics("inputRows"), + metrics("numInputRows"), metrics("inputBytes"), ProjectMetricsUpdater.INCLUDING_PROCESSORS, ProjectMetricsUpdater.CH_PLAN_NODE_NAME diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala index 5aba5e29019e..e53ba6ccfb48 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala @@ -33,9 +33,9 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpd MetricsUtil.updateExtraTimeMetric( metricsData, metrics("extraTime"), - metrics("outputRows"), + metrics("numOutputRows"), metrics("outputBytes"), - metrics("inputRows"), + metrics("numInputRows"), metrics("inputBytes"), SortMetricsUpdater.INCLUDING_PROCESSORS, SortMetricsUpdater.CH_PLAN_NODE_NAME diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala index 6943359ae6ce..e36713c34516 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala @@ -33,9 +33,9 @@ class WindowMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU MetricsUtil.updateExtraTimeMetric( metricsData, metrics("extraTime"), - metrics("outputRows"), + metrics("numOutputRows"), metrics("outputBytes"), - metrics("inputRows"), + metrics("numInputRows"), metrics("inputBytes"), WindowMetricsUpdater.INCLUDING_PROCESSORS, WindowMetricsUpdater.CH_PLAN_NODE_NAME diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala index 23face9ce5e0..02d5bcb631be 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHBucketSuite.scala @@ -242,7 +242,7 @@ class GlutenClickHouseTPCHBucketSuite assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)) assert(plans(0).metrics("numFiles").value === 2) assert(plans(0).metrics("pruningTime").value === -1) - assert(plans(0).metrics("outputRows").value === 591673) + assert(plans(0).metrics("numOutputRows").value === 591673) }) } @@ -301,7 +301,7 @@ class GlutenClickHouseTPCHBucketSuite assert(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) } assert(plans(11).metrics("numFiles").value === 1) - assert(plans(11).metrics("outputRows").value === 1000) + assert(plans(11).metrics("numOutputRows").value === 1000) }) } @@ -337,11 +337,11 @@ class GlutenClickHouseTPCHBucketSuite assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) } assert(plans(2).metrics("numFiles").value === 2) - assert(plans(2).metrics("outputRows").value === 3111) + assert(plans(2).metrics("numOutputRows").value === 3111) assert(!(plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)) assert(plans(3).metrics("numFiles").value === 2) - assert(plans(3).metrics("outputRows").value === 72678) + 
assert(plans(3).metrics("numOutputRows").value === 72678) }) withSQLConf( @@ -383,11 +383,11 @@ class GlutenClickHouseTPCHBucketSuite assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) assert(plans(1).metrics("numFiles").value === 2) - assert(plans(1).metrics("outputRows").value === 5552) + assert(plans(1).metrics("numOutputRows").value === 5552) assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) assert(plans(2).metrics("numFiles").value === 2) - assert(plans(2).metrics("outputRows").value === 379809) + assert(plans(2).metrics("numOutputRows").value === 379809) }) withSQLConf( @@ -417,7 +417,7 @@ class GlutenClickHouseTPCHBucketSuite assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)) assert(plans(0).metrics("numFiles").value === 2) assert(plans(0).metrics("pruningTime").value === -1) - assert(plans(0).metrics("outputRows").value === 11618) + assert(plans(0).metrics("numOutputRows").value === 11618) }) } @@ -442,11 +442,11 @@ class GlutenClickHouseTPCHBucketSuite assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) assert(plans(1).metrics("numFiles").value === 2) - assert(plans(1).metrics("outputRows").value === 150000) + assert(plans(1).metrics("numOutputRows").value === 150000) assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) assert(plans(2).metrics("numFiles").value === 2) - assert(plans(2).metrics("outputRows").value === 3155) + assert(plans(2).metrics("numOutputRows").value === 3155) }) withSQLConf( diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala index f710333d60c5..29815aff6ef4 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala @@ -67,21 +67,21 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite assert(plans(4).metrics("numFiles").value === 1) assert(plans(4).metrics("pruningTime").value === -1) assert(plans(4).metrics("filesSize").value === 19230111) - assert(plans(4).metrics("outputRows").value === 600572) + assert(plans(4).metrics("numOutputRows").value === 600572) - assert(plans(3).metrics("inputRows").value === 591673) - assert(plans(3).metrics("outputRows").value === 4) + assert(plans(3).metrics("numInputRows").value === 591673) + assert(plans(3).metrics("numOutputRows").value === 4) assert(plans(3).metrics("outputVectors").value === 1) - assert(plans(2).metrics("inputRows").value === 8) - assert(plans(2).metrics("outputRows").value === 8) + assert(plans(2).metrics("numInputRows").value === 8) + assert(plans(2).metrics("numOutputRows").value === 8) // Execute Sort operator, it will read the data twice. 
- assert(plans(1).metrics("outputRows").value === 8) + assert(plans(1).metrics("numOutputRows").value === 8) assert(plans(1).metrics("outputVectors").value === 2) - assert(plans(0).metrics("inputRows").value === 4) - assert(plans(0).metrics("outputRows").value === 4) + assert(plans(0).metrics("numInputRows").value === 4) + assert(plans(0).metrics("numOutputRows").value === 4) } } @@ -100,12 +100,12 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite assert(plans(2).metrics("pruningTime").value === -1) assert(plans(2).metrics("filesSize").value === 19230111) - assert(plans(1).metrics("inputRows").value === 591673) - assert(plans(1).metrics("outputRows").value === 4) + assert(plans(1).metrics("numInputRows").value === 591673) + assert(plans(1).metrics("numOutputRows").value === 4) assert(plans(1).metrics("outputVectors").value === 1) // Execute Sort operator, it will read the data twice. - assert(plans(0).metrics("outputRows").value === 8) + assert(plans(0).metrics("numOutputRows").value === 8) assert(plans(0).metrics("outputVectors").value === 2) } } @@ -138,17 +138,17 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite assert(inputIteratorTransformers.size == 4) - assert(inputIteratorTransformers(3).metrics("inputRows").value === 324322) - assert(inputIteratorTransformers(3).metrics("outputRows").value === 324322) + assert(inputIteratorTransformers(3).metrics("numInputRows").value === 324322) + assert(inputIteratorTransformers(3).metrics("numOutputRows").value === 324322) - assert(inputIteratorTransformers(2).metrics("inputRows").value === 72678) - assert(inputIteratorTransformers(2).metrics("outputRows").value === 72678) + assert(inputIteratorTransformers(2).metrics("numInputRows").value === 72678) + assert(inputIteratorTransformers(2).metrics("numOutputRows").value === 72678) - assert(inputIteratorTransformers(1).metrics("inputRows").value === 3111) - assert(inputIteratorTransformers(1).metrics("outputRows").value === 3111) + assert(inputIteratorTransformers(1).metrics("numInputRows").value === 3111) + assert(inputIteratorTransformers(1).metrics("numOutputRows").value === 3111) - assert(inputIteratorTransformers(0).metrics("inputRows").value === 15224) - assert(inputIteratorTransformers(0).metrics("outputRows").value === 15224) + assert(inputIteratorTransformers(0).metrics("numInputRows").value === 15224) + assert(inputIteratorTransformers(0).metrics("numOutputRows").value === 15224) } } } @@ -280,10 +280,10 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite case scanExec: BasicScanExecTransformer => scanExec case filterExec: FilterExecTransformerBase => filterExec } - assert(plans(2).metrics("inputRows").value === 600572) - assert(plans(2).metrics("outputRows").value === 379809) + assert(plans(2).metrics("numInputRows").value === 600572) + assert(plans(2).metrics("numOutputRows").value === 379809) - assert(plans(3).metrics("outputRows").value === 600572) + assert(plans(3).metrics("numOutputRows").value === 600572) } } diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala index 427c4a6eb848..8c417499e90e 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetBucketSuite.scala @@ -267,7 +267,7 @@ class 
GlutenClickHouseTPCHParquetBucketSuite assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)) assert(plans(0).metrics("numFiles").value === 4) assert(plans(0).metrics("pruningTime").value === -1) - assert(plans(0).metrics("outputRows").value === 600572) + assert(plans(0).metrics("numOutputRows").value === 600572) } ) } @@ -329,7 +329,7 @@ class GlutenClickHouseTPCHParquetBucketSuite assert(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) } assert(plans(11).metrics("numFiles").value === 1) - assert(plans(11).metrics("outputRows").value === 1000) + assert(plans(11).metrics("numOutputRows").value === 1000) } ) } @@ -369,11 +369,11 @@ class GlutenClickHouseTPCHParquetBucketSuite assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) } assert(plans(2).metrics("numFiles").value === 4) - assert(plans(2).metrics("outputRows").value === 15000) + assert(plans(2).metrics("numOutputRows").value === 15000) assert(!(plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)) assert(plans(3).metrics("numFiles").value === 4) - assert(plans(3).metrics("outputRows").value === 150000) + assert(plans(3).metrics("numOutputRows").value === 150000) } ) @@ -421,11 +421,11 @@ class GlutenClickHouseTPCHParquetBucketSuite assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) assert(plans(1).metrics("numFiles").value === 4) - assert(plans(1).metrics("outputRows").value === 150000) + assert(plans(1).metrics("numOutputRows").value === 150000) assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) assert(plans(2).metrics("numFiles").value === 4) - assert(plans(2).metrics("outputRows").value === 600572) + assert(plans(2).metrics("numOutputRows").value === 600572) } ) @@ -461,7 +461,7 @@ class GlutenClickHouseTPCHParquetBucketSuite assert(!(plans(0).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)) assert(plans(0).metrics("numFiles").value === 4) assert(plans(0).metrics("pruningTime").value === -1) - assert(plans(0).metrics("outputRows").value === 600572) + assert(plans(0).metrics("numOutputRows").value === 600572) } ) } @@ -489,11 +489,11 @@ class GlutenClickHouseTPCHParquetBucketSuite assert(plans(1).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) assert(plans(1).metrics("numFiles").value === 4) - assert(plans(1).metrics("outputRows").value === 150000) + assert(plans(1).metrics("numOutputRows").value === 150000) assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan) assert(plans(2).metrics("numFiles").value === 4) - assert(plans(2).metrics("outputRows").value === 600572) + assert(plans(2).metrics("numOutputRows").value === 600572) } ) diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala index 3a4cee9c0da3..bffd4f9ed1f4 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCDSMetricsSuite.scala @@ -99,9 +99,9 @@ class GlutenClickHouseTPCDSMetricsSuite extends GlutenClickHouseTPCDSAbstractSui assert(windowPlan0.metrics("totalTime").value == 2) assert(windowPlan0.metrics("inputWaitTime").value == 12) assert(windowPlan0.metrics("outputWaitTime").value == 0) - assert(windowPlan0.metrics("outputRows").value == 10717) + 
assert(windowPlan0.metrics("numOutputRows").value == 10717) assert(windowPlan0.metrics("outputBytes").value == 1224479) - assert(windowPlan0.metrics("inputRows").value == 10717) + assert(windowPlan0.metrics("numInputRows").value == 10717) assert(windowPlan0.metrics("inputBytes").value == 1128026) val windowPlan1 = allGlutenPlans(5) @@ -109,18 +109,18 @@ class GlutenClickHouseTPCDSMetricsSuite extends GlutenClickHouseTPCDSAbstractSui assert(windowPlan1.metrics("extraTime").value == 1) assert(windowPlan1.metrics("inputWaitTime").value == 23) assert(windowPlan1.metrics("outputWaitTime").value == 2) - assert(windowPlan1.metrics("outputRows").value == 12333) + assert(windowPlan1.metrics("numOutputRows").value == 12333) assert(windowPlan1.metrics("outputBytes").value == 1360484) - assert(windowPlan1.metrics("inputRows").value == 12333) + assert(windowPlan1.metrics("numInputRows").value == 12333) assert(windowPlan1.metrics("inputBytes").value == 1261820) val sortPlan = allGlutenPlans(6) assert(sortPlan.metrics("totalTime").value == 3) assert(sortPlan.metrics("inputWaitTime").value == 30) assert(sortPlan.metrics("outputWaitTime").value == 1) - assert(sortPlan.metrics("outputRows").value == 12333) + assert(sortPlan.metrics("numOutputRows").value == 12333) assert(sortPlan.metrics("outputBytes").value == 1261820) - assert(sortPlan.metrics("inputRows").value == 12333) + assert(sortPlan.metrics("numInputRows").value == 12333) assert(sortPlan.metrics("inputBytes").value == 1261820) } } diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala index 7b90a5500c80..01257831761d 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/metrics/GlutenClickHouseTPCHMetricsSuite.scala @@ -68,11 +68,11 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite assert(plans(2).metrics("pruningTime").value === -1) assert(plans(2).metrics("filesSize").value === 19230111) - assert(plans(1).metrics("outputRows").value === 4) + assert(plans(1).metrics("numOutputRows").value === 4) assert(plans(1).metrics("outputVectors").value === 1) // Execute Sort operator, it will read the data twice. - assert(plans(0).metrics("outputRows").value === 4) + assert(plans(0).metrics("numOutputRows").value === 4) assert(plans(0).metrics("outputVectors").value === 1) } } @@ -89,8 +89,8 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite case generate: GenerateExecTransformer => generate } assert(plans.size == 1) - assert(plans.head.metrics("inputRows").value == 25) - assert(plans.head.metrics("outputRows").value == 266) + assert(plans.head.metrics("numInputRows").value == 25) + assert(plans.head.metrics("numOutputRows").value == 266) assert(plans.head.metrics("outputVectors").value == 1) } } @@ -109,11 +109,11 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite assert(plans(2).metrics("pruningTime").value === -1) assert(plans(2).metrics("filesSize").value === 19230111) - assert(plans(1).metrics("outputRows").value === 4) + assert(plans(1).metrics("numOutputRows").value === 4) assert(plans(1).metrics("outputVectors").value === 1) // Execute Sort operator, it will read the data twice. 
- assert(plans(0).metrics("outputRows").value === 4) + assert(plans(0).metrics("numOutputRows").value === 4) assert(plans(0).metrics("outputVectors").value === 1) } } @@ -193,24 +193,24 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite assert(s.metrics("scanTime").value == 2) assert(s.metrics("inputWaitTime").value == 4) assert(s.metrics("outputWaitTime").value == 2) - assert(s.metrics("outputRows").value == 20000) + assert(s.metrics("numOutputRows").value == 20000) assert(s.metrics("outputBytes").value == 1451663) case f: FilterExecTransformerBase => assert(f.metrics("totalTime").value == 3) assert(f.metrics("inputWaitTime").value == 14) assert(f.metrics("outputWaitTime").value == 1) - assert(f.metrics("outputRows").value == 73) + assert(f.metrics("numOutputRows").value == 73) assert(f.metrics("outputBytes").value == 5304) - assert(f.metrics("inputRows").value == 20000) + assert(f.metrics("numInputRows").value == 20000) assert(f.metrics("inputBytes").value == 1451663) assert(f.metrics("extraTime").value == 1) case p: ProjectExecTransformer => assert(p.metrics("totalTime").value == 0) assert(p.metrics("inputWaitTime").value == 7) assert(p.metrics("outputWaitTime").value == 0) - assert(p.metrics("outputRows").value == 73) + assert(p.metrics("numOutputRows").value == 73) assert(p.metrics("outputBytes").value == 2336) - assert(p.metrics("inputRows").value == 73) + assert(p.metrics("numInputRows").value == 73) assert(p.metrics("inputBytes").value == 5085) } } @@ -230,25 +230,25 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite assert(scanPlan.metrics("scanTime").value == 2) assert(scanPlan.metrics("inputWaitTime").value == 3) assert(scanPlan.metrics("outputWaitTime").value == 1) - assert(scanPlan.metrics("outputRows").value == 80000) + assert(scanPlan.metrics("numOutputRows").value == 80000) assert(scanPlan.metrics("outputBytes").value == 2160000) val filterPlan = allGlutenPlans(8) assert(filterPlan.metrics("totalTime").value == 1) assert(filterPlan.metrics("inputWaitTime").value == 13) assert(filterPlan.metrics("outputWaitTime").value == 1) - assert(filterPlan.metrics("outputRows").value == 80000) + assert(filterPlan.metrics("numOutputRows").value == 80000) assert(filterPlan.metrics("outputBytes").value == 2160000) - assert(filterPlan.metrics("inputRows").value == 80000) + assert(filterPlan.metrics("numInputRows").value == 80000) assert(filterPlan.metrics("inputBytes").value == 2160000) val joinPlan = allGlutenPlans(2) assert(joinPlan.metrics("totalTime").value == 1) assert(joinPlan.metrics("inputWaitTime").value == 6) assert(joinPlan.metrics("outputWaitTime").value == 0) - assert(joinPlan.metrics("outputRows").value == 292) + assert(joinPlan.metrics("numOutputRows").value == 292) assert(joinPlan.metrics("outputBytes").value == 16644) - assert(joinPlan.metrics("inputRows").value == 80000) + assert(joinPlan.metrics("numInputRows").value == 80000) assert(joinPlan.metrics("inputBytes").value == 1920000) } @@ -269,9 +269,9 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite assert(shjPlan.metrics("totalTime").value == 6) assert(shjPlan.metrics("inputWaitTime").value == 5) assert(shjPlan.metrics("outputWaitTime").value == 0) - assert(shjPlan.metrics("outputRows").value == 44) + assert(shjPlan.metrics("numOutputRows").value == 44) assert(shjPlan.metrics("outputBytes").value == 3740) - assert(shjPlan.metrics("inputRows").value == 11985) + assert(shjPlan.metrics("numInputRows").value == 11985) 
assert(shjPlan.metrics("inputBytes").value == 299625) } } diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala index d6e1d314cc24..d2687bd695a3 100644 --- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala +++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHParquetReadBenchmark.scala @@ -98,7 +98,7 @@ object CHParquetReadBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark .take(readFileCnt) .map(_.asInstanceOf[FilePartition]) - val numOutputRows = chFileScan.longMetric("outputRows") + val numOutputRows = chFileScan.longMetric("numOutputRows") val numOutputVectors = chFileScan.longMetric("outputVectors") val scanTime = chFileScan.longMetric("scanTime") // Generate Substrait plan diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala index 2d1eb1315c87..71c2642ff86c 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala @@ -42,7 +42,7 @@ class MetricsApiImpl extends MetricsApi with Logging { Map( "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"), "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of input iterator"), - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors") ) } @@ -54,12 +54,12 @@ class MetricsApiImpl extends MetricsApi with Logging { override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "inputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), + "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"), "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"), "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"), "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"), "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"), - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of batch scan"), @@ -91,7 +91,6 @@ class MetricsApiImpl extends MetricsApi with Logging { Map( "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"), "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"), - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "scanTime" -> 
SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of scan"), @@ -132,7 +131,6 @@ class MetricsApiImpl extends MetricsApi with Logging { Map( "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"), "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"), - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of scan"), @@ -170,7 +168,7 @@ class MetricsApiImpl extends MetricsApi with Logging { override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of filter"), @@ -186,7 +184,7 @@ class MetricsApiImpl extends MetricsApi with Logging { override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of project"), @@ -243,7 +241,7 @@ class MetricsApiImpl extends MetricsApi with Logging { override def genExpandTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of expand"), @@ -285,7 +283,7 @@ class MetricsApiImpl extends MetricsApi with Logging { override def genWindowTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of window"), @@ -315,7 +313,7 @@ class MetricsApiImpl extends MetricsApi with Logging { override def genLimitTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> 
SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of limit"), @@ -340,7 +338,7 @@ class MetricsApiImpl extends MetricsApi with Logging { override def genSortTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = Map( - "outputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), + "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"), "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"), "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of sort"), diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala index 3ef1eb75bc6f..625ab6e97449 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/BasicScanExecTransformer.scala @@ -67,7 +67,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource } def doExecuteColumnarInternal(): RDD[ColumnarBatch] = { - val numOutputRows = longMetric("outputRows") + val numOutputRows = longMetric("numOutputRows") val numOutputVectors = longMetric("outputVectors") val scanTime = longMetric("scanTime") val substraitContext = new SubstraitContext diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala index 0eb8141e5ea3..32f8cd880b9a 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/BatchScanMetricsUpdater.scala @@ -29,12 +29,12 @@ class BatchScanMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metri override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] - metrics("inputRows") += operatorMetrics.inputRows + metrics("numInputRows") += operatorMetrics.inputRows metrics("inputVectors") += operatorMetrics.inputVectors metrics("inputBytes") += operatorMetrics.inputBytes metrics("rawInputRows") += operatorMetrics.rawInputRows metrics("rawInputBytes") += operatorMetrics.rawInputBytes - metrics("outputRows") += operatorMetrics.outputRows + metrics("numOutputRows") += operatorMetrics.outputRows metrics("outputVectors") += operatorMetrics.outputVectors metrics("outputBytes") += operatorMetrics.outputBytes metrics("cpuCount") += operatorMetrics.cpuCount diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala index 1d57fffd238f..e254469f8ba1 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/ExpandMetricsUpdater.scala @@ -23,7 +23,7 @@ class ExpandMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] - metrics("outputRows") += operatorMetrics.outputRows + metrics("numOutputRows") += operatorMetrics.outputRows 
metrics("outputVectors") += operatorMetrics.outputVectors metrics("outputBytes") += operatorMetrics.outputBytes metrics("cpuCount") += operatorMetrics.cpuCount diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala index cbdb7b9fc41c..ff8b1a576f9e 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/FileSourceScanMetricsUpdater.scala @@ -28,7 +28,7 @@ class FileSourceScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric val rawInputRows: SQLMetric = metrics("rawInputRows") val rawInputBytes: SQLMetric = metrics("rawInputBytes") - val outputRows: SQLMetric = metrics("outputRows") + val outputRows: SQLMetric = metrics("numOutputRows") val outputVectors: SQLMetric = metrics("outputVectors") val outputBytes: SQLMetric = metrics("outputBytes") val wallNanos: SQLMetric = metrics("wallNanos") diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala index f9e95875aa50..f29931023765 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/FilterMetricsUpdater.scala @@ -23,7 +23,7 @@ class FilterMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsU override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] - metrics("outputRows") += operatorMetrics.outputRows + metrics("numOutputRows") += operatorMetrics.outputRows metrics("outputVectors") += operatorMetrics.outputVectors metrics("outputBytes") += operatorMetrics.outputBytes metrics("cpuCount") += operatorMetrics.cpuCount diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala index 26ad731e69f8..b7a858d44bdf 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/HiveTableScanMetricsUpdater.scala @@ -23,7 +23,7 @@ class HiveTableScanMetricsUpdater(@transient val metrics: Map[String, SQLMetric] extends MetricsUpdater { val rawInputRows: SQLMetric = metrics("rawInputRows") val rawInputBytes: SQLMetric = metrics("rawInputBytes") - val outputRows: SQLMetric = metrics("outputRows") + val outputRows: SQLMetric = metrics("numOutputRows") val outputVectors: SQLMetric = metrics("outputVectors") val outputBytes: SQLMetric = metrics("outputBytes") val wallNanos: SQLMetric = metrics("wallNanos") diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala index 2b3967ae3ef2..87ca348faeeb 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/InputIteratorMetricsUpdater.scala @@ -26,10 +26,10 @@ case class InputIteratorMetricsUpdater(metrics: Map[String, SQLMetric]) extends if (operatorMetrics.outputRows == 0 && operatorMetrics.outputVectors == 0) { // Sometimes, velox does not update metrics for intermediate operator, // here we try to use the 
input metrics - metrics("outputRows") += operatorMetrics.inputRows + metrics("numOutputRows") += operatorMetrics.inputRows metrics("outputVectors") += operatorMetrics.inputVectors } else { - metrics("outputRows") += operatorMetrics.outputRows + metrics("numOutputRows") += operatorMetrics.outputRows metrics("outputVectors") += operatorMetrics.outputVectors } } diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala index f61f7443f7e7..a3ab24637a5c 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/LimitMetricsUpdater.scala @@ -23,7 +23,7 @@ class LimitMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUp override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] - metrics("outputRows") += operatorMetrics.outputRows + metrics("numOutputRows") += operatorMetrics.outputRows metrics("outputVectors") += operatorMetrics.outputVectors metrics("outputBytes") += operatorMetrics.outputBytes metrics("cpuCount") += operatorMetrics.cpuCount diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala index b2bb1796114b..03b41202ead1 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/ProjectMetricsUpdater.scala @@ -23,7 +23,7 @@ class ProjectMetricsUpdater(val metrics: Map[String, SQLMetric]) extends Metrics override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] - metrics("outputRows") += operatorMetrics.outputRows + metrics("numOutputRows") += operatorMetrics.outputRows metrics("outputVectors") += operatorMetrics.outputVectors metrics("outputBytes") += operatorMetrics.outputBytes metrics("cpuCount") += operatorMetrics.cpuCount diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala index 2351b1c9f9f9..38414002db91 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/SortMetricsUpdater.scala @@ -23,7 +23,7 @@ class SortMetricsUpdater(val metrics: Map[String, SQLMetric]) extends MetricsUpd override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] - metrics("outputRows") += operatorMetrics.outputRows + metrics("numOutputRows") += operatorMetrics.outputRows metrics("outputVectors") += operatorMetrics.outputVectors metrics("outputBytes") += operatorMetrics.outputBytes metrics("cpuCount") += operatorMetrics.cpuCount diff --git a/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala b/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala index 032d7aa1b27e..7b962294ed1d 100644 --- a/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala +++ b/gluten-data/src/main/scala/io/glutenproject/metrics/WindowMetricsUpdater.scala @@ -23,7 +23,7 @@ class WindowMetricsUpdater(val metrics: Map[String, SQLMetric]) 
extends MetricsU override def updateNativeMetrics(opMetrics: IOperatorMetrics): Unit = { if (opMetrics != null) { val operatorMetrics = opMetrics.asInstanceOf[OperatorMetrics] - metrics("outputRows") += operatorMetrics.outputRows + metrics("numOutputRows") += operatorMetrics.outputRows metrics("outputVectors") += operatorMetrics.outputVectors metrics("outputBytes") += operatorMetrics.outputBytes metrics("cpuCount") += operatorMetrics.cpuCount diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala index 7199ddd70143..bd9699e4008c 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenSQLQuerySuite.scala @@ -128,4 +128,30 @@ class GlutenSQLQuerySuite extends SQLQuerySuite with GlutenSQLTestsTrait { "Escape character must be followed by '%', '_' or the escape character itself")) } } + + testGluten("StreamingQueryProgress.numInputRows should be correct") { + withTempDir { + dir => + val path = dir.toURI.getPath + val numRows = 20 + val df = spark.range(0, numRows) + df.write.mode("overwrite").format("parquet").save(path) + val q = spark.readStream + .format("parquet") + .schema(df.schema) + .load(path) + .writeStream + .format("memory") + .queryName("test") + .start() + q.processAllAvailable + val inputOutputPairs = q.recentProgress.map(p => (p.numInputRows, p.sink.numOutputRows)) + + // numInputRows and sink.numOutputRows must be the same + assert(inputOutputPairs.forall(x => x._1 == x._2)) + + // Sum of numInputRows must match the total number of rows of the input + assert(inputOutputPairs.map(_._1).sum == numRows) + } + } }