From b93b098eda072dbb73903550d4848dfde09180c0 Mon Sep 17 00:00:00 2001
From: Shuai li
Date: Fri, 26 Jul 2024 11:54:50 +0800
Subject: [PATCH] [GLUTEN-6589][CH] Mergetree supported spark.sql.caseSensitive (#6592)

[CH] Mergetree supported spark.sql.caseSensitive
---
 .../delta/catalog/ClickHouseTableV2Base.scala | 33 ++++--------
 .../utils/MergeTreeDeltaUtil.scala            | 11 +++-
 .../v1/CHMergeTreeWriterInjects.scala         | 20 ++------
 .../GlutenClickHouseMergeTreeWriteSuite.scala | 51 +++++++++++++++++++
 .../gluten/expression/ConverterUtils.scala    |  2 +-
 5 files changed, 74 insertions(+), 43 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
index 9c129b9f5d91..633d23f77b1b 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
@@ -16,8 +16,11 @@
  */
 package org.apache.spark.sql.delta.catalog
 
+import org.apache.gluten.expression.ConverterUtils.normalizeColName
+
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.delta.Snapshot
+import org.apache.spark.sql.execution.datasources.utils.MergeTreeDeltaUtil
 
 import org.apache.hadoop.fs.Path
 
@@ -153,33 +156,15 @@ trait ClickHouseTableV2Base {
     configs.toMap
   }
 
-  def primaryKey(): String = primaryKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
+  def primaryKey(): String = MergeTreeDeltaUtil.columnsToStr(primaryKeyOption)
 
   def orderByKey(): String = orderByKeyOption match {
-    case Some(keys) => keys.mkString(",")
+    case Some(keys) => keys.map(normalizeColName).mkString(",")
     case None => "tuple()"
   }
 
-  def lowCardKey(): String = lowCardKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
-
-  def minmaxIndexKey(): String = minmaxIndexKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
-
-  def bfIndexKey(): String = bfIndexKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
-
-  def setIndexKey(): String = setIndexKeyOption match {
-    case Some(keys) => keys.mkString(",")
-    case None => ""
-  }
+  def lowCardKey(): String = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption)
+  def minmaxIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption)
+  def bfIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption)
+  def setIndexKey(): String = MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption)
 }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
index 954b43b6ab6c..6b2af0953f00 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/utils/MergeTreeDeltaUtil.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution.datasources.utils
 
+import org.apache.gluten.expression.ConverterUtils.normalizeColName
+
 object MergeTreeDeltaUtil {
 
   val DEFAULT_ORDER_BY_KEY = "tuple()"
@@ -25,7 +27,7 @@
       primaryKeyOption: Option[Seq[String]]): (String, String) = {
     val orderByKey = if (orderByKeyOption.isDefined &&
       orderByKeyOption.get.nonEmpty) {
-      orderByKeyOption.get.mkString(",")
+      columnsToStr(orderByKeyOption)
     } else DEFAULT_ORDER_BY_KEY
 
     val primaryKey =
@@ -33,9 +35,14 @@
         !orderByKey.equals(DEFAULT_ORDER_BY_KEY) && primaryKeyOption.isDefined &&
           primaryKeyOption.get.nonEmpty
       ) {
-        primaryKeyOption.get.mkString(",")
+        columnsToStr(primaryKeyOption)
       } else ""
 
     (orderByKey, primaryKey)
   }
+
+  def columnsToStr(option: Option[Seq[String]]): String = option match {
+    case Some(keys) => keys.map(normalizeColName).mkString(",")
+    case None => ""
+  }
 }
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
index e11406d56619..237d5a46d69f 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/CHMergeTreeWriterInjects.scala
@@ -170,22 +170,10 @@
       primaryKeyOption
     )
 
-    val lowCardKey = lowCardKeyOption match {
-      case Some(keys) => keys.mkString(",")
-      case None => ""
-    }
-    val minmaxIndexKey = minmaxIndexKeyOption match {
-      case Some(keys) => keys.mkString(",")
-      case None => ""
-    }
-    val bfIndexKey = bfIndexKeyOption match {
-      case Some(keys) => keys.mkString(",")
-      case None => ""
-    }
-    val setIndexKey = setIndexKeyOption match {
-      case Some(keys) => keys.mkString(",")
-      case None => ""
-    }
+    val lowCardKey = MergeTreeDeltaUtil.columnsToStr(lowCardKeyOption)
+    val minmaxIndexKey = MergeTreeDeltaUtil.columnsToStr(minmaxIndexKeyOption)
+    val bfIndexKey = MergeTreeDeltaUtil.columnsToStr(bfIndexKeyOption)
+    val setIndexKey = MergeTreeDeltaUtil.columnsToStr(setIndexKeyOption)
 
     val substraitContext = new SubstraitContext
     val extensionTableNode = ExtensionTableBuilder.makeExtensionTable(
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
index e88eb1fedd42..2563d792b040 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseMergeTreeWriteSuite.scala
@@ -1971,5 +1971,56 @@ class GlutenClickHouseMergeTreeWriteSuite
         }
       })
   }
+
+  test("test mergetree with column case sensitive") {
+    spark.sql(s"""
+                 |DROP TABLE IF EXISTS LINEITEM_MERGETREE_CASE_SENSITIVE;
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 |CREATE TABLE IF NOT EXISTS LINEITEM_MERGETREE_CASE_SENSITIVE
+                 |(
+                 | L_ORDERKEY bigint,
+                 | L_PARTKEY bigint,
+                 | L_SUPPKEY bigint,
+                 | L_LINENUMBER bigint,
+                 | L_QUANTITY double,
+                 | L_EXTENDEDPRICE double,
+                 | L_DISCOUNT double,
+                 | L_TAX double,
+                 | L_RETURNFLAG string,
+                 | L_LINESTATUS string,
+                 | L_SHIPDATE date,
+                 | L_COMMITDATE date,
+                 | L_RECEIPTDATE date,
+                 | L_SHIPINSTRUCT string,
+                 | L_SHIPMODE string,
+                 | L_COMMENT string
+                 |)
+                 |USING clickhouse
+                 |PARTITIONED BY (L_SHIPDATE)
+                 |TBLPROPERTIES (orderByKey='L_DISCOUNT')
+                 |LOCATION '$basePath/LINEITEM_MERGETREE_CASE_SENSITIVE'
+                 |""".stripMargin)
+
+    spark.sql(s"""
+                 | insert into table lineitem_mergetree_case_sensitive
+                 | select * from lineitem
+                 |""".stripMargin)
+
+    val sqlStr =
+      s"""
+         |SELECT
+         | sum(l_extendedprice * l_discount) AS revenue
+         |FROM
+         | lineitem_mergetree_case_sensitive
+         |WHERE
+         | l_shipdate >= date'1994-01-01'
+         | AND l_shipdate < date'1994-01-01' + interval 1 year
+         | AND l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01
+         | AND l_quantity < 24
+         |""".stripMargin
+    runTPCHQueryBySQL(6, sqlStr) { _ => }
+  }
 }
 // scalastyle:off line.size.limit
diff --git a/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala b/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala
index 473ee7f9d62f..4b929e525197 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/expression/ConverterUtils.scala
@@ -138,7 +138,7 @@ object ConverterUtils extends Logging {
   /** Convert StructType to Json */
   def convertNamedStructJson(tableSchema: StructType): String = {
     val typeNodes = ConverterUtils.collectAttributeTypeNodes(tableSchema)
-    val nameList = tableSchema.fieldNames
+    val nameList = tableSchema.fieldNames.map(normalizeColName)
     val structBuilder = Type.Struct.newBuilder
     for (typeNode <- typeNodes.asScala) {
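For illustration, outside the patch itself: the core of the change is that every MergeTree key and index column list (order by, primary key, low-cardinality, minmax/bloom-filter/set index keys) now passes through ConverterUtils.normalizeColName before reaching the native writer, so identifiers written in different cases (e.g. L_DISCOUNT in the new test's DDL versus l_discount in its query) resolve to the same column when spark.sql.caseSensitive is false. The following is a minimal, self-contained Scala sketch of that behavior; the explicit caseSensitive parameter stands in for reading SQLConf, and the real normalizeColName may differ in detail.

import java.util.Locale

// Standalone sketch, not the Gluten implementation.
object ColumnNormalizationSketch {

  // Assumption: with spark.sql.caseSensitive=false (Spark's default), column
  // names are lowercased before being handed to the MergeTree writer; with
  // caseSensitive=true they are passed through unchanged.
  def normalizeColName(name: String, caseSensitive: Boolean = false): String =
    if (caseSensitive) name else name.toLowerCase(Locale.ROOT)

  // Mirrors the shape of the MergeTreeDeltaUtil.columnsToStr helper added by the patch:
  // join the normalized key columns with "," or return "" when no keys are configured.
  def columnsToStr(option: Option[Seq[String]], caseSensitive: Boolean = false): String =
    option match {
      case Some(keys) => keys.map(normalizeColName(_, caseSensitive)).mkString(",")
      case None => ""
    }

  def main(args: Array[String]): Unit = {
    // The test's DDL declares orderByKey='L_DISCOUNT'; case-insensitive
    // resolution turns it into "l_discount", matching the TPC-H style query.
    println(columnsToStr(Some(Seq("L_DISCOUNT"))))                        // l_discount
    println(columnsToStr(Some(Seq("L_DISCOUNT")), caseSensitive = true))  // L_DISCOUNT
    println(columnsToStr(None))                                           // empty string
  }
}

Under this reading, the new test exercises exactly this path: the table is created with uppercase column names and an uppercase orderByKey, data is inserted and queried with lowercase names, and TPC-H Q6 is expected to plan and return correct results against the MergeTree table.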