We don't support Spark 3.4
1. Use isSparkVersionGE("3.5") for Spark 3.5+ code paths
2. Use isSparkVersionLE("3.3") for Spark 3.3-and-earlier code paths
baibaichen committed Jul 31, 2024
1 parent: 5da604f · commit: 29f8756
Showing 8 changed files with 31 additions and 37 deletions.
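The two helpers named in the commit body compare the running Spark version against a threshold, so that the unsupported 3.4 never has to appear as a boundary value. Their real definitions live in Gluten's test utilities; the sketch below is only a minimal approximation of the comparison semantics, assuming sparkVersion is the "major.minor" string the suites already use (e.g. sparkVersion.equals("3.2") later in this diff), and SparkVersionGate is a hypothetical wrapper object:

    import scala.math.Ordering.Implicits._

    object SparkVersionGate {
      // Assumed: the running Spark's "major.minor", as used elsewhere in the suites.
      val sparkVersion: String = "3.5"

      private def parseVersion(v: String): (Int, Int) = {
        val Array(major, minor) = v.split('.').map(_.toInt)
        (major, minor)
      }

      // Numeric tuple comparison avoids the string-comparison trap ("3.10" < "3.9").
      def isSparkVersionGE(threshold: String): Boolean =
        parseVersion(sparkVersion) >= parseVersion(threshold)

      def isSparkVersionLE(threshold: String): Boolean =
        parseVersion(sparkVersion) <= parseVersion(threshold)
    }

Since 3.4 is unsupported, isSparkVersionGE("3.5") and isSparkVersionGE("3.4") select the same versions in practice; the commit standardizes on the spelling that names a supported release.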
@@ -603,7 +603,7 @@ class GlutenClickHouseNativeWriteTableSuite
     ("timestamp_field", "timestamp")
   )
   def excludeTimeFieldForORC(format: String): Seq[String] = {
-    if (format.equals("orc") && isSparkVersionGE("3.4")) {
+    if (format.equals("orc") && isSparkVersionGE("3.5")) {
       // FIXME:https://github.com/apache/incubator-gluten/pull/6507
       fields.keys.filterNot(_.equals("timestamp_field")).toSeq
     } else {
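Read with the FIXME, the gate above means that for ORC only, and only on Spark 3.5+, the timestamp column is excluded from the written fields until apache/incubator-gluten#6507 is resolved. A hypothetical call, assuming the else branch (truncated here) returns all field names:

    excludeTimeFieldForORC("orc")     // on Spark 3.5+: every field except "timestamp_field"
    excludeTimeFieldForORC("parquet") // all field names, on every version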
@@ -913,7 +913,7 @@ class GlutenClickHouseNativeWriteTableSuite
           (table_name, create_sql, insert_sql)
         },
         (table_name, _) =>
-          if (isSparkVersionGE("3.4")) {
+          if (isSparkVersionGE("3.5")) {
             compareResultsAgainstVanillaSpark(
               s"select * from $table_name",
               compareResult = true,
@@ -236,8 +236,7 @@ class GlutenClickHouseTPCHBucketSuite
         }
         assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans.head.metrics("numFiles").value === 2)
-        val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
-        assert(plans.head.metrics("pruningTime").value === pruningTimeValue)
+        assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
         assert(plans.head.metrics("numOutputRows").value === 591673)
       })
   }
@@ -292,7 +291,7 @@ class GlutenClickHouseTPCHBucketSuite
         }

         if (sparkVersion.equals("3.2")) {
-          assert(!(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+          assert(!plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         } else {
           assert(plans(11).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         }
@@ -328,14 +327,14 @@ class GlutenClickHouseTPCHBucketSuite
             .isInstanceOf[InputIteratorTransformer])

         if (sparkVersion.equals("3.2")) {
-          assert(!(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+          assert(!plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         } else {
           assert(plans(2).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         }
         assert(plans(2).metrics("numFiles").value === 2)
         assert(plans(2).metrics("numOutputRows").value === 3111)

-        assert(!(plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan))
+        assert(!plans(3).asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans(3).metrics("numFiles").value === 2)
         assert(plans(3).metrics("numOutputRows").value === 72678)
       })
@@ -367,12 +366,12 @@ class GlutenClickHouseTPCHBucketSuite
         }
         // bucket join
         assert(
-          plans(0)
+          plans.head
             .asInstanceOf[HashJoinLikeExecTransformer]
             .left
             .isInstanceOf[ProjectExecTransformer])
         assert(
-          plans(0)
+          plans.head
             .asInstanceOf[HashJoinLikeExecTransformer]
             .right
             .isInstanceOf[ProjectExecTransformer])
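plans(0) and plans.head are equivalent on a non-empty Seq (both throw if it is empty); .head is simply the idiomatic spelling and is what Scala linters usually suggest. A hypothetical defensive variant, not used by the suite and assuming ScalaTest's fail is in scope as it normally is here, would make the empty case explicit:

    // Hypothetical: fail with a descriptive message instead of an exception.
    plans.headOption match {
      case Some(join: HashJoinLikeExecTransformer) =>
        assert(join.left.isInstanceOf[ProjectExecTransformer])
      case other =>
        fail(s"expected a bucket join at the head of the plan list, got $other")
    }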
@@ -412,8 +411,7 @@ class GlutenClickHouseTPCHBucketSuite
         }
         assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans.head.metrics("numFiles").value === 2)
-        val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
-        assert(plans.head.metrics("pruningTime").value === pruningTimeValue)
+        assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
         assert(plans.head.metrics("numOutputRows").value === 11618)
       })
   }
@@ -427,12 +425,12 @@ class GlutenClickHouseTPCHBucketSuite
         }
         // bucket join
         assert(
-          plans(0)
+          plans.head
             .asInstanceOf[HashJoinLikeExecTransformer]
             .left
             .isInstanceOf[FilterExecTransformerBase])
         assert(
-          plans(0)
+          plans.head
             .asInstanceOf[HashJoinLikeExecTransformer]
             .right
             .isInstanceOf[ProjectExecTransformer])
@@ -587,7 +585,7 @@ class GlutenClickHouseTPCHBucketSuite
   def checkResult(df: DataFrame, exceptedResult: Seq[Row]): Unit = {
     // check the result
     val result = df.collect()
-    assert(result.size == exceptedResult.size)
+    assert(result.length == exceptedResult.size)
     val sortedRes = result.map {
       s =>
         Row.fromSeq(s.toSeq.map {
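df.collect() returns an Array[Row]. On an array, .length reads the JVM array's native field directly, while .size goes through the implicit ArrayOps enrichment (a wrapper allocation on Scala 2.12); the result is identical, so this is a lint-level cleanup. A minimal illustration:

    val rows = Array(1, 2, 3)
    assert(rows.length == 3) // native array field, no wrapper
    assert(rows.size == 3)   // same value via the ArrayOps enrichment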
@@ -788,7 +786,7 @@ class GlutenClickHouseTPCHBucketSuite
        |order by l_orderkey, l_returnflag, t
        |limit 10
        |""".stripMargin
-    runSql(SQL7, false)(
+    runSql(SQL7, noFallBack = false)(
      df => {
        checkResult(
          df,
@@ -194,5 +194,7 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSuite
       ignore(s"[$SPARK_VERSION_SHORT]-$testName", testTag: _*)(testFun)
     }
   }
+
+  lazy val pruningTimeValueSpark: Int = if (isSparkVersionLE("3.3")) -1 else 0
 }
 // scalastyle:off line.size.limit
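This lazy val is the one addition in the commit: it centralizes the expected pruningTime metric value that several suites previously recomputed locally, and it states the boundary as "3.3 and earlier" so that the unsupported 3.4 no longer appears in the condition. Every call site in this diff then shrinks from two lines to one:

    // Before (repeated in each suite):
    val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
    assert(plans.head.metrics("pruningTime").value === pruningTimeValue)

    // After (shared definition from GlutenClickHouseWholeStageTransformerSuite):
    assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)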
@@ -105,9 +105,9 @@ class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTransformerSuite
     val sql = s"""
       select count(distinct(a,b)) , try_add(c,b) from
       values (0, null,1), (0,null,2), (1, 1,4) as data(a,b,c) group by try_add(c,b)
-      """;
+      """
     val df = spark.sql(sql)
-    WholeStageTransformerSuite.checkFallBack(df, noFallback = isSparkVersionGE("3.4"))
+    WholeStageTransformerSuite.checkFallBack(df, noFallback = isSparkVersionGE("3.5"))
   }

   test("check count distinct with filter") {
@@ -28,7 +28,7 @@ import org.apache.spark.util.TaskResources
 import scala.collection.JavaConverters._

 class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite {
-  private val parquetMaxBlockSize = 4096;
+  private val parquetMaxBlockSize = 4096
   override protected val needCopyParquetToTablePath = true

   override protected val tablesPath: String = basePath + "/tpch-data"
@@ -71,8 +71,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
       assert(plans.size == 3)

       assert(plans(2).metrics("numFiles").value === 1)
-      val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
-      assert(plans(2).metrics("pruningTime").value === pruningTimeValue)
+      assert(plans(2).metrics("pruningTime").value === pruningTimeValueSpark)
       assert(plans(2).metrics("filesSize").value === 19230111)

       assert(plans(1).metrics("numOutputRows").value === 4)
@@ -140,16 +139,15 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
       assert(plans.size == 3)

       assert(plans(2).metrics("numFiles").value === 1)
-      val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
-      assert(plans(2).metrics("pruningTime").value === pruningTimeValue)
+      assert(plans(2).metrics("pruningTime").value === pruningTimeValueSpark)
       assert(plans(2).metrics("filesSize").value === 19230111)

       assert(plans(1).metrics("numOutputRows").value === 4)
       assert(plans(1).metrics("outputVectors").value === 1)

       // Execute Sort operator, it will read the data twice.
-      assert(plans(0).metrics("numOutputRows").value === 4)
-      assert(plans(0).metrics("outputVectors").value === 1)
+      assert(plans.head.metrics("numOutputRows").value === 4)
+      assert(plans.head.metrics("outputVectors").value === 1)
     }
   }
 }
@@ -167,7 +165,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
     )

     assert(nativeMetricsList.size == 1)
-    val nativeMetricsData = nativeMetricsList(0)
+    val nativeMetricsData = nativeMetricsList.head
     assert(nativeMetricsData.metricsDataList.size() == 3)

     assert(nativeMetricsData.metricsDataList.get(0).getName.equals("kRead"))
@@ -289,7 +287,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
       assert(joinPlan.metrics("inputBytes").value == 1920000)
     }

-    val wholeStageTransformer2 = allWholeStageTransformers(0)
+    val wholeStageTransformer2 = allWholeStageTransformers.head

     GlutenClickHouseMetricsUTUtils.executeMetricsUpdater(
       wholeStageTransformer2,
@@ -327,7 +325,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
     )

     assert(nativeMetricsList.size == 1)
-    val nativeMetricsData = nativeMetricsList(0)
+    val nativeMetricsData = nativeMetricsList.head
     assert(nativeMetricsData.metricsDataList.size() == 5)

     assert(nativeMetricsData.metricsDataList.get(0).getName.equals("kRead"))
@@ -401,7 +399,7 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
     )

     assert(nativeMetricsListFinal.size == 1)
-    val nativeMetricsDataFinal = nativeMetricsListFinal(0)
+    val nativeMetricsDataFinal = nativeMetricsListFinal.head
     assert(nativeMetricsDataFinal.metricsDataList.size() == 3)

     assert(nativeMetricsDataFinal.metricsDataList.get(0).getName.equals("kRead"))
@@ -66,8 +66,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
         assert(plans.size == 5)

         assert(plans(4).metrics("numFiles").value === 1)
-        val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
-        assert(plans(4).metrics("pruningTime").value === pruningTimeValue)
+        assert(plans(4).metrics("pruningTime").value === pruningTimeValueSpark)
         assert(plans(4).metrics("filesSize").value === 19230111)
         assert(plans(4).metrics("numOutputRows").value === 600572)

@@ -99,8 +98,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
         assert(plans.size == 3)

         assert(plans(2).metrics("numFiles").value === 1)
-        val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
-        assert(plans(2).metrics("pruningTime").value === pruningTimeValue)
+        assert(plans(2).metrics("pruningTime").value === pruningTimeValueSpark)
         assert(plans(2).metrics("filesSize").value === 19230111)

         assert(plans(1).metrics("numInputRows").value === 591673)
@@ -263,8 +263,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
         }
         assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans.head.metrics("numFiles").value === 4)
-        val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
-        assert(plans.head.metrics("pruningTime").value === pruningTimeValue)
+        assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
         assert(plans.head.metrics("numOutputRows").value === 600572)
       }
     )
@@ -458,8 +457,7 @@ class GlutenClickHouseTPCHParquetBucketSuite
         }
         assert(!plans.head.asInstanceOf[FileSourceScanExecTransformer].bucketedScan)
         assert(plans.head.metrics("numFiles").value === 4)
-        val pruningTimeValue = if (isSparkVersionGE("3.4")) 0 else -1
-        assert(plans.head.metrics("pruningTime").value === pruningTimeValue)
+        assert(plans.head.metrics("pruningTime").value === pruningTimeValueSpark)
         assert(plans.head.metrics("numOutputRows").value === 600572)
       }
     )
@@ -40,7 +40,7 @@ trait NativeWriteChecker
   override def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
     if (!nativeUsed) {
       val executedPlan = stripAQEPlan(qe.executedPlan)
-      nativeUsed = if (isSparkVersionGE("3.4")) {
+      nativeUsed = if (isSparkVersionGE("3.5")) {
         executedPlan.find(_.isInstanceOf[ColumnarWriteFilesExec]).isDefined
       } else {
         executedPlan.find(_.isInstanceOf[FakeRowAdaptor]).isDefined
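NativeWriteChecker decides whether a write ran natively by probing the executed plan for a version-specific marker operator: ColumnarWriteFilesExec on Spark 3.5+, FakeRowAdaptor before that. A minimal sketch of the same probe, hypothetical rather than the commit's code, assuming the suite's imports (SparkPlan and the two marker classes) are in scope and using TreeNode.find exactly as the diff does:

    // Hypothetical helper: detect the marker node the native write path inserts.
    def usedNativeWrite(executedPlan: SparkPlan): Boolean = {
      val isMarker: SparkPlan => Boolean =
        if (isSparkVersionGE("3.5")) _.isInstanceOf[ColumnarWriteFilesExec]
        else _.isInstanceOf[FakeRowAdaptor]
      executedPlan.find(isMarker).isDefined // TreeNode.find returns Option[SparkPlan]
    }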
