diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala index b4f7a53943e4..6fc4e66d6f05 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Parameterized.scala @@ -25,7 +25,7 @@ import org.apache.gluten.integration.tpc.action.Actions.QuerySelector import scala.collection.immutable.Map import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, ListBuffer} class Parameterized( scale: Double, @@ -189,30 +189,15 @@ case class TestResultLine( succeed: Boolean, coordinate: Coordinate, rowCount: Option[Long], + planningTimeMillis: Option[Long], executionTimeMillis: Option[Long], metrics: Map[String, Long], errorMessage: Option[String]) -case class TestResultLines( - dimNames: Seq[String], - metricNames: Seq[String], - lines: Iterable[TestResultLine]) { - def print(): Unit = { - var fmt = "|%15s|%15s" - for (_ <- dimNames.indices) { - fmt = fmt + "|%20s" - } - for (_ <- metricNames.indices) { - fmt = fmt + "|%35s" - } - fmt = fmt + "|%30s|%30s|\n" - val fields = ArrayBuffer[String]("Query ID", "Succeed") - dimNames.foreach(dimName => fields.append(dimName)) - metricNames.foreach(metricName => fields.append(metricName)) - fields.append("Row Count") - fields.append("Query Time (Millis)") - printf(fmt, fields: _*) - lines.foreach { line => +object TestResultLine { + class Parser(dimNames: Seq[String], metricNames: Seq[String]) + extends TableFormatter.RowParser[TestResultLine] { + override def parse(line: TestResultLine): Seq[Any] = { val values = ArrayBuffer[Any](line.queryId, line.succeed) dimNames.foreach { dimName => val coordinate = line.coordinate.coordinate @@ -226,9 +211,32 @@ case class TestResultLines( values.append(metrics.getOrElse(metricName, "N/A")) } values.append(line.rowCount.getOrElse("N/A")) + values.append(line.planningTimeMillis.getOrElse("N/A")) values.append(line.executionTimeMillis.getOrElse("N/A")) - printf(fmt, values: _*) + values + } + } +} + +case class TestResultLines( + dimNames: Seq[String], + metricNames: Seq[String], + lines: Iterable[TestResultLine]) { + def print(): Unit = { + val fields = ListBuffer[String]("Query ID", "Succeed") + dimNames.foreach(dimName => fields.append(dimName)) + metricNames.foreach(metricName => fields.append(metricName)) + fields.append("Row Count") + fields.append("Planning Time (Millis)") + fields.append("Query Time (Millis)") + val formatter = TableFormatter.create[TestResultLine](fields: _*)( + new TestResultLine.Parser(dimNames, metricNames)) + + lines.foreach { line => + formatter.appendRow(line) } + + formatter.print(System.out) } } @@ -257,6 +265,7 @@ object Parameterized { succeed = true, coordinate, Some(resultRows.length), + Some(result.planningTimeMillis), Some(result.executionTimeMillis), result.metrics, None) @@ -266,7 +275,7 @@ object Parameterized { println( s"Error running query $id. " + s" Error: ${error.get}") - TestResultLine(id, succeed = false, coordinate, None, None, Map.empty, error) + TestResultLine(id, succeed = false, coordinate, None, None, None, Map.empty, error) } } diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala index c5f883189d29..edeb960fcba9 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/Queries.scala @@ -100,24 +100,36 @@ object Queries { queryId: String, testPassed: Boolean, rowCount: Option[Long], + planningTimeMillis: Option[Long], executionTimeMillis: Option[Long], errorMessage: Option[String]) + object TestResultLine { + implicit object Parser extends TableFormatter.RowParser[TestResultLine] { + override def parse(line: TestResultLine): Seq[Any] = { + Seq( + line.queryId, + line.testPassed, + line.rowCount.getOrElse("N/A"), + line.planningTimeMillis.getOrElse("N/A"), + line.executionTimeMillis.getOrElse("N/A")) + } + } + } + private def printResults(results: List[TestResultLine]): Unit = { - printf( - "|%15s|%15s|%30s|%30s|\n", + val formatter = TableFormatter.create[TestResultLine]( "Query ID", "Was Passed", "Row Count", + "Plan Time (Millis)", "Query Time (Millis)") + results.foreach { line => - printf( - "|%15s|%15s|%30s|%30s|\n", - line.queryId, - line.testPassed, - line.rowCount.getOrElse("N/A"), - line.executionTimeMillis.getOrElse("N/A")) + formatter.appendRow(line) } + + formatter.print(System.out) } private def aggregate(succeed: List[TestResultLine], name: String): List[TestResultLine] = { @@ -132,6 +144,9 @@ object Queries { if (r1.rowCount.nonEmpty && r2.rowCount.nonEmpty) Some(r1.rowCount.get + r2.rowCount.get) else None, + if (r1.planningTimeMillis.nonEmpty && r2.planningTimeMillis.nonEmpty) + Some(r1.planningTimeMillis.get + r2.planningTimeMillis.get) + else None, if (r1.executionTimeMillis.nonEmpty && r2.executionTimeMillis.nonEmpty) Some(r1.executionTimeMillis.get + r2.executionTimeMillis.get) else None, @@ -164,6 +179,7 @@ object Queries { id, testPassed = true, Some(resultRows.length), + Some(result.planningTimeMillis), Some(result.executionTimeMillis), None) } catch { @@ -172,7 +188,7 @@ object Queries { println( s"Error running query $id. " + s" Error: ${error.get}") - TestResultLine(id, testPassed = false, None, None, error) + TestResultLine(id, testPassed = false, None, None, None, error) } } } diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala index 5e8e2d6136f7..cfb3e7dc5378 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/QueriesCompare.scala @@ -101,37 +101,52 @@ object QueriesCompare { testPassed: Boolean, expectedRowCount: Option[Long], actualRowCount: Option[Long], + expectedPlanningTimeMillis: Option[Long], + actualPlanningTimeMillis: Option[Long], expectedExecutionTimeMillis: Option[Long], actualExecutionTimeMillis: Option[Long], errorMessage: Option[String]) + object TestResultLine { + implicit object Parser extends TableFormatter.RowParser[TestResultLine] { + override def parse(line: TestResultLine): Seq[Any] = { + val timeVariation = + if (line.expectedExecutionTimeMillis.nonEmpty && line.actualExecutionTimeMillis.nonEmpty) { + Some( + ((line.expectedExecutionTimeMillis.get - line.actualExecutionTimeMillis.get).toDouble + / line.actualExecutionTimeMillis.get.toDouble) * 100) + } else None + Seq( + line.queryId, + line.testPassed, + line.expectedRowCount.getOrElse("N/A"), + line.actualRowCount.getOrElse("N/A"), + line.expectedPlanningTimeMillis.getOrElse("N/A"), + line.actualPlanningTimeMillis.getOrElse("N/A"), + line.expectedExecutionTimeMillis.getOrElse("N/A"), + line.actualExecutionTimeMillis.getOrElse("N/A"), + timeVariation.map("%15.2f%%".format(_)).getOrElse("N/A")) + } + } + } + private def printResults(results: List[TestResultLine]): Unit = { - printf( - "|%15s|%15s|%30s|%30s|%30s|%30s|%30s|\n", + val formatter = TableFormatter.create[TestResultLine]( "Query ID", "Was Passed", "Expected Row Count", "Actual Row Count", + "Baseline Planning Time (Millis)", + "Planning Time (Millis)", "Baseline Query Time (Millis)", "Query Time (Millis)", "Query Time Variation") + results.foreach { line => - val timeVariation = - if (line.expectedExecutionTimeMillis.nonEmpty && line.actualExecutionTimeMillis.nonEmpty) { - Some( - ((line.expectedExecutionTimeMillis.get - line.actualExecutionTimeMillis.get).toDouble - / line.actualExecutionTimeMillis.get.toDouble) * 100) - } else None - printf( - "|%15s|%15s|%30s|%30s|%30s|%30s|%30s|\n", - line.queryId, - line.testPassed, - line.expectedRowCount.getOrElse("N/A"), - line.actualRowCount.getOrElse("N/A"), - line.expectedExecutionTimeMillis.getOrElse("N/A"), - line.actualExecutionTimeMillis.getOrElse("N/A"), - timeVariation.map("%15.2f%%".format(_)).getOrElse("N/A")) + formatter.appendRow(line) } + + formatter.print(System.out) } private def aggregate(succeed: List[TestResultLine], name: String): List[TestResultLine] = { @@ -149,6 +164,12 @@ object QueriesCompare { if (r1.actualRowCount.nonEmpty && r2.actualRowCount.nonEmpty) Some(r1.actualRowCount.get + r2.actualRowCount.get) else None, + if (r1.expectedPlanningTimeMillis.nonEmpty && r2.expectedPlanningTimeMillis.nonEmpty) + Some(r1.expectedPlanningTimeMillis.get + r2.expectedPlanningTimeMillis.get) + else None, + if (r1.actualPlanningTimeMillis.nonEmpty && r2.actualPlanningTimeMillis.nonEmpty) + Some(r1.actualPlanningTimeMillis.get + r2.actualPlanningTimeMillis.get) + else None, if (r1.expectedExecutionTimeMillis.nonEmpty && r2.expectedExecutionTimeMillis.nonEmpty) Some(r1.expectedExecutionTimeMillis.get + r2.expectedExecutionTimeMillis.get) else None, @@ -187,6 +208,8 @@ object QueriesCompare { testPassed = true, Some(expectedRows.length), Some(resultRows.length), + Some(expected.planningTimeMillis), + Some(result.planningTimeMillis), Some(expected.executionTimeMillis), Some(result.executionTimeMillis), None) @@ -198,6 +221,8 @@ object QueriesCompare { testPassed = false, Some(expectedRows.length), Some(resultRows.length), + Some(expected.planningTimeMillis), + Some(result.planningTimeMillis), Some(expected.executionTimeMillis), Some(result.executionTimeMillis), error) @@ -207,7 +232,7 @@ object QueriesCompare { println( s"Error running query $id. " + s" Error: ${error.get}") - TestResultLine(id, testPassed = false, None, None, None, None, error) + TestResultLine(id, testPassed = false, None, None, None, None, None, None, error) } } } diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/TableFormatter.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/TableFormatter.scala new file mode 100644 index 000000000000..cb6ab7ebd056 --- /dev/null +++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/tpc/action/TableFormatter.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gluten.integration.tpc.action + +import java.io.{OutputStream, PrintStream} +import scala.collection.mutable + +trait TableFormatter[ROW <: Any] { + import TableFormatter._ + def appendRow(row: ROW): Unit + def print(s: OutputStream): Unit +} + +object TableFormatter { + def create[ROW <: Any](fields: String*)( + implicit parser: RowParser[ROW]): TableFormatter[ROW] = { + assert(fields.nonEmpty) + new Impl[ROW](Schema(fields), parser) + } + + private case class Schema(fields: Seq[String]) + + private class Impl[ROW <: Any](schema: Schema, parser: RowParser[ROW]) + extends TableFormatter[ROW] { + private val rows = mutable.ListBuffer[Seq[String]]() + + override def appendRow(row: ROW): Unit = { + val parsed = parser.parse(row) + assert(parsed.size == schema.fields.size) + rows += parsed.map(_.toString) + } + + override def print(s: OutputStream): Unit = { + val numFields = schema.fields.size + val widths = (0 until numFields) + .map { i => + rows.map(_(i).length).max max schema.fields(i).length + } + .map(_ + 1) + val pBuilder = StringBuilder.newBuilder + pBuilder ++= "|" + widths.foreach { w => + pBuilder ++= s"%${w}s|" + } + val pattern = pBuilder.toString() + val printer = new PrintStream(s) + printer.println(String.format(pattern, schema.fields: _*)) + rows.foreach { r => + printer.println(String.format(pattern, r: _*)) + } + printer.flush() + printer.close() + } + } + + trait RowParser[ROW <: Any] { + def parse(row: ROW): Seq[Any] + } +} diff --git a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala index 332e56043c45..a5b699a1ae48 100644 --- a/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala +++ b/tools/gluten-it/common/src/main/scala/org/apache/spark/sql/QueryRunner.scala @@ -89,9 +89,11 @@ object QueryRunner { if (explain) { df.explain(extended = true) } - val millis = (System.nanoTime() - prev) / 1000000L + val planMillis = + df.queryExecution.tracker.phases.values.map(p => p.endTimeMs - p.startTimeMs).sum + val totalMillis = (System.nanoTime() - prev) / 1000000L val collectedMetrics = metrics.map(name => (name, em.getMetricValue(name))).toMap - RunResult(rows, millis, collectedMetrics) + RunResult(rows, planMillis, totalMillis - planMillis, collectedMetrics) } finally { sc.removeSparkListener(metricsListener) killTaskListener.foreach(l => { @@ -124,7 +126,11 @@ object QueryRunner { } -case class RunResult(rows: Seq[Row], executionTimeMillis: Long, metrics: Map[String, Long]) +case class RunResult( + rows: Seq[Row], + planningTimeMillis: Long, + executionTimeMillis: Long, + metrics: Map[String, Long]) class MetricsListener(em: ExecutorMetrics) extends SparkListener { override def onExecutorMetricsUpdate(