[GLUTEN-6067][CH][MINOR][UT] Followup 6623, fix backends-clickhouse ut issue in CI (#6891)

* Fix fallback in Spark 3.5

* Remove Hive support in GlutenClickhouseFunctionSuite

* Move Hive-related suites into the hive package

* Fix UTs for Spark 3.5

* Fix Celeborn UTs for Spark 3.5

* Fix Gluten UTs for Spark 3.5

* Remove duplicated dependency

* Fix dependencies for Spark 3.5 UTs
baibaichen authored Aug 17, 2024
1 parent 6dcf83f commit f41129e
Showing 15 changed files with 354 additions and 346 deletions.
@@ -188,20 +188,33 @@ class GlutenClickHouseMergeTreeWriteOnS3Suite
         var metadataGlutenExist: Boolean = false
         var metadataBinExist: Boolean = false
         var dataBinExist: Boolean = false
+        var hasCommits = false
         client
           .listObjects(args)
           .forEach(
             obj => {
               objectCount += 1
-              if (obj.get().objectName().contains("metadata.gluten")) {
+              val objectName = obj.get().objectName()
+              if (objectName.contains("metadata.gluten")) {
                 metadataGlutenExist = true
-              } else if (obj.get().objectName().contains("meta.bin")) {
+              } else if (objectName.contains("meta.bin")) {
                 metadataBinExist = true
-              } else if (obj.get().objectName().contains("data.bin")) {
+              } else if (objectName.contains("data.bin")) {
                 dataBinExist = true
+              } else if (objectName.contains("_commits")) {
+                // Spark 3.5 has a _commits directory
+                // table/_delta_log/_commits/
+                hasCommits = true
               }
             })
-        assertResult(5)(objectCount)
+
+        if (isSparkVersionGE("3.5")) {
+          assertResult(6)(objectCount)
+          assert(hasCommits)
+        } else {
+          assertResult(5)(objectCount)
+        }
+
         assert(metadataGlutenExist)
         assert(metadataBinExist)
         assert(dataBinExist)
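On Spark 3.5, Delta Lake writes an extra _commits directory under the table's _delta_log, which is why the expected object count above rises from five to six and the assertion is gated on the running Spark version. As a rough illustration of the major.minor check a helper like isSparkVersionGE performs, here is a minimal sketch — sparkVersionGE is a hypothetical stand-in, not the suite's actual helper:

import org.apache.spark.SPARK_VERSION

// Hypothetical stand-in for the suite's isSparkVersionGE helper: parse
// "major.minor[.patch]" strings and compare the (major, minor) pairs
// numerically.
def sparkVersionGE(threshold: String): Boolean = {
  def majorMinor(v: String): (Int, Int) = {
    val parts = v.split("\\.")
    (parts(0).toInt, parts(1).toInt)
  }
  val (maj, min) = majorMinor(SPARK_VERSION)
  val (tMaj, tMin) = majorMinor(threshold)
  maj > tMaj || (maj == tMaj && min >= tMin)
}

// On a 3.5.x build, sparkVersionGE("3.5") is true, so the test expects
// the sixth object (the _delta_log/_commits entry).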
@@ -178,11 +178,13 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu
     super.beforeAll()
   }
 
-  protected val rootPath: String = this.getClass.getResource("/").getPath
-  protected val basePath: String = rootPath + "tests-working-home"
-  protected val warehouse: String = basePath + "/spark-warehouse"
-  protected val metaStorePathAbsolute: String = basePath + "/meta"
-  protected val hiveMetaStoreDB: String = metaStorePathAbsolute + "/metastore_db"
+  final protected val rootPath: String = this.getClass.getResource("/").getPath
+  final protected val basePath: String = rootPath + "tests-working-home"
+  final protected val warehouse: String = basePath + "/spark-warehouse"
+  final protected val metaStorePathAbsolute: String = basePath + "/meta"
+
+  protected val hiveMetaStoreDB: String =
+    s"$metaStorePathAbsolute/${getClass.getSimpleName}/metastore_db"
 
   final override protected val resourcePath: String = "" // ch not need this
   override protected val fileFormat: String = "parquet"
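The reworked hiveMetaStoreDB value gives every suite class its own Derby metastore directory (keyed by getClass.getSimpleName), so Hive-enabled suites in the same test JVM no longer share one metastore_db. A minimal sketch of how a suite consumes this path, adapted from the initializeSession body that the next file removes — the example path below is hypothetical:

import org.apache.spark.sql.SparkSession

// Example per-suite path (hypothetical); in the suite it comes from the
// hiveMetaStoreDB val defined above.
val hiveMetaStoreDB = "/tmp/tests-working-home/meta/MySuite/metastore_db"

// Point the embedded Derby metastore at the per-suite directory so
// concurrent suites do not clobber each other's metastore state.
val hiveSpark = SparkSession
  .builder()
  .master("local[1]")
  .enableHiveSupport()
  .config(
    "javax.jdo.option.ConnectionURL",
    s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
  .getOrCreate()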
@@ -16,7 +16,8 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.execution.AllDataTypesWithComplexType.genTestData
+import org.apache.gluten.test.AllDataTypesWithComplexType
+import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData
 
 import org.apache.spark.SparkConf
 class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTransformerSuite {
@@ -20,12 +20,6 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.utils.UTSystemParameters
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.DeltaLog
-
-import org.apache.commons.io.FileUtils
-
-import java.io.File
 
 class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
   override protected val needCopyParquetToTablePath = true
@@ -39,9 +33,6 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
     createNotNullTPCHTablesInParquet(tablesPath)
   }
 
-  private var _hiveSpark: SparkSession = _
-  override protected def spark: SparkSession = _hiveSpark
-
   override protected def sparkConf: SparkConf = {
     new SparkConf()
       .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
@@ -69,70 +60,21 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
       .setMaster("local[1]")
   }
 
-  override protected def initializeSession(): Unit = {
-    if (_hiveSpark == null) {
-      val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-      _hiveSpark = SparkSession
-        .builder()
-        .config(sparkConf)
-        .enableHiveSupport()
-        .config(
-          "javax.jdo.option.ConnectionURL",
-          s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
-        .getOrCreate()
-    }
-  }
-
-  override def beforeAll(): Unit = {
-    // prepare working paths
-    val basePathDir = new File(basePath)
-    if (basePathDir.exists()) {
-      FileUtils.forceDelete(basePathDir)
-    }
-    FileUtils.forceMkdir(basePathDir)
-    FileUtils.forceMkdir(new File(warehouse))
-    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
-    FileUtils.copyDirectory(new File(rootPath + resourcePath), new File(tablesPath))
-    super.beforeAll()
-  }
-
-  override protected def afterAll(): Unit = {
-    DeltaLog.clearCache()
-
-    try {
-      super.afterAll()
-    } finally {
-      try {
-        if (_hiveSpark != null) {
-          try {
-            _hiveSpark.sessionState.catalog.reset()
-          } finally {
-            _hiveSpark.stop()
-            _hiveSpark = null
-          }
-        }
-      } finally {
-        SparkSession.clearActiveSession()
-        SparkSession.clearDefaultSession()
-      }
-    }
-  }
-
   test("test uuid - write and read") {
     withSQLConf(
       ("spark.gluten.sql.native.writer.enabled", "true"),
       (GlutenConfig.GLUTEN_ENABLED.key, "true")) {
+      withTable("uuid_test") {
+        spark.sql("create table if not exists uuid_test (id string) using parquet")
 
-      spark.sql("drop table if exists uuid_test")
-      spark.sql("create table if not exists uuid_test (id string) stored as parquet")
-
-      val df = spark.sql("select regexp_replace(uuid(), '-', '') as id from range(1)")
-      df.cache()
-      df.write.insertInto("uuid_test")
+        val df = spark.sql("select regexp_replace(uuid(), '-', '') as id from range(1)")
+        df.cache()
+        df.write.insertInto("uuid_test")
 
-      val df2 = spark.table("uuid_test")
-      val diffCount = df.exceptAll(df2).count()
-      assert(diffCount == 0)
+        val df2 = spark.table("uuid_test")
+        val diffCount = df.exceptAll(df2).count()
+        assert(diffCount == 0)
+      }
     }
   }
 
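The hunks that follow repeat one pattern: trailing drop table cleanup moves into withTable, which drops its tables even when an assertion fails mid-test, so a failed run cannot leak tables into later tests. A sketch of that contract, modeled on Spark's SQLTestUtils.withTable (the suite inherits an equivalent helper; this is not necessarily its exact implementation):

// Modeled on Spark's SQLTestUtils.withTable: run the body, then drop
// the named tables in a finally block. `spark` is assumed to be the
// suite's active SparkSession.
protected def withTable(tableNames: String*)(f: => Unit): Unit = {
  try f
  finally {
    tableNames.foreach(name => spark.sql(s"DROP TABLE IF EXISTS $name"))
  }
}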
@@ -181,49 +123,51 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
   }
 
   test("GLUTEN-5981 null value from get_json_object") {
-    spark.sql("create table json_t1 (a string) using parquet")
-    spark.sql("insert into json_t1 values ('{\"a\":null}')")
-    runQueryAndCompare(
-      """
-        |SELECT get_json_object(a, '$.a') is null from json_t1
-        |""".stripMargin
-    )(df => checkFallbackOperators(df, 0))
-    spark.sql("drop table json_t1")
+    withTable("json_t1") {
+      spark.sql("create table json_t1 (a string) using parquet")
+      spark.sql("insert into json_t1 values ('{\"a\":null}')")
+      runQueryAndCompare(
+        """
+          |SELECT get_json_object(a, '$.a') is null from json_t1
+          |""".stripMargin
+      )(df => checkFallbackOperators(df, 0))
+    }
   }
 
   test("Fix arrayDistinct(Array(Nullable(Decimal))) core dump") {
-    val create_sql =
-      """
-        |create table if not exists test(
-        |  dec array<decimal(10, 2)>
-        |) using parquet
-        |""".stripMargin
-    val fill_sql =
-      """
-        |insert into test values(array(1, 2, null)), (array(null, 2,3, 5))
-        |""".stripMargin
-    val query_sql =
-      """
-        |select array_distinct(dec) from test;
-        |""".stripMargin
-    spark.sql(create_sql)
-    spark.sql(fill_sql)
-    compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
-    spark.sql("drop table test")
+    withTable("test") {
+      val create_sql =
+        """
+          |create table if not exists test(
+          |  dec array<decimal(10, 2)>
+          |) using parquet
+          |""".stripMargin
+      val fill_sql =
+        """
+          |insert into test values(array(1, 2, null)), (array(null, 2,3, 5))
+          |""".stripMargin
+      val query_sql =
+        """
+          |select array_distinct(dec) from test;
+          |""".stripMargin
+      spark.sql(create_sql)
+      spark.sql(fill_sql)
+      compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
+    }
   }
 
   test("intersect all") {
-    spark.sql("create table t1 (a int, b string) using parquet")
-    spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5, '5'),(6, '6')")
-    spark.sql("create table t2 (a int, b string) using parquet")
-    spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8, '8'),(9, '9')")
-    runQueryAndCompare(
-      """
-        |SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2
-        |""".stripMargin
-    )(df => checkFallbackOperators(df, 0))
-    spark.sql("drop table t1")
-    spark.sql("drop table t2")
+    withTable("t1", "t2") {
+      spark.sql("create table t1 (a int, b string) using parquet")
+      spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5, '5'),(6, '6')")
+      spark.sql("create table t2 (a int, b string) using parquet")
+      spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8, '8'),(9, '9')")
+      runQueryAndCompare(
+        """
+          |SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2
+          |""".stripMargin
+      )(df => checkFallbackOperators(df, 0))
+    }
   }
 
   test("array decimal32 CH column to row") {