
[GLUTEN-5484] add missing tests for clickhouse (#5485)
shuai-xu authored Apr 24, 2024
1 parent fd2519a · commit 43d7262
Showing 13 changed files with 240 additions and 121 deletions.
@@ -35,6 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2._
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
+import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite
import org.apache.spark.sql.sources._
import org.apache.spark.sql.statistics.SparkFunctionStatistics

@@ -2196,6 +2197,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema")
enableSuite[SparkFunctionStatistics]
enableSuite[GlutenSparkSessionExtensionSuite]
+enableSuite[GlutenHiveSQLQueryCHSuite]

override def getSQLQueryTestSettings: SQLQueryTestSettings = ClickHouseSQLQueryTestSettings
}
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.datasources.v2.GlutenFileTableSuite
import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenCustomerExpressionTransformerSuite, GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
-import org.apache.spark.sql.hive.execution.{GlutenHiveSQLQueryCHSuite, GlutenHiveSQLQuerySuite}
+import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}

// Some settings' line length exceeds 100
@@ -1081,7 +1081,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenStatisticsCollectionSuite]
.exclude("SPARK-33687: analyze all tables in a specific database")
enableSuite[FallbackStrategiesSuite]
-enableSuite[GlutenHiveSQLQueryCHSuite]
enableSuite[GlutenHiveSQLQuerySuite]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
@@ -16,6 +16,9 @@
*/
package org.apache.spark.sql.hive.execution

+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.utils.SystemParameters
+
import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -25,7 +28,9 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
override def sparkConf: SparkConf = {
defaultSparkConf
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
-.set("spark.gluten.sql.columnar.backend.lib", "ch")
+.set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath)
+.set("spark.gluten.sql.enable.native.validation", "false")
+.set("spark.gluten.sql.native.writer.enabled", "true")
.set("spark.sql.storeAssignmentPolicy", "legacy")
.set("spark.default.parallelism", "1")
.set("spark.memory.offHeap.enabled", "true")
@@ -99,4 +104,5 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
ignoreIfNotExists = true,
purge = false)
}

}
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
+import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite
import org.apache.spark.sql.sources._
import org.apache.spark.sql.statistics.SparkFunctionStatistics

@@ -2107,6 +2108,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.excludeGlutenTest("fallbackSummary with cache")
.excludeGlutenTest("fallbackSummary with cached data and shuffle")
enableSuite[GlutenSparkSessionExtensionSuite]
+enableSuite[GlutenHiveSQLQueryCHSuite]

override def getSQLQueryTestSettings: SQLQueryTestSettings = ClickHouseSQLQueryTestSettings
}
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
-import org.apache.spark.sql.hive.execution.{GlutenHiveSQLQueryCHSuite, GlutenHiveSQLQuerySuite}
+import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources._

// Some settings' line length exceeds 100
@@ -1135,7 +1135,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("cases when literal is max")
enableSuite[GlutenXPathFunctionsSuite]
enableSuite[GlutenFallbackSuite]
-enableSuite[GlutenHiveSQLQueryCHSuite]
enableSuite[GlutenHiveSQLQuerySuite]
enableSuite[GlutenImplicitsTest]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
@@ -16,6 +16,9 @@
*/
package org.apache.spark.sql.hive.execution

+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.utils.SystemParameters
+
import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -25,7 +28,9 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
override def sparkConf: SparkConf = {
defaultSparkConf
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
-.set("spark.gluten.sql.columnar.backend.lib", "ch")
+.set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath)
+.set("spark.gluten.sql.enable.native.validation", "false")
+.set("spark.gluten.sql.native.writer.enabled", "true")
.set("spark.sql.storeAssignmentPolicy", "legacy")
.set("spark.default.parallelism", "1")
.set("spark.memory.offHeap.enabled", "true")
@@ -99,4 +104,5 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
ignoreIfNotExists = true,
purge = false)
}

}
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
+import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite
import org.apache.spark.sql.sources._
import org.apache.spark.sql.statistics.SparkFunctionStatistics

