From fcff6af84f0774b2a5499ef58e5e9a5695ada43f Mon Sep 17 00:00:00 2001 From: "Ma, Rong" Date: Wed, 1 Nov 2023 18:41:53 +0800 Subject: [PATCH] update --- .../benchmarks/ShuffleWriterFuzzerTest.scala | 51 +++++++++---------- .../RandomParquetDataGenerator.scala | 29 ++++++++--- 2 files changed, 44 insertions(+), 36 deletions(-) diff --git a/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala b/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala index e061191f6afad..9cd4efda6a0a6 100644 --- a/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala +++ b/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala @@ -21,23 +21,17 @@ import io.glutenproject.tags.FuzzerTest import org.apache.spark.SparkConf -import org.apache.commons.io.FileUtils - -import java.io.File - @FuzzerTest class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite { override protected val backend: String = "velox" override protected val resourcePath: String = "/tpch-data-parquet-velox" override protected val fileFormat: String = "parquet" - private val generatedPlanDir = - getClass.getResource("/").getPath + "../../../generated-native-benchmark/" - - private val seed: Long = System.currentTimeMillis() - private val dataGenerator = RandomParquetDataGenerator(seed) - private val outputPath = generatedPlanDir + s"${seed}_output.parquet" + private val dataGenerator = RandomParquetDataGenerator(System.currentTimeMillis()) + private val outputPath = getClass.getResource("/").getPath + "fuzzer_output.parquet" + private val REPARTITION_SQL = "select /*+ REPARTITION(3) */ * from tbl" + private val AGG_REPARTITION_SQL = "select count(*) from tbl group by f_1, f_2, f_3, f_4, f_5, f_6" override protected def sparkConf: SparkConf = { super.sparkConf .set("spark.plugins", "io.glutenproject.GlutenPlugin") @@ -46,54 +40,55 @@ class ShuffleWriterFuzzerTest extends 
VeloxWholeStageTransformerSuite { .set("spark.memory.offHeap.size", "512MB") .set("spark.driver.memory", "4g") } - override def beforeAll(): Unit = { - super.beforeAll() - val dir = new File(generatedPlanDir) - if (dir.exists()) { - FileUtils.forceDelete(dir) - } - FileUtils.forceMkdir(dir) - } - def testRepartition(): Boolean = { + def executeQuery(sql: String): Boolean = { try { System.gc() dataGenerator.generateRandomData(spark, outputPath) spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl") - spark.sql("select /*+ REPARTITION(3) */ * from tbl").foreach(_ => ()) + spark.sql(sql).foreach(_ => ()) true } catch { case t: Throwable => - logError(s"Failed to run test with seed: $seed", t) + logError(s"Failed to run test with seed: ${dataGenerator.getSeed}", t) false } } - test("repartition") { - val failed = (0 until 10) + def repeatQuery(sql: String, iterations: Int): Unit = { + val failed = (0 until iterations) .filterNot { i => logWarning( s"==============================> " + s"Started iteration $i (seed: ${dataGenerator.getSeed})") - val success = testRepartition() + val success = executeQuery(sql) dataGenerator.reFake(System.currentTimeMillis()) success } - .map(_ => seed) + .map(_ => dataGenerator.getSeed) if (failed.nonEmpty) { logError(s"Failed to run test with seed: ${failed.mkString(", ")}") } } - ignore("reproduce repartition") { - Seq(1698817545149L).foreach { + test("repartition") { + repeatQuery(REPARTITION_SQL, 10) + } + + test("with aggregation") { + repeatQuery(AGG_REPARTITION_SQL, 10) + } + + ignore("reproduce") { + val sql = REPARTITION_SQL + Seq(0L).foreach { seed => dataGenerator.reFake(seed) logWarning( s"==============================> " + s"Started reproduction (seed: ${dataGenerator.getSeed})") - testRepartition() + executeQuery(sql) } } } diff --git a/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala 
b/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala index 73ca3d3e4e15b..17890f3ad4cac 100644 --- a/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala +++ b/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala @@ -110,7 +110,7 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) { } // Candidate fields - val candidateFields: List[() => StructField] = List( + val numericFields: List[() => StructField] = List( () => StructField(fieldName, BooleanType, nullable = true), () => StructField(fieldName, ByteType, nullable = true), () => StructField(fieldName, ShortType, nullable = true), @@ -119,10 +119,16 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) { () => StructField(fieldName, FloatType, nullable = true), () => StructField(fieldName, DoubleType, nullable = true), () => StructField(fieldName, DateType, nullable = true), -// () => StructField(fieldName, TimestampType, nullable = true), - () => StructField(fieldName, DecimalType(10, 2), nullable = true), + // () => StructField(fieldName, TimestampType, nullable = true), + () => StructField(fieldName, DecimalType(10, 2), nullable = true) + ) + + val binaryFields: List[() => StructField] = List( () => StructField(fieldName, StringType, nullable = true), - () => StructField(fieldName, BinaryType, nullable = true), + () => StructField(fieldName, BinaryType, nullable = true) + ) + + val complexFields: List[() => StructField] = List( () => StructField(fieldName, ArrayType(StringType, containsNull = true), nullable = true), () => StructField( @@ -140,17 +146,24 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) { nullable = true) ) + val candidateFields: List[() => StructField] = + numericFields ++ binaryFields ++ complexFields + // Function to generate random schema with n fields def generateRandomSchema(n: Int): StructType = { fieldIndex = 0 - val selectedFields = - (0 
until n) - .map(_ => candidateFields(faker.random().nextInt(candidateFields.length))()) + val selectedFields = { + (0 until 3).map(_ => numericFields(faker.random().nextInt(numericFields.length))()) ++ + (0 until 3).map(_ => binaryFields(faker.random().nextInt(binaryFields.length))()) ++ + (0 until Math.max(0, n - 6)) + .map(_ => candidateFields(faker.random().nextInt(candidateFields.length))()) + } StructType(selectedFields) } + // Generate random schema with n = nextInt(4, 24); yields max(n, 6) fields def generateRandomSchema(): StructType = { - generateRandomSchema(faker.random().nextInt(5, 30)) + generateRandomSchema(faker.random().nextInt(4, 24)) } }