From 2a7d2fdf488fb3f342ce317e0b2f3b525a041c34 Mon Sep 17 00:00:00 2001
From: "Ma, Rong"
Date: Mon, 6 Nov 2023 15:22:21 +0800
Subject: [PATCH] Compare fuzzer query results against vanilla Spark

Validate each fuzzer query via runQueryAndCompare instead of merely
executing it; classify every iteration as Successful, Failed, or OOM;
tolerate OOM iterations (logging their seeds) while failing the test on
any other error; and use a random value from the faker instead of a
hardcoded 0 when generating rows in RandomParquetDataGenerator.

---
 .../benchmarks/ShuffleWriterFuzzerTest.scala  | 43 +++++++++++++------
 .../RandomParquetDataGenerator.scala          |  4 +-
 2 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala b/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala
index 9cd4efda6a0a6..7b353438155bc 100644
--- a/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala
+++ b/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala
@@ -16,11 +16,24 @@
  */
 package io.glutenproject.benchmarks
 
+import io.glutenproject.benchmarks.ShuffleWriterFuzzerTest.{Failed, OOM, Successful, TestResult}
 import io.glutenproject.execution.VeloxWholeStageTransformerSuite
+import io.glutenproject.memory.memtarget.ThrowOnOomMemoryTarget
 import io.glutenproject.tags.FuzzerTest
 
 import org.apache.spark.SparkConf
 
+object ShuffleWriterFuzzerTest {
+  trait TestResult {
+    val seed: Long
+
+    def getSeed: Long = seed
+  }
+  case class Successful(seed: Long) extends TestResult
+  case class Failed(seed: Long) extends TestResult
+  case class OOM(seed: Long) extends TestResult
+}
+
 @FuzzerTest
 class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite {
   override protected val backend: String = "velox"
@@ -41,35 +54,40 @@ class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite {
       .set("spark.driver.memory", "4g")
   }
 
-  def executeQuery(sql: String): Boolean = {
+  def executeQuery(sql: String): TestResult = {
     try {
       System.gc()
       dataGenerator.generateRandomData(spark, outputPath)
       spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl")
-      spark.sql(sql).foreach(_ => ())
-      true
+      runQueryAndCompare(sql, true, false)(_ => {})
+      Successful(dataGenerator.getSeed)
     } catch {
+      case oom: ThrowOnOomMemoryTarget.OutOfMemoryException =>
+        logError(s"Out of memory while running test with seed: ${dataGenerator.getSeed}", oom)
+        OOM(dataGenerator.getSeed)
       case t: Throwable =>
         logError(s"Failed to run test with seed: ${dataGenerator.getSeed}", t)
-        false
+        Failed(dataGenerator.getSeed)
     }
   }
 
   def repeatQuery(sql: String, iterations: Int): Unit = {
-    val failed = (0 until iterations)
-      .filterNot {
+    val result = (0 until iterations)
+      .map {
         i =>
           logWarning(
             s"==============================> " +
               s"Started iteration $i (seed: ${dataGenerator.getSeed})")
-          val success = executeQuery(sql)
+          val result = executeQuery(sql)
          dataGenerator.reFake(System.currentTimeMillis())
-          success
+          result
       }
-      .map(_ => dataGenerator.getSeed)
-    if (failed.nonEmpty) {
-      logError(s"Failed to run test with seed: ${failed.mkString(", ")}")
+    val oom = result.filter(_.isInstanceOf[OOM]).map(_.getSeed)
+    if (oom.nonEmpty) {
+      logError(s"Out of memory while running test with seed: ${oom.mkString(", ")}")
     }
+    val failed = result.filter(_.isInstanceOf[Failed]).map(_.getSeed)
+    assert(failed.isEmpty, s"Failed to run test with seed: ${failed.mkString(",")}")
   }
 
   test("repartition") {
@@ -88,7 +106,8 @@ class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite {
       logWarning(
         s"==============================> " +
           s"Started reproduction (seed: ${dataGenerator.getSeed})")
-      executeQuery(sql)
+      val result = executeQuery(sql)
+      assert(result.isInstanceOf[Successful])
     }
   }
 }
diff --git a/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala b/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala
index 17890f3ad4cac..fbd4e25f25993 100644
--- a/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala
+++ b/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala
@@ -89,7 +89,7 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) {
       schema: StructType,
       numRows: Int,
       outputPath: String): Unit = {
-    val data = (0 until numRows).map(_ => generateRow(schema, 0))
+    val data = (0 until numRows).map(_ => generateRow(schema, faker.random().nextDouble()))
     val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
     df.coalesce(1)
       .write
@@ -119,7 +119,7 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) {
     () => StructField(fieldName, FloatType, nullable = true),
     () => StructField(fieldName, DoubleType, nullable = true),
     () => StructField(fieldName, DateType, nullable = true),
-    // () => StructField(fieldName, TimestampType, nullable = true),
+//    () => StructField(fieldName, TimestampType, nullable = true),
     () => StructField(fieldName, DecimalType(10, 2), nullable = true)
   )
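The patch replaces executeQuery's Boolean with a small result hierarchy so that repeatQuery can treat out-of-memory iterations differently from genuine failures. The sketch below is a rough, self-contained illustration of that aggregation pattern (plain Scala, no Spark or Gluten dependencies; the object name and all seed values are invented for the example): OOM seeds are only logged, while any other failure aborts the run and prints the seeds needed to reproduce it.

    object TestResultDemo {
      // Mirrors ShuffleWriterFuzzerTest.TestResult from the patch; `sealed`
      // is added here only to keep the standalone sketch self-contained.
      sealed trait TestResult {
        val seed: Long
        def getSeed: Long = seed
      }
      case class Successful(seed: Long) extends TestResult
      case class Failed(seed: Long) extends TestResult
      case class OOM(seed: Long) extends TestResult

      def main(args: Array[String]): Unit = {
        // Hypothetical per-iteration outcomes; the real suite collects one
        // TestResult per executeQuery call.
        val results: Seq[TestResult] = Seq(Successful(11L), OOM(22L), Successful(33L))

        // OOM seeds are reported but do not fail the run.
        val oom = results.filter(_.isInstanceOf[OOM]).map(_.getSeed)
        if (oom.nonEmpty) {
          println(s"Out of memory while running test with seed: ${oom.mkString(", ")}")
        }

        // Any other failure aborts with the seeds needed to replay it.
        val failed = results.filter(_.isInstanceOf[Failed]).map(_.getSeed)
        assert(failed.isEmpty, s"Failed to run test with seed: ${failed.mkString(",")}")
      }
    }

Because each iteration reseeds the generator with System.currentTimeMillis(), recording the seed inside every TestResult is what makes a failing iteration reproducible after the fact.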