@@ -1891,6 +1892,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema")
enableSuite[SparkFunctionStatistics]
enableSuite[GlutenSparkSessionExtensionSuite]
+enableSuite[GlutenHiveSQLQueryCHSuite]

override def getSQLQueryTestSettings: SQLQueryTestSettings = ClickHouseSQLQueryTestSettings
}
@@ -37,7 +37,7 @@ import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCollapseProjectExecTransformerSuite, GlutenSessionExtensionSuite, TestFileSourceScanExecTransformer}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
-import org.apache.spark.sql.hive.execution.{GlutenHiveSQLQueryCHSuite, GlutenHiveSQLQuerySuite}
+import org.apache.spark.sql.hive.execution.GlutenHiveSQLQuerySuite
import org.apache.spark.sql.sources.{GlutenBucketedReadWithoutHiveSupportSuite, GlutenBucketedWriteWithoutHiveSupportSuite, GlutenCreateTableAsSelectSuite, GlutenDDLSourceLoadSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuite, GlutenDisableUnnecessaryBucketedScanWithoutHiveSupportSuiteAE, GlutenExternalCommandRunnerSuite, GlutenFilteredScanSuite, GlutenFiltersSuite, GlutenInsertSuite, GlutenPartitionedWriteSuite, GlutenPathOptionSuite, GlutenPrunedScanSuite, GlutenResolvedDataSourceSuite, GlutenSaveLoadSuite, GlutenTableScanSuite}

// Some settings' line length exceeds 100
@@ -1147,7 +1147,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("cases when literal is max")
enableSuite[GlutenXPathFunctionsSuite]
enableSuite[GlutenFallbackSuite]
-enableSuite[GlutenHiveSQLQueryCHSuite]
enableSuite[GlutenHiveSQLQuerySuite]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
@@ -16,6 +16,9 @@
*/
package org.apache.spark.sql.hive.execution

+import org.apache.gluten.GlutenConfig
+import org.apache.gluten.utils.SystemParameters
+
import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -25,7 +28,9 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
override def sparkConf: SparkConf = {
defaultSparkConf
.set("spark.plugins", "org.apache.gluten.GlutenPlugin")
-.set("spark.gluten.sql.columnar.backend.lib", "ch")
+.set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath)
+.set("spark.gluten.sql.enable.native.validation", "false")
+.set("spark.gluten.sql.native.writer.enabled", "true")
.set("spark.sql.storeAssignmentPolicy", "legacy")
.set("spark.default.parallelism", "1")
.set("spark.memory.offHeap.enabled", "true")
@@ -99,4 +104,5 @@ class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {
ignoreIfNotExists = true,
purge = false)
}

}
@@ -37,6 +37,7 @@ import org.apache.spark.sql.execution.exchange.GlutenEnsureRequirementsSuite
import org.apache.spark.sql.execution.joins.{GlutenBroadcastJoinSuite, GlutenExistenceJoinSuite, GlutenInnerJoinSuite, GlutenOuterJoinSuite}
import org.apache.spark.sql.extension.{GlutenCustomerExtensionSuite, GlutenSessionExtensionSuite}
import org.apache.spark.sql.gluten.GlutenFallbackSuite
+import org.apache.spark.sql.hive.execution.GlutenHiveSQLQueryCHSuite
import org.apache.spark.sql.sources._
import org.apache.spark.sql.statistics.SparkFunctionStatistics

@@ -1891,6 +1892,7 @@ class ClickHouseTestSettings extends BackendTestSettings {
.exclude("SELECT structFieldComplex.Value.`value_(2)` FROM tableWithSchema")
enableSuite[SparkFunctionStatistics]
enableSuite[GlutenSparkSessionExtensionSuite]
+enableSuite[GlutenHiveSQLQueryCHSuite]

override def getSQLQueryTestSettings: SQLQueryTestSettings = ClickHouseSQLQueryTestSettings
}
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.execution

