compare result
marin-ma committed Nov 6, 2023
1 parent 2171399 commit 2a7d2fd
Showing 2 changed files with 33 additions and 14 deletions.

ShuffleWriterFuzzerTest.scala

@@ -16,11 +16,24 @@
  */
 package io.glutenproject.benchmarks
 
+import io.glutenproject.benchmarks.ShuffleWriterFuzzerTest.{Failed, OOM, Successful, TestResult}
 import io.glutenproject.execution.VeloxWholeStageTransformerSuite
+import io.glutenproject.memory.memtarget.ThrowOnOomMemoryTarget
 import io.glutenproject.tags.FuzzerTest
 
 import org.apache.spark.SparkConf
 
+object ShuffleWriterFuzzerTest {
+  trait TestResult {
+    val seed: Long
+
+    def getSeed: Long = seed
+  }
+  case class Successful(seed: Long) extends TestResult
+  case class Failed(seed: Long) extends TestResult
+  case class OOM(seed: Long) extends TestResult
+}
+
 @FuzzerTest
 class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite {
   override protected val backend: String = "velox"
@@ -41,35 +54,40 @@ class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite {
       .set("spark.driver.memory", "4g")
   }
 
-  def executeQuery(sql: String): Boolean = {
+  def executeQuery(sql: String): TestResult = {
     try {
       System.gc()
       dataGenerator.generateRandomData(spark, outputPath)
       spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl")
-      spark.sql(sql).foreach(_ => ())
-      true
+      runQueryAndCompare(sql, true, false)(_ => {})
+      Successful(dataGenerator.getSeed)
     } catch {
+      case oom: ThrowOnOomMemoryTarget.OutOfMemoryException =>
+        logError(s"Out of memory while running test with seed: ${dataGenerator.getSeed}", oom)
+        OOM(dataGenerator.getSeed)
       case t: Throwable =>
         logError(s"Failed to run test with seed: ${dataGenerator.getSeed}", t)
-        false
+        Failed(dataGenerator.getSeed)
     }
   }
 
   def repeatQuery(sql: String, iterations: Int): Unit = {
-    val failed = (0 until iterations)
-      .filterNot {
+    val result = (0 until iterations)
+      .map {
         i =>
           logWarning(
             s"==============================> " +
               s"Started iteration $i (seed: ${dataGenerator.getSeed})")
-          val success = executeQuery(sql)
+          val result = executeQuery(sql)
           dataGenerator.reFake(System.currentTimeMillis())
-          success
+          result
       }
-      .map(_ => dataGenerator.getSeed)
-    if (failed.nonEmpty) {
-      logError(s"Failed to run test with seed: ${failed.mkString(", ")}")
+    val oom = result.filter(_.isInstanceOf[OOM]).map(_.getSeed)
+    if (oom.nonEmpty) {
+      logError(s"Out of memory while running test with seed: ${oom.mkString(", ")}")
     }
+    val failed = result.filter(_.isInstanceOf[Failed]).map(_.getSeed)
+    assert(failed.isEmpty, s"Failed to run test with seed: ${failed.mkString(",")}")
   }
 
   test("repartition") {
@@ -88,7 +106,8 @@ class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite {
       logWarning(
         s"==============================> " +
           s"Started reproduction (seed: ${dataGenerator.getSeed})")
-      executeQuery(sql)
+      val result = executeQuery(sql)
+      assert(result.isInstanceOf[Successful])
     }
   }
 }
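
Note: the change above replaces executeQuery's Boolean result with a small result hierarchy (Successful / Failed / OOM), each carrying the data-generator seed, so out-of-memory runs and ordinary failures are reported separately and every failing seed stays reproducible. Below is a minimal standalone sketch of the same pattern; runOnce and its seed arithmetic are illustrative stand-ins, not code from this repository. Declaring the trait sealed (the commit's trait is not) additionally lets collect/match replace the isInstanceOf filtering with compiler-checked patterns.

    sealed trait TestResult { def seed: Long }
    case class Successful(seed: Long) extends TestResult
    case class Failed(seed: Long) extends TestResult
    case class OOM(seed: Long) extends TestResult

    object FuzzerResultSketch {
      // Hypothetical stand-in for executeQuery: classify one run by its seed.
      def runOnce(seed: Long): TestResult =
        if (seed % 5 == 0) OOM(seed) else Successful(seed)

      def main(args: Array[String]): Unit = {
        val results = (1L to 10L).map(runOnce)
        // collect extracts per-category seeds without isInstanceOf/getSeed pairs.
        val oomSeeds = results.collect { case OOM(s) => s }
        val failedSeeds = results.collect { case Failed(s) => s }
        if (oomSeeds.nonEmpty) println(s"OOM seeds: ${oomSeeds.mkString(", ")}")
        assert(failedSeeds.isEmpty, s"Failed seeds: ${failedSeeds.mkString(", ")}")
      }
    }
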
RandomParquetDataGenerator.scala

@@ -89,7 +89,7 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) {
       schema: StructType,
       numRows: Int,
       outputPath: String): Unit = {
-    val data = (0 until numRows).map(_ => generateRow(schema, 0))
+    val data = (0 until numRows).map(_ => generateRow(schema, faker.random().nextDouble()))
     val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
     df.coalesce(1)
       .write
@@ -119,7 +119,7 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) {
     () => StructField(fieldName, FloatType, nullable = true),
     () => StructField(fieldName, DoubleType, nullable = true),
     () => StructField(fieldName, DateType, nullable = true),
-    // () => StructField(fieldName, TimestampType, nullable = true),
+    // () => StructField(fieldName, TimestampType, nullable = true),
     () => StructField(fieldName, DecimalType(10, 2), nullable = true)
   )
 
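Note: the generateRow change above swaps a constant second argument (0) for a fresh faker.random().nextDouble() on every row. That parameter's meaning is not visible in this hunk; assuming it is a probability in [0, 1) that steers per-row behavior (for example, how often a field is null), drawing it per row varies the shape of the generated data instead of fixing one behavior for all rows. A hedged sketch of that pattern, with every name here hypothetical:

    import scala.util.Random

    object PerRowRandomnessSketch {
      private val rng = new Random(42L) // stand-in for the generator's faker instance

      // Hypothetical generateRow: nullProbability steers how often a field is null.
      def generateRow(numFields: Int, nullProbability: Double): Seq[Option[Int]] =
        Seq.fill(numFields) {
          if (rng.nextDouble() < nullProbability) None else Some(rng.nextInt(100))
        }

      def main(args: Array[String]): Unit = {
        // Before: a constant such as generateRow(5, 0) fixed one behavior for all rows.
        // After: each row draws its own probability, so null density varies row to row.
        val rows = (0 until 3).map(_ => generateRow(5, rng.nextDouble()))
        rows.foreach(row => println(row.mkString(", ")))
      }
    }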