Move Hive related suite into hive package
baibaichen committed Aug 16, 2024
1 parent 62900d7 commit 6763354
Showing 8 changed files with 183 additions and 205 deletions.
@@ -178,11 +178,13 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSuite
super.beforeAll()
}

-protected val rootPath: String = this.getClass.getResource("/").getPath
-protected val basePath: String = rootPath + "tests-working-home"
-protected val warehouse: String = basePath + "/spark-warehouse"
-protected val metaStorePathAbsolute: String = basePath + "/meta"
-protected val hiveMetaStoreDB: String = metaStorePathAbsolute + "/metastore_db"
+final protected val rootPath: String = this.getClass.getResource("/").getPath
+final protected val basePath: String = rootPath + "tests-working-home"
+final protected val warehouse: String = basePath + "/spark-warehouse"
+final protected val metaStorePathAbsolute: String = basePath + "/meta"
+
+protected val hiveMetaStoreDB: String =
+s"$metaStorePathAbsolute/${getClass.getSimpleName}/metastore_db"

final override protected val resourcePath: String = "" // CH does not need this
override protected val fileFormat: String = "parquet"
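
A note on the change above: the path vals become final, and each suite now gets its own embedded Derby metastore directory keyed by its class name. Embedded Derby allows only one booted instance of a given database directory at a time (the "Another instance of Derby may have already booted the database" failure worked around later in this diff), so sharing one metastore_db path across Hive-enabled suites is fragile. A minimal sketch of the resulting layout, using a hypothetical base path:

    // Hypothetical values, for illustration only.
    val metaStorePathAbsolute = "/tmp/tests-working-home/meta"
    // With getClass.getSimpleName in the path, e.g. for GlutenClickHouseHiveTableSuite:
    val hiveMetaStoreDB = s"$metaStorePathAbsolute/GlutenClickHouseHiveTableSuite/metastore_db"
    // The Derby connection URL built from it, as in the sessions created below:
    val connectionURL = s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true"
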
@@ -16,7 +16,8 @@
*/
package org.apache.gluten.execution

-import org.apache.gluten.execution.AllDataTypesWithComplexType.genTestData
+import org.apache.gluten.test.AllDataTypesWithComplexType
+import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData

import org.apache.spark.SparkConf
class GlutenClickhouseCountDistinctSuite extends GlutenClickHouseWholeStageTransformerSuite {
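
The test-data generator is now imported from org.apache.gluten.test rather than defined next to GlutenClickHouseHiveTableSuite (its old definition is deleted further down), so suites in both org.apache.gluten.execution and the new org.apache.gluten.execution.hive package can share it. A sketch of typical consumption inside a test body, assuming only the relocated import; the view name is illustrative:

    import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    // genTestData() yields 200 rows; rows where i % 100 == 1 are all-null,
    // which exercises null handling across every supported data type.
    val df = spark.createDataFrame(genTestData())
    df.createOrReplaceTempView("all_data_types")
    spark.sql("select count(distinct int_field) from all_data_types").show()
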
@@ -14,13 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.hive

import org.apache.gluten.GlutenConfig
+import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, ProjectExecTransformer, TransformSupport}
+import org.apache.gluten.test.AllDataTypesWithComplexType
import org.apache.gluten.utils.UTSystemParameters

import org.apache.spark.SparkConf
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
@@ -29,64 +31,14 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.hadoop.fs.Path

import java.io.{File, PrintWriter}
-import java.sql.{Date, Timestamp}

import scala.reflect.ClassTag

-case class AllDataTypesWithComplexType(
-string_field: String = null,
-int_field: java.lang.Integer = null,
-long_field: java.lang.Long = null,
-float_field: java.lang.Float = null,
-double_field: java.lang.Double = null,
-short_field: java.lang.Short = null,
-byte_field: java.lang.Byte = null,
-boolean_field: java.lang.Boolean = null,
-decimal_field: java.math.BigDecimal = null,
-date_field: java.sql.Date = null,
-timestamp_field: java.sql.Timestamp = null,
-array: Seq[Int] = null,
-arrayContainsNull: Seq[Option[Int]] = null,
-map: Map[Int, Long] = null,
-mapValueContainsNull: Map[Int, Option[Long]] = null
-)
-
-object AllDataTypesWithComplexType {
-def genTestData(): Seq[AllDataTypesWithComplexType] = {
-(0 to 199).map {
-i =>
-if (i % 100 == 1) {
-AllDataTypesWithComplexType()
-} else {
-AllDataTypesWithComplexType(
-s"$i",
-i,
-i.toLong,
-i.toFloat,
-i.toDouble,
-i.toShort,
-i.toByte,
-i % 2 == 0,
-new java.math.BigDecimal(i + ".56"),
-Date.valueOf(new Date(System.currentTimeMillis()).toLocalDate.plusDays(i % 10)),
-Timestamp.valueOf(
-new Timestamp(System.currentTimeMillis()).toLocalDateTime.plusDays(i % 10)),
-Seq.apply(i + 1, i + 2, i + 3),
-Seq.apply(Option.apply(i + 1), Option.empty, Option.apply(i + 3)),
-Map.apply((i + 1, i + 2), (i + 3, i + 4)),
-Map.empty
-)
-}
-}
-}
-}
-
class GlutenClickHouseHiveTableSuite
extends GlutenClickHouseWholeStageTransformerSuite
+with ReCreateHiveSession
with AdaptiveSparkPlanHelper {

-private var _hiveSpark: SparkSession = _
-
override protected def sparkConf: SparkConf = {
new SparkConf()
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
@@ -119,22 +71,6 @@ class GlutenClickHouseHiveTableSuite
.setMaster("local[*]")
}

-override protected def spark: SparkSession = _hiveSpark
-
-override protected def initializeSession(): Unit = {
-if (_hiveSpark == null) {
-val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-_hiveSpark = SparkSession
-.builder()
-.config(sparkConf)
-.enableHiveSupport()
-.config(
-"javax.jdo.option.ConnectionURL",
-s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
-.getOrCreate()
-}
-}
-
private val txt_table_name = "hive_txt_test"
private val txt_user_define_input = "hive_txt_user_define_input"
private val json_table_name = "hive_json_test"
@@ -235,24 +171,7 @@ class GlutenClickHouseHiveTableSuite

override protected def afterAll(): Unit = {
DeltaLog.clearCache()

-try {
-super.afterAll()
-} finally {
-try {
-if (_hiveSpark != null) {
-try {
-_hiveSpark.sessionState.catalog.reset()
-} finally {
-_hiveSpark.stop()
-_hiveSpark = null
-}
-}
-} finally {
-SparkSession.clearActiveSession()
-SparkSession.clearDefaultSession()
-}
-}
+super.afterAll()
}

test("test hive text table") {
@@ -957,7 +876,7 @@ class GlutenClickHouseHiveTableSuite
val select_sql_4 = "select id, get_json_object(data, '$.v111') from test_tbl_3337"
val select_sql_5 = "select id, get_json_object(data, 'v112') from test_tbl_3337"
val select_sql_6 =
"select id, get_json_object(data, '$.id') from test_tbl_3337 where id = 123";
"select id, get_json_object(data, '$.id') from test_tbl_3337 where id = 123"
compareResultsAgainstVanillaSpark(select_sql_1, compareResult = true, _ => {})
compareResultsAgainstVanillaSpark(select_sql_2, compareResult = true, _ => {})
compareResultsAgainstVanillaSpark(select_sql_3, compareResult = true, _ => {})
@@ -1311,7 +1230,7 @@ class GlutenClickHouseHiveTableSuite
.format(dataPath)
val select_sql = "select * from test_tbl_6506"
spark.sql(create_table_sql)
-compareResultsAgainstVanillaSpark(select_sql, true, _ => {})
+compareResultsAgainstVanillaSpark(select_sql, compareResult = true, _ => {})
spark.sql("drop table test_tbl_6506")
}
}
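
All three converted suites delete the same session bootstrap and teardown in favor of the new ReCreateHiveSession mixin. The trait's own source file is among the 8 changed files, but its hunk is not loaded in this view; the following is a plausible reconstruction from the code deleted above and from the updateHiveSession call used in GlutenClickHouseTableAfterRestart, not the committed source. Override modifiers and how it hooks into the base suite's lifecycle are assumptions:

    package org.apache.gluten.execution

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession

    import org.scalatest.{BeforeAndAfterAll, Suite}

    trait ReCreateHiveSession extends BeforeAndAfterAll { this: Suite =>

      // Supplied by the concrete suite (see the overrides in this diff).
      protected def sparkConf: SparkConf
      protected def hiveMetaStoreDB: String

      private var _hiveSpark: SparkSession = _

      protected def spark: SparkSession = _hiveSpark

      // Mirrors the initializeSession bodies deleted from the suites above.
      protected def initializeSession(): Unit = {
        if (_hiveSpark == null) {
          _hiveSpark = SparkSession
            .builder()
            .config(sparkConf)
            .enableHiveSupport()
            .config(
              "javax.jdo.option.ConnectionURL",
              s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
            .getOrCreate()
        }
      }

      // Lets a test swap in a freshly built session, as the restart test does.
      protected def updateHiveSession(session: SparkSession): Unit = {
        _hiveSpark = null
        _hiveSpark = session
      }

      // Mirrors the teardown deleted from the suites' afterAll methods.
      override protected def afterAll(): Unit = {
        try {
          super.afterAll()
        } finally {
          try {
            if (_hiveSpark != null) {
              try {
                _hiveSpark.sessionState.catalog.reset()
              } finally {
                _hiveSpark.stop()
                _hiveSpark = null
              }
            }
          } finally {
            SparkSession.clearActiveSession()
            SparkSession.clearDefaultSession()
          }
        }
      }
    }
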
@@ -14,33 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.hive

import org.apache.gluten.GlutenConfig
-import org.apache.gluten.execution.AllDataTypesWithComplexType.genTestData
+import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite
+import org.apache.gluten.test.AllDataTypesWithComplexType.genTestData
import org.apache.gluten.utils.UTSystemParameters

import org.apache.spark.SparkConf
import org.apache.spark.gluten.NativeWriteChecker
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{DecimalType, LongType, StringType, StructField, StructType}
-
-import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql.types._

import scala.reflect.runtime.universe.TypeTag

class GlutenClickHouseNativeWriteTableSuite
extends GlutenClickHouseWholeStageTransformerSuite
with AdaptiveSparkPlanHelper
-with SharedSparkSession
-with BeforeAndAfterAll
+with ReCreateHiveSession
with NativeWriteChecker {

-private var _hiveSpark: SparkSession = _
-
override protected def sparkConf: SparkConf = {
var sessionTimeZone = "GMT"
if (isSparkVersionGE("3.5")) {
@@ -80,45 +74,12 @@ class GlutenClickHouseNativeWriteTableSuite
basePath + "/中文/spark-warehouse"
}

-override protected def spark: SparkSession = _hiveSpark
-
-override protected def initializeSession(): Unit = {
-if (_hiveSpark == null) {
-val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db"
-_hiveSpark = SparkSession
-.builder()
-.config(sparkConf)
-.enableHiveSupport()
-.config(
-"javax.jdo.option.ConnectionURL",
-s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
-.getOrCreate()
-}
-}
-
private val table_name_template = "hive_%s_test"
private val table_name_vanilla_template = "hive_%s_test_written_by_vanilla"

override protected def afterAll(): Unit = {
DeltaLog.clearCache()

-try {
-super.afterAll()
-} finally {
-try {
-if (_hiveSpark != null) {
-try {
-_hiveSpark.sessionState.catalog.reset()
-} finally {
-_hiveSpark.stop()
-_hiveSpark = null
-}
-}
-} finally {
-SparkSession.clearActiveSession()
-SparkSession.clearDefaultSession()
-}
-}
+super.afterAll()
}

def getColumnName(s: String): String = {
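
With ReCreateHiveSession supplying the session, this suite also stops mixing in SharedSparkSession (which manages its own test session, presumably competing with the bespoke Hive-enabled one) and BeforeAndAfterAll directly. Under that arrangement a Hive-backed suite reduces to configuration plus tests; a minimal sketch with a hypothetical class name, using only mixins and settings that appear in this diff:

    import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite

    import org.apache.spark.SparkConf
    import org.apache.spark.gluten.NativeWriteChecker
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

    class MyHiveBackedSuite // hypothetical name
      extends GlutenClickHouseWholeStageTransformerSuite
      with AdaptiveSparkPlanHelper
      with ReCreateHiveSession
      with NativeWriteChecker {

      // Only configuration is suite-specific now; the session lifecycle, Derby
      // metastore URL, and teardown all come from ReCreateHiveSession.
      override protected def sparkConf: SparkConf =
        new SparkConf()
          .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
          .setMaster("local[*]")
    }
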
@@ -14,12 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.gluten.execution.hive

+import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite
+
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession.{getActiveSession, getDefaultSession}
-import org.apache.spark.sql.delta.{ClickhouseSnapshot, DeltaLog}
+import org.apache.spark.sql.delta.ClickhouseSnapshot
import org.apache.spark.sql.delta.catalog.ClickHouseTableV2
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper

@@ -33,7 +35,8 @@ import java.io.File
// This suite is to make sure clickhouse commands work well even after spark restart
class GlutenClickHouseTableAfterRestart
extends GlutenClickHouseTPCHAbstractSuite
-with AdaptiveSparkPlanHelper {
+with AdaptiveSparkPlanHelper
+with ReCreateHiveSession {

override protected val needCopyParquetToTablePath = true

@@ -64,56 +67,18 @@ class GlutenClickHouseTableAfterRestart
.set(
"spark.gluten.sql.columnar.backend.ch.runtime_settings.input_format_parquet_max_block_size",
"8192")
.setMaster("local[2]")
}

override protected def createTPCHNotNullTables(): Unit = {
createNotNullTPCHTablesInParquet(tablesPath)
}

-private var _hiveSpark: SparkSession = _
-override protected def spark: SparkSession = _hiveSpark
-
-override protected def initializeSession(): Unit = {
-if (_hiveSpark == null) {
-val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_" + current_db_num
-current_db_num += 1
-
-_hiveSpark = SparkSession
-.builder()
-.config(sparkConf)
-.enableHiveSupport()
-.config(
-"javax.jdo.option.ConnectionURL",
-s"jdbc:derby:;databaseName=$hiveMetaStoreDB;create=true")
-.master("local[2]")
-.getOrCreate()
-}
-}
-
-override protected def afterAll(): Unit = {
-DeltaLog.clearCache()
-
-try {
-super.afterAll()
-} finally {
-try {
-if (_hiveSpark != null) {
-try {
-_hiveSpark.sessionState.catalog.reset()
-} finally {
-_hiveSpark.stop()
-_hiveSpark = null
-}
-}
-} finally {
-SparkSession.clearActiveSession()
-SparkSession.clearDefaultSession()
-}
-}
-}
-
var current_db_num: Int = 0

+override protected val hiveMetaStoreDB: String =
+metaStorePathAbsolute + "/metastore_db_" + current_db_num
+
test("test mergetree after restart") {
spark.sql(s"""
|DROP TABLE IF EXISTS lineitem_mergetree;
@@ -347,22 +312,22 @@
SparkSession.clearDefaultSession()
}

-val hiveMetaStoreDB = metaStorePathAbsolute + "/metastore_db_"
+val metaStoreDB = metaStorePathAbsolute + "/metastore_db_"
// use metastore_db2 to avoid issue: "Another instance of Derby may have already booted the database"
-val destDir = new File(hiveMetaStoreDB + current_db_num)
-destDir.mkdirs()
-FileUtils.copyDirectory(new File(hiveMetaStoreDB + (current_db_num - 1)), destDir)
-_hiveSpark = null
-_hiveSpark = SparkSession
-.builder()
-.config(sparkConf)
-.enableHiveSupport()
-.config(
-"javax.jdo.option.ConnectionURL",
-s"jdbc:derby:;databaseName=$hiveMetaStoreDB$current_db_num")
-.master("local[2]")
-.getOrCreate()
current_db_num += 1
+val destDir = new File(metaStoreDB + current_db_num)
+destDir.mkdirs()
+FileUtils.copyDirectory(new File(metaStoreDB + (current_db_num - 1)), destDir)
+updateHiveSession(
+SparkSession
+.builder()
+.config(sparkConf)
+.enableHiveSupport()
+.config(
+"javax.jdo.option.ConnectionURL",
+s"jdbc:derby:;databaseName=$metaStoreDB$current_db_num")
+.getOrCreate()
+)
}
}
// scalastyle:off line.size.limit
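
Two details of this suite's conversion are easy to miss. First, the deleted initializeSession pinned the master on the builder, while the new code relies on the conf alone: since ReCreateHiveSession builds the session purely from sparkConf, the setting moves into the conf via the .setMaster("local[2]") line added above. A condensed sketch of the equivalence, assuming nothing beyond what the diff shows:

    import org.apache.spark.SparkConf

    // Before: master pinned at session construction (code removed above):
    //   SparkSession.builder().config(sparkConf).master("local[2]").getOrCreate()
    // After: the suite's conf carries the same setting, and the trait builds
    // the session from that conf:
    val conf = new SparkConf().setMaster("local[2]")

Second, restarting against the same Derby directory can hit the stale-lock error quoted in the comment above, so each restart copies metastore_db_(n-1) to metastore_db_n and points the rebuilt session at the fresh copy before handing it to updateHiveSession.
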