import org.apache.gluten.GlutenConfig
import org.apache.gluten.utils.SystemParameters

import org.apache.spark.{DebugFilesystem, SparkConf}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier

class GlutenHiveSQLQueryCHSuite extends GlutenHiveSQLQuerySuiteBase {

  override def sparkConf: SparkConf = {
    defaultSparkConf
      .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
      .set(GlutenConfig.GLUTEN_LIB_PATH, SystemParameters.getClickHouseLibPath)
      .set("spark.gluten.sql.enable.native.validation", "false")
      .set("spark.gluten.sql.native.writer.enabled", "true")
      .set("spark.sql.storeAssignmentPolicy", "legacy")
      .set("spark.default.parallelism", "1")
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "1024MB")
      .set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
  }

  testGluten("5182: Fix failed to parse post join filters") {
    withSQLConf(
      "spark.sql.hive.convertMetastoreParquet" -> "false",
      "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false") {
      sql("DROP TABLE IF EXISTS test_5182_0;")
      sql("DROP TABLE IF EXISTS test_5182_1;")
      sql(
        "CREATE TABLE test_5182_0 (from_uid STRING, vgift_typeid int, vm_count int, " +
          "status bigint, ts bigint, vm_typeid int) " +
          "USING hive OPTIONS(fileFormat 'parquet') PARTITIONED BY (`day` STRING);")
      sql(
        "CREATE TABLE test_5182_1 (typeid int, groupid int, ss_id bigint, " +
          "ss_start_time bigint, ss_end_time bigint) " +
          "USING hive OPTIONS(fileFormat 'parquet');")
      sql(
        "INSERT INTO test_5182_0 partition(day='2024-03-31') " +
          "VALUES('uid_1', 2, 10, 1, 11111111111, 2);")
      sql("INSERT INTO test_5182_1 VALUES(2, 1, 1, 1000000000, 2111111111);")
      val df = spark.sql(
        "select ee.from_uid as uid,day, vgift_typeid, money from " +
          "(select t_a.day, if(cast(substr(t_a.ts,1,10) as bigint) between " +
          "t_b.ss_start_time and t_b.ss_end_time, t_b.ss_id, 0) ss_id, " +
          "t_a.vgift_typeid, t_a.from_uid, vm_count money from " +
          "(select from_uid,day,vgift_typeid,vm_count,ts from test_5182_0 " +
          "where day between '2024-03-30' and '2024-03-31' and status=1 and vm_typeid=2) t_a " +
          "left join test_5182_1 t_b on t_a.vgift_typeid=t_b.typeid " +
          "where t_b.groupid in (1,2)) ee where ss_id=1;")
      checkAnswer(df, Seq(Row("uid_1", "2024-03-31", 2, 10)))
    }
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_5182_0"),
      ignoreIfNotExists = true,
      purge = false)
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_5182_1"),
      ignoreIfNotExists = true,
      purge = false)
  }

  testGluten("5249: Reading csv may throw Unexpected empty column") {
    withSQLConf(
      "spark.gluten.sql.complexType.scan.fallback.enabled" -> "false"
    ) {
      sql("DROP TABLE IF EXISTS test_5249;")
      sql(
        "CREATE TABLE test_5249 (name STRING, uid STRING) " +
          "ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' " +
          "STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' " +
          "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';")
      sql("INSERT INTO test_5249 VALUES('name_1', 'id_1');")
      val df = spark.sql(
        "SELECT name, uid, count(distinct uid) total_uid_num from test_5249 " +
          "group by name, uid with cube;")
      checkAnswer(
        df,
        Seq(
          Row("name_1", "id_1", 1),
          Row("name_1", null, 1),
          Row(null, "id_1", 1),
          Row(null, null, 1)))
    }
    spark.sessionState.catalog.dropTable(
      TableIdentifier("test_5249"),
      ignoreIfNotExists = true,
      purge = false)
  }

}