
Commit fcff6af
update
marin-ma committed Nov 1, 2023
1 parent 441e303 commit fcff6af
Showing 2 changed files with 44 additions and 36 deletions.
@@ -21,23 +21,17 @@ import io.glutenproject.tags.FuzzerTest
 
 import org.apache.spark.SparkConf
 
-import org.apache.commons.io.FileUtils
-
-import java.io.File
-
 @FuzzerTest
 class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite {
   override protected val backend: String = "velox"
   override protected val resourcePath: String = "/tpch-data-parquet-velox"
   override protected val fileFormat: String = "parquet"
 
-  private val generatedPlanDir =
-    getClass.getResource("/").getPath + "../../../generated-native-benchmark/"
-
-  private val seed: Long = System.currentTimeMillis()
-  private val dataGenerator = RandomParquetDataGenerator(seed)
-  private val outputPath = generatedPlanDir + s"${seed}_output.parquet"
+  private val dataGenerator = RandomParquetDataGenerator(System.currentTimeMillis())
+  private val outputPath = getClass.getResource("/").getPath + "fuzzer_output.parquet"
 
+  private val REPARTITION_SQL = "select /*+ REPARTITION(3) */ * from tbl"
+  private val AGG_REPARTITION_SQL = "select count(*) from tbl group by f_1, f_2, f_3, f_4, f_5, f_6"
   override protected def sparkConf: SparkConf = {
     super.sparkConf
       .set("spark.plugins", "io.glutenproject.GlutenPlugin")
@@ -46,54 +40,55 @@ class ShuffleWriterFuzzerTest extends VeloxWholeStageTransformerSuite {
       .set("spark.memory.offHeap.size", "512MB")
       .set("spark.driver.memory", "4g")
   }
 
-  override def beforeAll(): Unit = {
-    super.beforeAll()
-    val dir = new File(generatedPlanDir)
-    if (dir.exists()) {
-      FileUtils.forceDelete(dir)
-    }
-    FileUtils.forceMkdir(dir)
-  }
-
-  def testRepartition(): Boolean = {
+  def executeQuery(sql: String): Boolean = {
     try {
+      System.gc()
       dataGenerator.generateRandomData(spark, outputPath)
       spark.read.format("parquet").load(outputPath).createOrReplaceTempView("tbl")
-      spark.sql("select /*+ REPARTITION(3) */ * from tbl").foreach(_ => ())
+      spark.sql(sql).foreach(_ => ())
       true
     } catch {
      case t: Throwable =>
-        logError(s"Failed to run test with seed: $seed", t)
+        logError(s"Failed to run test with seed: ${dataGenerator.getSeed}", t)
        false
    }
  }
 
-  test("repartition") {
-    val failed = (0 until 10)
+  def repeatQuery(sql: String, iterations: Int): Unit = {
+    val failed = (0 until iterations)
      .filterNot {
        i =>
          logWarning(
            s"==============================> " +
              s"Started iteration $i (seed: ${dataGenerator.getSeed})")
-          val success = testRepartition()
+          val success = executeQuery(sql)
          dataGenerator.reFake(System.currentTimeMillis())
          success
      }
-      .map(_ => seed)
+      .map(_ => dataGenerator.getSeed)
    if (failed.nonEmpty) {
      logError(s"Failed to run test with seed: ${failed.mkString(", ")}")
    }
  }
 
-  ignore("reproduce repartition") {
-    Seq(1698817545149L).foreach {
+  test("repartition") {
+    repeatQuery(REPARTITION_SQL, 10)
+  }
+
+  test("with aggregation") {
+    repeatQuery(AGG_REPARTITION_SQL, 10)
+  }
+
+  ignore("reproduce") {
+    val sql = REPARTITION_SQL
+    Seq(0L).foreach {
      seed =>
        dataGenerator.reFake(seed)
        logWarning(
          s"==============================> " +
            s"Started reproduction (seed: ${dataGenerator.getSeed})")
-        testRepartition()
+        executeQuery(sql)
    }
  }
 }
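For context: the refactor above turns the single "repartition" case into a reusable rotate-and-record fuzzing loop: generate a dataset from the current seed, run the query, re-seed the generator, and keep the seeds of failing rounds so a crash can later be replayed through the ignore("reproduce") test. Below is a minimal, self-contained sketch of that pattern; the data generator is modeled with scala.util.Random, and ReseedableGen, runOnce, and fuzz are illustrative names, not Gluten APIs.

import scala.util.Random

object FuzzLoopSketch {
  // Stand-in for RandomParquetDataGenerator: re-seedable and exposes its seed.
  final class ReseedableGen(initialSeed: Long) {
    private var seed: Long = initialSeed
    private var rng: Random = new Random(seed)
    def getSeed: Long = seed
    def reFake(newSeed: Long): Unit = { seed = newSeed; rng = new Random(seed) }
    def nextValue(): Int = rng.nextInt(1000) // stands in for writing a random Parquet file
  }

  // Runs `iterations` rounds and returns the seeds of the failing rounds.
  // Each failing seed is captured before the generator is re-seeded, so the
  // exact dataset can be regenerated later from the logged seed.
  def fuzz(runOnce: ReseedableGen => Boolean, iterations: Int): Seq[Long] = {
    val gen = new ReseedableGen(System.currentTimeMillis())
    (0 until iterations).flatMap {
      i =>
        val seed = gen.getSeed
        println(s"Started iteration $i (seed: $seed)")
        val success = runOnce(gen)
        gen.reFake(System.currentTimeMillis()) // fresh data for the next round
        if (success) None else Some(seed)
    }
  }

  def main(args: Array[String]): Unit = {
    // Toy query oracle: "fail" whenever the generated value is divisible by 7.
    val failed = fuzz(gen => gen.nextValue() % 7 != 0, iterations = 10)
    if (failed.nonEmpty) println(s"Failed to run test with seed: ${failed.mkString(", ")}")
  }
}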
@@ -110,7 +110,7 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) {
   }
 
   // Candidate fields
-  val candidateFields: List[() => StructField] = List(
+  val numericFields: List[() => StructField] = List(
     () => StructField(fieldName, BooleanType, nullable = true),
     () => StructField(fieldName, ByteType, nullable = true),
     () => StructField(fieldName, ShortType, nullable = true),
@@ -119,10 +119,16 @@
     () => StructField(fieldName, FloatType, nullable = true),
     () => StructField(fieldName, DoubleType, nullable = true),
     () => StructField(fieldName, DateType, nullable = true),
-    // () => StructField(fieldName, TimestampType, nullable = true),
-    () => StructField(fieldName, DecimalType(10, 2), nullable = true),
+    // () => StructField(fieldName, TimestampType, nullable = true),
+    () => StructField(fieldName, DecimalType(10, 2), nullable = true)
+  )
+
+  val binaryFields: List[() => StructField] = List(
     () => StructField(fieldName, StringType, nullable = true),
-    () => StructField(fieldName, BinaryType, nullable = true),
+    () => StructField(fieldName, BinaryType, nullable = true)
+  )
+
+  val complexFields: List[() => StructField] = List(
     () => StructField(fieldName, ArrayType(StringType, containsNull = true), nullable = true),
     () =>
       StructField(
@@ -140,17 +146,24 @@ case class RandomParquetDataGenerator(initialSeed: Long = 0L) {
         nullable = true)
   )
 
+  val candidateFields: List[() => StructField] =
+    numericFields ++ binaryFields ++ complexFields
+
   // Function to generate random schema with n fields
   def generateRandomSchema(n: Int): StructType = {
     fieldIndex = 0
-    val selectedFields =
-      (0 until n)
-        .map(_ => candidateFields(faker.random().nextInt(candidateFields.length))())
+    val selectedFields = {
+      (0 until 3).map(_ => numericFields(faker.random().nextInt(numericFields.length))()) ++
+        (0 until 3).map(_ => binaryFields(faker.random().nextInt(binaryFields.length))()) ++
+        (0 until Math.max(0, n - 6))
+          .map(_ => candidateFields(faker.random().nextInt(candidateFields.length))())
+    }
     StructType(selectedFields)
   }
 
   // Generate random schema with 4 to 24 fields
   def generateRandomSchema(): StructType = {
-    generateRandomSchema(faker.random().nextInt(5, 30))
+    generateRandomSchema(faker.random().nextInt(4, 24))
   }
 }
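For context: schema sampling is now stratified. Every generated schema starts with three numeric fields and three string/binary fields, and only the remaining n - 6 fields are drawn from the full candidate pool (which includes the complex types), so generated tables always contain at least six leading primitive columns. Below is a minimal sketch of that sampling scheme, using scala.util.Random in place of the faker; the type-name pools are illustrative stand-ins for the StructField generator lists above.

import scala.util.Random

object SchemaSamplingSketch {
  // Illustrative stand-ins for the numeric/binary/complex generator pools.
  val numeric: Seq[String] =
    Seq("boolean", "byte", "short", "int", "long", "float", "double", "date", "decimal(10,2)")
  val binary: Seq[String] = Seq("string", "binary")
  val complex: Seq[String] = Seq("array<string>", "map<string,string>", "struct<f0:string>")
  val all: Seq[String] = numeric ++ binary ++ complex

  def pick(pool: Seq[String], rng: Random): String = pool(rng.nextInt(pool.length))

  // Mirrors generateRandomSchema(n): 3 numeric + 3 binary + (n - 6) from the full pool.
  def sampleSchema(n: Int, rng: Random): Seq[String] =
    Seq.fill(3)(pick(numeric, rng)) ++
      Seq.fill(3)(pick(binary, rng)) ++
      Seq.fill(math.max(0, n - 6))(pick(all, rng))

  def main(args: Array[String]): Unit = {
    val rng = new Random(42L) // a fixed seed makes the sampled schema reproducible
    val fields = sampleSchema(n = 8, rng).zipWithIndex.map { case (t, i) => s"f_$i: $t" }
    println(fields.mkString(", "))
  }
}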

