Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Gluten-it: Improve test report table format for parameterized test #6052

Merged
merged 4 commits into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class Parameterized implements Callable<Integer> {

@Override
public Integer call() throws Exception {
final Map<String, Map<String, List<Map.Entry<String, String>>>> parsed = new HashMap<>();
final Map<String, Map<String, List<Map.Entry<String, String>>>> parsed = new LinkedHashMap<>();

final Seq<scala.collection.immutable.Set<DimKv>> excludedCombinations = JavaConverters.asScalaBufferConverter(Arrays.stream(excludedDims).map(d -> {
final Matcher m = excludedDimsPattern.matcher(d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ package org.apache.gluten.integration.action

import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.gluten.integration.action.Actions.QuerySelector
import org.apache.gluten.integration.action.TableRender.Field
import org.apache.gluten.integration.action.TableRender.RowParser.FieldAppender.RowAppender
import org.apache.gluten.integration.stat.RamStat
import org.apache.gluten.integration.{QueryRunner, Suite, TableCreator}
import org.apache.spark.sql.ConfUtils.ConfImplicits._
import org.apache.spark.sql.SparkSessionSwitcher

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

Expand All @@ -39,6 +41,8 @@ class Parameterized(
metrics: Array[String])
extends Action {

validateDims(configDimensions)

private def validateDims(configDimensions: Seq[Dim]): Unit = {
if (configDimensions
.map(dim => {
Expand All @@ -57,32 +61,33 @@ class Parameterized(
}

private val coordinates: mutable.LinkedHashMap[Coordinate, Seq[(String, String)]] = {
validateDims(configDimensions)
val dimCount = configDimensions.size
val coordinateMap = mutable.LinkedHashMap[Coordinate, Seq[(String, String)]]()
val nextId: AtomicInteger = new AtomicInteger(1);

def fillCoordinates(
dimOffset: Int,
intermediateCoordinates: Map[String, String],
intermediateCoordinate: Map[String, String],
intermediateConf: Seq[(String, String)]): Unit = {
if (dimOffset == dimCount) {
// we got one coordinate
excludedCombinations.foreach { ec: Set[DimKv] =>
if (ec.forall { kv =>
intermediateCoordinates.contains(kv.k) && intermediateCoordinates(kv.k) == kv.v
intermediateCoordinate.contains(kv.k) && intermediateCoordinate(kv.k) == kv.v
}) {
println(s"Coordinate ${Coordinate(intermediateCoordinates)} excluded by $ec.")
println(s"Coordinate ${intermediateCoordinate} excluded by $ec.")
return
}
}
coordinateMap(Coordinate(intermediateCoordinates)) = intermediateConf
coordinateMap(Coordinate(nextId.getAndIncrement(), intermediateCoordinate)) =
intermediateConf
return
}
val dim = configDimensions(dimOffset)
dim.dimValues.foreach { dimValue =>
fillCoordinates(
dimOffset + 1,
intermediateCoordinates + (dim.name -> dimValue.name),
intermediateCoordinate + (dim.name -> dimValue.name),
intermediateConf ++ dimValue.conf)
}
}
Expand All @@ -95,7 +100,6 @@ class Parameterized(
override def execute(suite: Suite): Boolean = {
val runner: QueryRunner =
new QueryRunner(suite.queryResource(), suite.dataWritePath(scale, genPartitionedData))
val allQueries = suite.allQueryIds()

val sessionSwitcher = suite.sessionSwitcher
val testConf = suite.getTestConf()
Expand All @@ -116,36 +120,40 @@ class Parameterized(

val runQueryIds = queries.select(suite)

// warm up
(0 until warmupIterations).foreach { _ =>
runQueryIds.foreach { queryId =>
Parameterized.warmUp(suite.tableCreator(), queryId, suite.desc(), sessionSwitcher, runner)
}
}

val results = coordinates.flatMap { entry =>
val coordinate = entry._1
val coordinateResults = (0 until iterations).flatMap { iteration =>
println(s"Running tests (iteration $iteration) with coordinate $coordinate...")
runQueryIds.map { queryId =>
Parameterized.runQuery(
runner,
suite.tableCreator(),
sessionSwitcher,
val results = (0 until iterations).flatMap { iteration =>
runQueryIds.map { queryId =>
val queryResult =
TestResultLine(
queryId,
coordinate,
suite.desc(),
explain,
metrics)
}
}.toList
coordinateResults
coordinates.map { entry =>
val coordinate = entry._1
println(s"Running tests (iteration $iteration) with coordinate $coordinate...")
// warm up
(0 until warmupIterations).foreach { _ =>
Parameterized.warmUp(
runner,
suite.tableCreator(),
sessionSwitcher,
queryId,
suite.desc())
}
// run
Parameterized.runQuery(
runner,
suite.tableCreator(),
sessionSwitcher,
queryId,
coordinate,
suite.desc(),
explain,
metrics)
}.toList)
queryResult
}
}

val dimNames = configDimensions.map(dim => dim.name)

val passedCount = results.count(l => l.succeed)
val count = results.count(_ => true)
val succeededCount = results.count(l => l.succeeded())
val totalCount = results.count(_ => true)

// RAM stats
println("Performing GC to collect RAM statistics... ")
Expand All @@ -160,22 +168,37 @@ class Parameterized(
println("")
println("Test report: ")
println("")
printf("Summary: %d out of %d queries passed. \n", passedCount, count)
printf(
"Summary: %d out of %d queries successfully run on all config combinations. \n",
succeededCount,
totalCount)
println("")
TestResultLines(dimNames, metrics, results.filter(_.succeed)).print()
println("Configurations:")
coordinates.foreach { coord =>
println(s"${coord._1.id}. ${coord._1}")
}
println("")
val succeeded = results.filter(_.succeeded())
TestResultLines(
coordinates.size,
configDimensions,
metrics,
succeeded ++ TestResultLine.aggregate("all", succeeded))
.print()
println("")

if (passedCount == count) {
if (succeededCount == totalCount) {
println("No failed queries. ")
println("")
} else {
println("Failed queries: ")
println("")
TestResultLines(dimNames, metrics, results.filter(!_.succeed)).print()
TestResultLines(coordinates.size, configDimensions, metrics, results.filter(!_.succeeded()))
.print()
println("")
}

if (passedCount != count) {
if (succeededCount != totalCount) {
return false
}
true
Expand All @@ -185,56 +208,84 @@ class Parameterized(
case class DimKv(k: String, v: String)
case class Dim(name: String, dimValues: Seq[DimValue])
case class DimValue(name: String, conf: Seq[(String, String)])
case class Coordinate(coordinate: Map[String, String]) // [dim, dim value]

case class TestResultLine(
queryId: String,
succeed: Boolean,
coordinate: Coordinate,
rowCount: Option[Long],
planningTimeMillis: Option[Long],
executionTimeMillis: Option[Long],
metrics: Map[String, Long],
errorMessage: Option[String])
// coordinate: [dim, dim value]
case class Coordinate(id: Int, coordinate: Map[String, String]) {
override def toString: String = coordinate.mkString(", ")
}

case class TestResultLine(queryId: String, coordinates: Seq[TestResultLine.Coord]) {
def succeeded(): Boolean = {
coordinates.forall(_.succeeded)
}
}

object TestResultLine {
class Parser(dimNames: Seq[String], metricNames: Seq[String])
extends TableRender.RowParser[TestResultLine] {
case class Coord(
coordinate: Coordinate,
succeeded: Boolean,
rowCount: Option[Long],
planningTimeMillis: Option[Long],
executionTimeMillis: Option[Long],
metrics: Map[String, Long],
errorMessage: Option[String])

class Parser(metricNames: Seq[String]) extends TableRender.RowParser[TestResultLine] {
override def parse(rowAppender: RowAppender, line: TestResultLine): Unit = {
val inc = rowAppender.incremental()
inc.next().write(line.queryId)
inc.next().write(line.succeed)
dimNames.foreach { dimName =>
val coordinate = line.coordinate.coordinate
if (!coordinate.contains(dimName)) {
throw new IllegalStateException("Dimension name not found" + dimName)
}
inc.next().write(coordinate(dimName))
}
metricNames.foreach { metricName =>
val metrics = line.metrics
inc.next().write(metrics.getOrElse(metricName, "N/A"))
}
inc.next().write(line.rowCount.getOrElse("N/A"))
inc.next().write(line.planningTimeMillis.getOrElse("N/A"))
inc.next().write(line.executionTimeMillis.getOrElse("N/A"))
val coords = line.coordinates
coords.foreach(coord => inc.next().write(coord.succeeded))
coords.foreach(coord => inc.next().write(coord.rowCount))
metricNames.foreach(metricName =>
coords.foreach(coord => inc.next().write(coord.metrics(metricName))))
coords.foreach(coord => inc.next().write(coord.planningTimeMillis))
coords.foreach(coord => inc.next().write(coord.executionTimeMillis))
}
}

def aggregate(name: String, lines: Iterable[TestResultLine]): Iterable[TestResultLine] = {
if (lines.isEmpty) {
return Nil
}

if (lines.size == 1) {
return Nil
}

List(lines.reduce { (left, right) =>
TestResultLine(name, left.coordinates.zip(right.coordinates).map {
case (leftCoord, rightCoord) =>
assert(leftCoord.coordinate == rightCoord.coordinate)
Coord(
leftCoord.coordinate,
leftCoord.succeeded && rightCoord.succeeded,
(leftCoord.rowCount, rightCoord.rowCount).onBothProvided(_ + _),
(leftCoord.planningTimeMillis, rightCoord.planningTimeMillis).onBothProvided(_ + _),
(leftCoord.executionTimeMillis, rightCoord.executionTimeMillis).onBothProvided(_ + _),
(leftCoord.metrics, rightCoord.metrics).sumUp,
(leftCoord.errorMessage ++ rightCoord.errorMessage).reduceOption(_ + ", " + _))
})
})
}
}

case class TestResultLines(
dimNames: Seq[String],
coordCount: Int,
configDimensions: Seq[Dim],
metricNames: Seq[String],
lines: Iterable[TestResultLine]) {
def print(): Unit = {
val fields = ListBuffer[String]("Query ID", "Succeeded")
dimNames.foreach(dimName => fields.append(dimName))
metricNames.foreach(metricName => fields.append(metricName))
fields.append("Row Count")
fields.append("Planning Time (Millis)")
fields.append("Query Time (Millis)")
val render = TableRender.plain[TestResultLine](fields: _*)(
new TestResultLine.Parser(dimNames, metricNames))
val fields = ListBuffer[Field](Field.Leaf("Query ID"))
val coordFields = (1 to coordCount).map(id => Field.Leaf(id.toString))

fields.append(Field.Branch("Succeeded", coordFields))
fields.append(Field.Branch("Row Count", coordFields))
metricNames.foreach(metricName => fields.append(Field.Branch(metricName, coordFields)))
fields.append(Field.Branch("Planning Time (Millis)", coordFields))
fields.append(Field.Branch("Query Time (Millis)", coordFields))

val render =
TableRender.create[TestResultLine](fields: _*)(new TestResultLine.Parser(metricNames))

lines.foreach { line =>
render.appendRow(line)
Expand All @@ -253,10 +304,10 @@ object Parameterized {
coordinate: Coordinate,
desc: String,
explain: Boolean,
metrics: Array[String]) = {
metrics: Array[String]): TestResultLine.Coord = {
println(s"Running query: $id...")
try {
val testDesc = "Gluten Spark %s %s %s".format(desc, id, coordinate)
val testDesc = "Gluten Spark %s [%s] %s".format(desc, id, coordinate)
sessionSwitcher.useSession(coordinate.toString, testDesc)
runner.createTables(creator, sessionSwitcher.spark())
val result =
Expand All @@ -265,10 +316,9 @@ object Parameterized {
println(
s"Successfully ran query $id. " +
s"Returned row count: ${resultRows.length}")
TestResultLine(
id,
succeed = true,
TestResultLine.Coord(
coordinate,
succeeded = true,
Some(resultRows.length),
Some(result.planningTimeMillis),
Some(result.executionTimeMillis),
Expand All @@ -280,16 +330,16 @@ object Parameterized {
println(
s"Error running query $id. " +
s" Error: ${error.get}")
TestResultLine(id, succeed = false, coordinate, None, None, None, Map.empty, error)
TestResultLine.Coord(coordinate, succeeded = false, None, None, None, Map.empty, error)
}
}

private[integration] def warmUp(
private def warmUp(
runner: QueryRunner,
creator: TableCreator,
id: String,
desc: String,
sessionSwitcher: SparkSessionSwitcher,
runner: QueryRunner): Unit = {
id: String,
desc: String): Unit = {
println(s"Warming up: Running query: $id...")
try {
val testDesc = "Gluten Spark %s %s warm up".format(desc, id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ object Queries {
val inc = rowAppender.incremental()
inc.next().write(line.queryId)
inc.next().write(line.testPassed)
inc.next().write(line.rowCount.getOrElse("N/A"))
inc.next().write(line.planningTimeMillis.getOrElse("N/A"))
inc.next().write(line.executionTimeMillis.getOrElse("N/A"))
inc.next().write(line.rowCount)
inc.next().write(line.planningTimeMillis)
inc.next().write(line.executionTimeMillis)
}
}
}
Expand Down
Loading
Loading