Remove hive support in GlutenClickhouseFunctionSuite
baibaichen committed Aug 16, 2024
1 parent 207b320 commit 62900d7
Showing 1 changed file with 48 additions and 104 deletions.
@@ -20,12 +20,6 @@ import org.apache.gluten.GlutenConfig
 import org.apache.gluten.utils.UTSystemParameters
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.DeltaLog
-
-import org.apache.commons.io.FileUtils
-
-import java.io.File
 
 class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
   override protected val needCopyParquetToTablePath = true
@@ -39,9 +33,6 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
     createNotNullTPCHTablesInParquet(tablesPath)
   }
 
-  private var _hiveSpark: SparkSession = _
-  override protected def spark: SparkSession = _hiveSpark
-
   override protected def sparkConf: SparkConf = {
     new SparkConf()
       .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
@@ -69,70 +60,21 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
       .setMaster("local[1]")
   }
 
-  override protected def initializeSession(): Unit = {
-    if (_hiveSpark == null) {
-      val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-      _hiveSpark = SparkSession
-        .builder()
-        .config(sparkConf)
-        .enableHiveSupport()
-        .config(
-          "javax.jdo.option.ConnectionURL",
-          s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
-        .getOrCreate()
-    }
-  }
-
-  override def beforeAll(): Unit = {
-    // prepare working paths
-    val basePathDir = new File(basePath)
-    if (basePathDir.exists()) {
-      FileUtils.forceDelete(basePathDir)
-    }
-    FileUtils.forceMkdir(basePathDir)
-    FileUtils.forceMkdir(new File(warehouse))
-    FileUtils.forceMkdir(new File(metaStorePathAbsolute))
-    FileUtils.copyDirectory(new File(rootPath + resourcePath), new File(tablesPath))
-    super.beforeAll()
-  }
-
-  override protected def afterAll(): Unit = {
-    DeltaLog.clearCache()
-
-    try {
-      super.afterAll()
-    } finally {
-      try {
-        if (_hiveSpark != null) {
-          try {
-            _hiveSpark.sessionState.catalog.reset()
-          } finally {
-            _hiveSpark.stop()
-            _hiveSpark = null
-          }
-        }
-      } finally {
-        SparkSession.clearActiveSession()
-        SparkSession.clearDefaultSession()
-      }
-    }
-  }
-
   test("test uuid - write and read") {
     withSQLConf(
       ("spark.gluten.sql.native.writer.enabled", "true"),
       (GlutenConfig.GLUTEN_ENABLED.key, "true")) {
-      spark.sql("drop table if exists uuid_test")
-      spark.sql("create table if not exists uuid_test (id string) stored as parquet")
-
-      val df = spark.sql("select regexp_replace(uuid(), '-', '') as id from range(1)")
-      df.cache()
-      df.write.insertInto("uuid_test")
-
-      val df2 = spark.table("uuid_test")
-      val diffCount = df.exceptAll(df2).count()
-      assert(diffCount == 0)
+      withTable("uuid_test") {
+        spark.sql("create table if not exists uuid_test (id string) using parquet")
+
+        val df = spark.sql("select regexp_replace(uuid(), '-', '') as id from range(1)")
+        df.cache()
+        df.write.insertInto("uuid_test")
+
+        val df2 = spark.table("uuid_test")
+        val diffCount = df.exceptAll(df2).count()
+        assert(diffCount == 0)
+      }
     }
   }

@@ -181,49 +123,51 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
   }
 
   test("GLUTEN-5981 null value from get_json_object") {
-    spark.sql("create table json_t1 (a string) using parquet")
-    spark.sql("insert into json_t1 values ('{\"a\":null}')")
-    runQueryAndCompare(
-      """
-        |SELECT get_json_object(a, '$.a') is null from json_t1
-        |""".stripMargin
-    )(df => checkFallbackOperators(df, 0))
-    spark.sql("drop table json_t1")
+    withTable("json_t1") {
+      spark.sql("create table json_t1 (a string) using parquet")
+      spark.sql("insert into json_t1 values ('{\"a\":null}')")
+      runQueryAndCompare(
+        """
+          |SELECT get_json_object(a, '$.a') is null from json_t1
+          |""".stripMargin
+      )(df => checkFallbackOperators(df, 0))
+    }
   }

   test("Fix arrayDistinct(Array(Nullable(Decimal))) core dump") {
-    val create_sql =
-      """
-        |create table if not exists test(
-        |  dec array<decimal(10, 2)>
-        |) using parquet
-        |""".stripMargin
-    val fill_sql =
-      """
-        |insert into test values(array(1, 2, null)), (array(null, 2,3, 5))
-        |""".stripMargin
-    val query_sql =
-      """
-        |select array_distinct(dec) from test;
-        |""".stripMargin
-    spark.sql(create_sql)
-    spark.sql(fill_sql)
-    compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
-    spark.sql("drop table test")
+    withTable("test") {
+      val create_sql =
+        """
+          |create table if not exists test(
+          |  dec array<decimal(10, 2)>
+          |) using parquet
+          |""".stripMargin
+      val fill_sql =
+        """
+          |insert into test values(array(1, 2, null)), (array(null, 2,3, 5))
+          |""".stripMargin
+      val query_sql =
+        """
+          |select array_distinct(dec) from test;
+          |""".stripMargin
+      spark.sql(create_sql)
+      spark.sql(fill_sql)
+      compareResultsAgainstVanillaSpark(query_sql, true, { _ => })
+    }
  }

   test("intersect all") {
-    spark.sql("create table t1 (a int, b string) using parquet")
-    spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5, '5'),(6, '6')")
-    spark.sql("create table t2 (a int, b string) using parquet")
-    spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8, '8'),(9, '9')")
-    runQueryAndCompare(
-      """
-        |SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2
-        |""".stripMargin
-    )(df => checkFallbackOperators(df, 0))
-    spark.sql("drop table t1")
-    spark.sql("drop table t2")
+    withTable("t1", "t2") {
+      spark.sql("create table t1 (a int, b string) using parquet")
+      spark.sql("insert into t1 values (1, '1'),(2, '2'),(3, '3'),(4, '4'),(5, '5'),(6, '6')")
+      spark.sql("create table t2 (a int, b string) using parquet")
+      spark.sql("insert into t2 values (4, '4'),(5, '5'),(6, '6'),(7, '7'),(8, '8'),(9, '9')")
+      runQueryAndCompare(
+        """
+          |SELECT a,b FROM t1 INTERSECT ALL SELECT a,b FROM t2
+          |""".stripMargin
+      )(df => checkFallbackOperators(df, 0))
+    }
   }

   test("array decimal32 CH column to row") {
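The change follows one pattern throughout: per-test drop table cleanup (and the Hive-backed session it depended on) is replaced by the withTable test helper, which the suite presumably inherits from Spark's SQLTestUtils. A minimal standalone sketch of that pattern, assuming an active spark session; the helper below is illustrative, not the suite's actual code:

import org.apache.spark.sql.SparkSession

// Illustrative sketch: run a test body, then drop the named tables in a
// finally block, so cleanup happens even when an assertion fails mid-body.
def withTable(spark: SparkSession)(tableNames: String*)(f: => Unit): Unit = {
  try {
    f
  } finally {
    tableNames.foreach(name => spark.sql(s"DROP TABLE IF EXISTS $name"))
  }
}

Under this pattern, a test such as "intersect all" wraps its body in withTable("t1", "t2") { ... } instead of issuing trailing drop table statements; previously, a failed assertion would skip those drops and leak the tables into later tests.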
