diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index 8f45986cdba1..ea49703a2e9f 100755 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -166,6 +166,13 @@ 1.3.0 compile + + + com.github.javafaker + javafaker + 1.0.2 + test + diff --git a/backends-velox/src/test/java/io/glutenproject/tags/FuzzerTest.java b/backends-velox/src/test/java/io/glutenproject/tags/FuzzerTest.java new file mode 100644 index 000000000000..ada28718b944 --- /dev/null +++ b/backends-velox/src/test/java/io/glutenproject/tags/FuzzerTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.tags; + +import org.scalatest.TagAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@TagAnnotation +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.TYPE}) +public @interface FuzzerTest {} diff --git a/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala b/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala new file mode 100644 index 000000000000..9af45c78eb84 --- /dev/null +++ b/backends-velox/src/test/scala/io/glutenproject/benchmarks/ShuffleWriterFuzzerTest.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.benchmarks + +import io.glutenproject.benchmarks.ShuffleWriterFuzzerTest.{Failed, OOM, Successful, TestResult} +import io.glutenproject.execution.VeloxWholeStageTransformerSuite +import io.glutenproject.memory.memtarget.ThrowOnOomMemoryTarget +import io.glutenproject.tags.{FuzzerTest, SkipTestTags} + +import org.apache.spark.SparkConf + +object ShuffleWriterFuzzerTest { + trait TestResult { + val seed: Long + + def getSeed: Long = seed + } + case class Successful(seed: Long) extends TestResult + case class Failed(seed: Long) extends TestResult + case class OOM(seed: Long) extends TestResult +} + +@FuzzerTest +@SkipTestTags +class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite { + override protected val backend: String = "velox" + override protected val resourcePath: String = "/tpch-data-parquet-velox" + override protected val fileFormat: String = "parquet" + + private val dataGenerator = RandomParquetDataGenerator(System.currentTimeMillis()) + private val outputPath = getClass.getResource("/").getPath + "fuzzer_output.parquet" + + private val REPARTITION_SQL = (numPartitions: Int) => + s"select /*+ REPARTITION($numPartitions) */ * from tbl" + private val AGG_REPARTITION_SQL = + """select count(*) as cnt, f_1, f_2, f_3, f_4, f_5, f_6 + |from tbl group by f_1, f_2, f_3, f_4, f_5, f_6 + |order by cnt, f_1, f_2, f_3, f_4, f_5, f_6""".stripMargin + + override protected def sparkConf: SparkConf = { + super.sparkConf + .set("spark.plugins", "io.glutenproject.GlutenPlugin") + .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "512MB") + .set("spark.driver.memory", "4g") + .set("spark.driver.maxResultSize", "4g") + .set("spark.gluten.sql.debug", "true") + .set("spark.gluten.sql.columnar.backend.velox.glogSeverityLevel", "0") + } + + def getRootCause(e: Throwable): Throwable = { + if (e.getCause == null) { + return e + } + getRootCause(e.getCause) + } + + def executeQuery(sql: String): TestResult = { + try { + System.gc() + dataGenerator.generateRandomData(spark, outputPath) + spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl") + runQueryAndCompare(sql, true, false)(_ => {}) + Successful(dataGenerator.getSeed) + } catch { + case oom: ThrowOnOomMemoryTarget.OutOfMemoryException => + logError(s"Out of memory while running test with seed: ${dataGenerator.getSeed}", oom) + OOM(dataGenerator.getSeed) + case t: Throwable => + if ( + getRootCause(t).getMessage.contains( + classOf[ThrowOnOomMemoryTarget.OutOfMemoryException].getName) + ) { + logError(s"Out of memory while running test with seed: ${dataGenerator.getSeed}", t) + OOM(dataGenerator.getSeed) + } else { + logError(s"Failed to run test with seed: ${dataGenerator.getSeed}", t) + Failed(dataGenerator.getSeed) + } + } + } + + def repeatQuery(sql: String, iterations: Int, testName: String): Unit = { + val result = (0 until iterations) + .map { + i => + logWarning( + s"==============================> " + + s"Started iteration $i (seed: ${dataGenerator.getSeed})") + val result = executeQuery(sql) + dataGenerator.reFake(System.currentTimeMillis()) + result + } + val oom = result.filter(_.isInstanceOf[OOM]).map(_.getSeed) + if (oom.nonEmpty) { + logError(s"Out of memory while running test '$testName' with seed: ${oom.mkString(", ")}") + } + val failed = result.filter(_.isInstanceOf[Failed]).map(_.getSeed) + assert(failed.isEmpty, s"Failed to run test '$testName' with seed: ${failed.mkString(",")}") + } + + private val REPARTITION_TEST_NAME = (numPartitions: Int) => s"repartition - $numPartitions" + for (numPartitions <- Seq(1, 3, 10, 100, 1000, 4000, 8000)) { + val testName = REPARTITION_TEST_NAME(numPartitions) + test(testName) { + repeatQuery(REPARTITION_SQL(numPartitions), 10, testName) + } + } + + private val AGG_TEST_NAME = "with aggregation" + ignore(AGG_TEST_NAME) { + repeatQuery(AGG_REPARTITION_SQL, 10, AGG_TEST_NAME) + } + + ignore("reproduce") { + // Replace sql with the actual failed sql. + val sql = REPARTITION_SQL(100) + // Replace seed '0L' with the actual failed seed. + Seq(0L).foreach { + seed => + dataGenerator.reFake(seed) + logWarning( + s"==============================> " + + s"Started reproduction (seed: ${dataGenerator.getSeed})") + val result = executeQuery(sql) + assert(result.isInstanceOf[Successful], s"Failed to run 'reproduce' with seed: $seed") + } + } +} diff --git a/gluten-core/pom.xml b/gluten-core/pom.xml index de9d9cca7668..04fcfc11a85b 100644 --- a/gluten-core/pom.xml +++ b/gluten-core/pom.xml @@ -211,6 +211,13 @@ com.fasterxml.jackson.module jackson-module-scala_${scala.binary.version} + + + com.github.javafaker + javafaker + 1.0.2 + test + diff --git a/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala b/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala new file mode 100644 index 000000000000..fbd4e25f2599 --- /dev/null +++ b/gluten-core/src/test/scala/io/glutenproject/benchmarks/RandomParquetDataGenerator.scala @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.glutenproject.benchmarks + +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.types._ + +import com.github.javafaker.Faker + +import java.sql.Date +import java.util.Random + +case class RandomParquetDataGenerator(initialSeed: Long = 0L) { + private var seed: Long = initialSeed + private var faker = new Faker(new Random(seed)) + + def reFake(newSeed: Long): Unit = { + seed = newSeed + faker = new Faker(new Random(seed)) + } + + def getSeed: Long = { + seed + } + + def getFaker: Faker = { + faker + } + + def generateRow(schema: StructType, probabilityOfNull: Double = 0): Row = { + val values = schema.fields.map(field => generateDataForType(field.dataType, probabilityOfNull)) + Row.fromSeq(values) + } + + def generateDataForType(dataType: DataType, probabilityOfNull: Double): Any = { + require( + probabilityOfNull >= 0 && probabilityOfNull <= 1, + "Probability should be between 0 and 1") + + if (faker.random().nextDouble() < probabilityOfNull) { + return null + } + + dataType match { + case BooleanType => faker.bool().bool() + case ByteType => faker.number().numberBetween(Byte.MinValue, Byte.MaxValue).toByte + case ShortType => faker.number().numberBetween(Short.MinValue, Short.MaxValue).toShort + case IntegerType => faker.number().numberBetween(Int.MinValue, Int.MaxValue) + case LongType => faker.number().numberBetween(Long.MinValue, Long.MaxValue) + case FloatType => + faker.number().randomDouble(2, Float.MinValue.toInt, Float.MaxValue.toInt).toFloat + case DoubleType => + faker.number().randomDouble(2, Double.MinValue.toLong, Double.MaxValue.toLong) + case DateType => new Date(faker.date().birthday().getTime) +// case TimestampType => new Timestamp(faker.date().birthday().getTime) + case t: DecimalType => + BigDecimal( + faker.number().randomDouble(t.scale, 0, Math.pow(10, t.precision - t.scale).toLong)) + case StringType => faker.lorem().characters(0, 1000) + case BinaryType => faker.lorem().characters(10).getBytes + case ArrayType(elementType, _) => + Seq.fill(faker.number().numberBetween(1, 5))( + generateDataForType(elementType, probabilityOfNull)) + case MapType(keyType, valueType, _) => + Map(generateDataForType(keyType, 0) -> generateDataForType(valueType, probabilityOfNull)) + case struct: StructType => generateRow(struct) + case _ => + throw new UnsupportedOperationException( + s"Data generation not supported for type: $dataType") + } + } + + def generateRandomData( + spark: SparkSession, + schema: StructType, + numRows: Int, + outputPath: String): Unit = { + val data = (0 until numRows).map(_ => generateRow(schema, faker.random().nextDouble())) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + df.coalesce(1) + .write + .mode("overwrite") + .parquet(outputPath) + } + + def generateRandomData(spark: SparkSession, outputPath: String): Unit = { + val schema = generateRandomSchema() + val numRows = faker.random().nextInt(1000, 300000) + generateRandomData(spark, schema, numRows, outputPath) + } + + var fieldIndex = 0 + def fieldName: String = { + fieldIndex += 1 + s"f_$fieldIndex" + } + + // Candidate fields + val numericFields: List[() => StructField] = List( + () => StructField(fieldName, BooleanType, nullable = true), + () => StructField(fieldName, ByteType, nullable = true), + () => StructField(fieldName, ShortType, nullable = true), + () => StructField(fieldName, IntegerType, nullable = true), + () => StructField(fieldName, LongType, nullable = true), + () => StructField(fieldName, FloatType, nullable = true), + () => StructField(fieldName, DoubleType, nullable = true), + () => StructField(fieldName, DateType, nullable = true), +// () => StructField(fieldName, TimestampType, nullable = true), + () => StructField(fieldName, DecimalType(10, 2), nullable = true) + ) + + val binaryFields: List[() => StructField] = List( + () => StructField(fieldName, StringType, nullable = true), + () => StructField(fieldName, BinaryType, nullable = true) + ) + + val complexFields: List[() => StructField] = List( + () => StructField(fieldName, ArrayType(StringType, containsNull = true), nullable = true), + () => + StructField( + fieldName, + MapType(StringType, IntegerType, valueContainsNull = true), + nullable = true), + () => + StructField( + fieldName, + StructType( + Seq( + StructField(fieldName, StringType, nullable = true), + StructField(fieldName, DoubleType, nullable = true) + )), + nullable = true) + ) + + val candidateFields: List[() => StructField] = + numericFields ++ binaryFields ++ complexFields + + // Function to generate random schema with n fields + def generateRandomSchema(n: Int): StructType = { + fieldIndex = 0 + val selectedFields = { + (0 until 3).map(_ => numericFields(faker.random().nextInt(numericFields.length))()) ++ + (0 until 3).map(_ => binaryFields(faker.random().nextInt(binaryFields.length))()) ++ + (0 until Math.max(0, n - 6)) + .map(_ => candidateFields(faker.random().nextInt(candidateFields.length))()) + } + StructType(selectedFields) + } + + // Generate random schema with [10, 30) fields + def generateRandomSchema(): StructType = { + generateRandomSchema(faker.random().nextInt(4, 24)) + } +} + +// An example to demonstrate how to use RandomParquetDataGenerator to generate input data. +object RandomParquetDataGenerator { + def main(args: Array[String]): Unit = { + val spark = + SparkSession.builder().master("local[1]").appName("Random Data Generator").getOrCreate() + + val seed: Long = 0L + val outputPath = s"${seed}_output.parquet" + + RandomParquetDataGenerator(seed).generateRandomData(spark, outputPath) + } +}