Skip to content

Commit

Permalink
[VL] CI: Gluten-it: Print planning time as well as execution time in …
Browse files Browse the repository at this point in the history
…test report (apache#5616)
  • Loading branch information
zhztheplayer authored May 7, 2024
1 parent 72e787c commit 717b263
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.gluten.integration.tpc.action.Actions.QuerySelector

import scala.collection.immutable.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

class Parameterized(
scale: Double,
Expand Down Expand Up @@ -189,30 +189,15 @@ case class TestResultLine(
succeed: Boolean,
coordinate: Coordinate,
rowCount: Option[Long],
planningTimeMillis: Option[Long],
executionTimeMillis: Option[Long],
metrics: Map[String, Long],
errorMessage: Option[String])

case class TestResultLines(
dimNames: Seq[String],
metricNames: Seq[String],
lines: Iterable[TestResultLine]) {
def print(): Unit = {
var fmt = "|%15s|%15s"
for (_ <- dimNames.indices) {
fmt = fmt + "|%20s"
}
for (_ <- metricNames.indices) {
fmt = fmt + "|%35s"
}
fmt = fmt + "|%30s|%30s|\n"
val fields = ArrayBuffer[String]("Query ID", "Succeed")
dimNames.foreach(dimName => fields.append(dimName))
metricNames.foreach(metricName => fields.append(metricName))
fields.append("Row Count")
fields.append("Query Time (Millis)")
printf(fmt, fields: _*)
lines.foreach { line =>
object TestResultLine {
class Parser(dimNames: Seq[String], metricNames: Seq[String])
extends TableFormatter.RowParser[TestResultLine] {
override def parse(line: TestResultLine): Seq[Any] = {
val values = ArrayBuffer[Any](line.queryId, line.succeed)
dimNames.foreach { dimName =>
val coordinate = line.coordinate.coordinate
Expand All @@ -226,9 +211,32 @@ case class TestResultLines(
values.append(metrics.getOrElse(metricName, "N/A"))
}
values.append(line.rowCount.getOrElse("N/A"))
values.append(line.planningTimeMillis.getOrElse("N/A"))
values.append(line.executionTimeMillis.getOrElse("N/A"))
printf(fmt, values: _*)
values
}
}
}

case class TestResultLines(
dimNames: Seq[String],
metricNames: Seq[String],
lines: Iterable[TestResultLine]) {
def print(): Unit = {
val fields = ListBuffer[String]("Query ID", "Succeed")
dimNames.foreach(dimName => fields.append(dimName))
metricNames.foreach(metricName => fields.append(metricName))
fields.append("Row Count")
fields.append("Planning Time (Millis)")
fields.append("Query Time (Millis)")
val formatter = TableFormatter.create[TestResultLine](fields: _*)(
new TestResultLine.Parser(dimNames, metricNames))

lines.foreach { line =>
formatter.appendRow(line)
}

formatter.print(System.out)
}
}

Expand Down Expand Up @@ -257,6 +265,7 @@ object Parameterized {
succeed = true,
coordinate,
Some(resultRows.length),
Some(result.planningTimeMillis),
Some(result.executionTimeMillis),
result.metrics,
None)
Expand All @@ -266,7 +275,7 @@ object Parameterized {
println(
s"Error running query $id. " +
s" Error: ${error.get}")
TestResultLine(id, succeed = false, coordinate, None, None, Map.empty, error)
TestResultLine(id, succeed = false, coordinate, None, None, None, Map.empty, error)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,24 +100,36 @@ object Queries {
queryId: String,
testPassed: Boolean,
rowCount: Option[Long],
planningTimeMillis: Option[Long],
executionTimeMillis: Option[Long],
errorMessage: Option[String])

object TestResultLine {
implicit object Parser extends TableFormatter.RowParser[TestResultLine] {
override def parse(line: TestResultLine): Seq[Any] = {
Seq(
line.queryId,
line.testPassed,
line.rowCount.getOrElse("N/A"),
line.planningTimeMillis.getOrElse("N/A"),
line.executionTimeMillis.getOrElse("N/A"))
}
}
}

private def printResults(results: List[TestResultLine]): Unit = {
printf(
"|%15s|%15s|%30s|%30s|\n",
val formatter = TableFormatter.create[TestResultLine](
"Query ID",
"Was Passed",
"Row Count",
"Plan Time (Millis)",
"Query Time (Millis)")

results.foreach { line =>
printf(
"|%15s|%15s|%30s|%30s|\n",
line.queryId,
line.testPassed,
line.rowCount.getOrElse("N/A"),
line.executionTimeMillis.getOrElse("N/A"))
formatter.appendRow(line)
}

formatter.print(System.out)
}

private def aggregate(succeed: List[TestResultLine], name: String): List[TestResultLine] = {
Expand All @@ -132,6 +144,9 @@ object Queries {
if (r1.rowCount.nonEmpty && r2.rowCount.nonEmpty)
Some(r1.rowCount.get + r2.rowCount.get)
else None,
if (r1.planningTimeMillis.nonEmpty && r2.planningTimeMillis.nonEmpty)
Some(r1.planningTimeMillis.get + r2.planningTimeMillis.get)
else None,
if (r1.executionTimeMillis.nonEmpty && r2.executionTimeMillis.nonEmpty)
Some(r1.executionTimeMillis.get + r2.executionTimeMillis.get)
else None,
Expand Down Expand Up @@ -164,6 +179,7 @@ object Queries {
id,
testPassed = true,
Some(resultRows.length),
Some(result.planningTimeMillis),
Some(result.executionTimeMillis),
None)
} catch {
Expand All @@ -172,7 +188,7 @@ object Queries {
println(
s"Error running query $id. " +
s" Error: ${error.get}")
TestResultLine(id, testPassed = false, None, None, error)
TestResultLine(id, testPassed = false, None, None, None, error)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,37 +101,52 @@ object QueriesCompare {
testPassed: Boolean,
expectedRowCount: Option[Long],
actualRowCount: Option[Long],
expectedPlanningTimeMillis: Option[Long],
actualPlanningTimeMillis: Option[Long],
expectedExecutionTimeMillis: Option[Long],
actualExecutionTimeMillis: Option[Long],
errorMessage: Option[String])

object TestResultLine {
implicit object Parser extends TableFormatter.RowParser[TestResultLine] {
override def parse(line: TestResultLine): Seq[Any] = {
val timeVariation =
if (line.expectedExecutionTimeMillis.nonEmpty && line.actualExecutionTimeMillis.nonEmpty) {
Some(
((line.expectedExecutionTimeMillis.get - line.actualExecutionTimeMillis.get).toDouble
/ line.actualExecutionTimeMillis.get.toDouble) * 100)
} else None
Seq(
line.queryId,
line.testPassed,
line.expectedRowCount.getOrElse("N/A"),
line.actualRowCount.getOrElse("N/A"),
line.expectedPlanningTimeMillis.getOrElse("N/A"),
line.actualPlanningTimeMillis.getOrElse("N/A"),
line.expectedExecutionTimeMillis.getOrElse("N/A"),
line.actualExecutionTimeMillis.getOrElse("N/A"),
timeVariation.map("%15.2f%%".format(_)).getOrElse("N/A"))
}
}
}

private def printResults(results: List[TestResultLine]): Unit = {
printf(
"|%15s|%15s|%30s|%30s|%30s|%30s|%30s|\n",
val formatter = TableFormatter.create[TestResultLine](
"Query ID",
"Was Passed",
"Expected Row Count",
"Actual Row Count",
"Baseline Planning Time (Millis)",
"Planning Time (Millis)",
"Baseline Query Time (Millis)",
"Query Time (Millis)",
"Query Time Variation")

results.foreach { line =>
val timeVariation =
if (line.expectedExecutionTimeMillis.nonEmpty && line.actualExecutionTimeMillis.nonEmpty) {
Some(
((line.expectedExecutionTimeMillis.get - line.actualExecutionTimeMillis.get).toDouble
/ line.actualExecutionTimeMillis.get.toDouble) * 100)
} else None
printf(
"|%15s|%15s|%30s|%30s|%30s|%30s|%30s|\n",
line.queryId,
line.testPassed,
line.expectedRowCount.getOrElse("N/A"),
line.actualRowCount.getOrElse("N/A"),
line.expectedExecutionTimeMillis.getOrElse("N/A"),
line.actualExecutionTimeMillis.getOrElse("N/A"),
timeVariation.map("%15.2f%%".format(_)).getOrElse("N/A"))
formatter.appendRow(line)
}

formatter.print(System.out)
}

private def aggregate(succeed: List[TestResultLine], name: String): List[TestResultLine] = {
Expand All @@ -149,6 +164,12 @@ object QueriesCompare {
if (r1.actualRowCount.nonEmpty && r2.actualRowCount.nonEmpty)
Some(r1.actualRowCount.get + r2.actualRowCount.get)
else None,
if (r1.expectedPlanningTimeMillis.nonEmpty && r2.expectedPlanningTimeMillis.nonEmpty)
Some(r1.expectedPlanningTimeMillis.get + r2.expectedPlanningTimeMillis.get)
else None,
if (r1.actualPlanningTimeMillis.nonEmpty && r2.actualPlanningTimeMillis.nonEmpty)
Some(r1.actualPlanningTimeMillis.get + r2.actualPlanningTimeMillis.get)
else None,
if (r1.expectedExecutionTimeMillis.nonEmpty && r2.expectedExecutionTimeMillis.nonEmpty)
Some(r1.expectedExecutionTimeMillis.get + r2.expectedExecutionTimeMillis.get)
else None,
Expand Down Expand Up @@ -187,6 +208,8 @@ object QueriesCompare {
testPassed = true,
Some(expectedRows.length),
Some(resultRows.length),
Some(expected.planningTimeMillis),
Some(result.planningTimeMillis),
Some(expected.executionTimeMillis),
Some(result.executionTimeMillis),
None)
Expand All @@ -198,6 +221,8 @@ object QueriesCompare {
testPassed = false,
Some(expectedRows.length),
Some(resultRows.length),
Some(expected.planningTimeMillis),
Some(result.planningTimeMillis),
Some(expected.executionTimeMillis),
Some(result.executionTimeMillis),
error)
Expand All @@ -207,7 +232,7 @@ object QueriesCompare {
println(
s"Error running query $id. " +
s" Error: ${error.get}")
TestResultLine(id, testPassed = false, None, None, None, None, error)
TestResultLine(id, testPassed = false, None, None, None, None, None, None, error)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gluten.integration.tpc.action

import java.io.{OutputStream, PrintStream}
import scala.collection.mutable

trait TableFormatter[ROW <: Any] {
import TableFormatter._
def appendRow(row: ROW): Unit
def print(s: OutputStream): Unit
}

object TableFormatter {
def create[ROW <: Any](fields: String*)(
implicit parser: RowParser[ROW]): TableFormatter[ROW] = {
assert(fields.nonEmpty)
new Impl[ROW](Schema(fields), parser)
}

private case class Schema(fields: Seq[String])

private class Impl[ROW <: Any](schema: Schema, parser: RowParser[ROW])
extends TableFormatter[ROW] {
private val rows = mutable.ListBuffer[Seq[String]]()

override def appendRow(row: ROW): Unit = {
val parsed = parser.parse(row)
assert(parsed.size == schema.fields.size)
rows += parsed.map(_.toString)
}

override def print(s: OutputStream): Unit = {
val numFields = schema.fields.size
val widths = (0 until numFields)
.map { i =>
rows.map(_(i).length).max max schema.fields(i).length
}
.map(_ + 1)
val pBuilder = StringBuilder.newBuilder
pBuilder ++= "|"
widths.foreach { w =>
pBuilder ++= s"%${w}s|"
}
val pattern = pBuilder.toString()
val printer = new PrintStream(s)
printer.println(String.format(pattern, schema.fields: _*))
rows.foreach { r =>
printer.println(String.format(pattern, r: _*))
}
printer.flush()
printer.close()
}
}

trait RowParser[ROW <: Any] {
def parse(row: ROW): Seq[Any]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ object QueryRunner {
if (explain) {
df.explain(extended = true)
}
val millis = (System.nanoTime() - prev) / 1000000L
val planMillis =
df.queryExecution.tracker.phases.values.map(p => p.endTimeMs - p.startTimeMs).sum
val totalMillis = (System.nanoTime() - prev) / 1000000L
val collectedMetrics = metrics.map(name => (name, em.getMetricValue(name))).toMap
RunResult(rows, millis, collectedMetrics)
RunResult(rows, planMillis, totalMillis - planMillis, collectedMetrics)
} finally {
sc.removeSparkListener(metricsListener)
killTaskListener.foreach(l => {
Expand Down Expand Up @@ -124,7 +126,11 @@ object QueryRunner {

}

case class RunResult(rows: Seq[Row], executionTimeMillis: Long, metrics: Map[String, Long])
case class RunResult(
rows: Seq[Row],
planningTimeMillis: Long,
executionTimeMillis: Long,
metrics: Map[String, Long])

class MetricsListener(em: ExecutorMetrics) extends SparkListener {
override def onExecutorMetricsUpdate(
Expand Down

0 comments on commit 717b263

Please sign in to comment.