diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
index 1ab2e12d43e3..b1d9760d7303 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/delta/catalog/ClickHouseTableV2Base.scala
@@ -91,7 +91,7 @@ trait ClickHouseTableV2Base {
                 s"$keyName $s can not contain '.' (not support nested column yet)")
             }
           })
-        Some(keys.map(s => s.toLowerCase()))
+        Some(keys)
       } else {
         None
       }
@@ -102,27 +102,22 @@ trait ClickHouseTableV2Base {
 
   lazy val orderByKeyOption: Option[Seq[String]] = {
     if (bucketOption.isDefined && bucketOption.get.sortColumnNames.nonEmpty) {
-      val orderByKes = bucketOption.get.sortColumnNames
-      val invalidKeys = orderByKes.intersect(partitionColumns)
+      val orderByKeys = bucketOption.get.sortColumnNames.map(normalizeColName).toSeq
+      val invalidKeys = orderByKeys.intersect(partitionColumns)
       if (invalidKeys.nonEmpty) {
         throw new IllegalStateException(
           s"partition cols $invalidKeys can not be in the order by keys.")
       }
-      Some(orderByKes)
+      Some(orderByKeys)
     } else {
-      val tableProperties = deltaProperties
-      if (tableProperties.containsKey("orderByKey")) {
-        if (tableProperties.get("orderByKey").nonEmpty) {
-          val orderByKes = tableProperties.get("orderByKey").split(",").map(_.trim).toSeq
-          val invalidKeys = orderByKes.intersect(partitionColumns)
-          if (invalidKeys.nonEmpty) {
-            throw new IllegalStateException(
-              s"partition cols $invalidKeys can not be in the order by keys.")
-          }
-          Some(orderByKes)
-        } else {
-          None
+      val orderByKeys = getCommaSeparatedColumns("orderByKey")
+      if (orderByKeys.isDefined) {
+        val invalidKeys = orderByKeys.get.intersect(partitionColumns)
+        if (invalidKeys.nonEmpty) {
+          throw new IllegalStateException(
+            s"partition cols $invalidKeys can not be in the order by keys.")
         }
+        orderByKeys
       } else {
         None
       }
@@ -131,27 +126,22 @@ trait ClickHouseTableV2Base {
 
   lazy val primaryKeyOption: Option[Seq[String]] = {
     if (orderByKeyOption.isDefined) {
-      val tableProperties = deltaProperties
-      if (tableProperties.containsKey("primaryKey")) {
-        if (tableProperties.get("primaryKey").nonEmpty) {
-          val primaryKeys = tableProperties.get("primaryKey").split(",").map(_.trim).toSeq
-          if (!orderByKeyOption.get.mkString(",").startsWith(primaryKeys.mkString(","))) {
-            throw new IllegalStateException(
-              s"Primary key $primaryKeys must be a prefix of the sorting key")
-          }
-          Some(primaryKeys)
-        } else {
-          None
-        }
-      } else {
-        None
+      val primaryKeys = getCommaSeparatedColumns("primaryKey")
+      if (
+        primaryKeys.isDefined && !orderByKeyOption.get
+          .mkString(",")
+          .startsWith(primaryKeys.get.mkString(","))
+      ) {
+        throw new IllegalStateException(
+          s"Primary key $primaryKeys must be a prefix of the sorting key")
       }
+      primaryKeys
     } else {
       None
     }
   }
 
-  lazy val partitionColumns = deltaSnapshot.metadata.partitionColumns
+  lazy val partitionColumns = deltaSnapshot.metadata.partitionColumns.map(normalizeColName).toSeq
 
   lazy val clickhouseTableConfigs: Map[String, String] = {
     val tableProperties = deltaProperties()
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
index 506bdd03b4f1..26bc4edbd80c 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
@@ -94,7 +94,8 @@ object MergeTreeFileFormatWriter extends Logging {
 
     val writerBucketSpec = bucketSpec.map {
       spec =>
-        val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
+        val bucketColumns =
+          spec.bucketColumnNames.map(c => dataColumns.find(_.name.equalsIgnoreCase(c)).get)
         // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id
         // expression, so that we can guarantee the data distribution is same between shuffle and
         // bucketed data source, which enables us to only shuffle one side when join a bucketed
@@ -104,7 +105,7 @@ object MergeTreeFileFormatWriter extends Logging {
       MergeTreeWriterBucketSpec(bucketIdExpression, (_: Int) => "")
     }
     val sortColumns = bucketSpec.toSeq.flatMap {
-      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name == c).get)
+      spec => spec.sortColumnNames.map(c => dataColumns.find(_.name.equalsIgnoreCase(c)).get)
     }
 
     val caseInsensitiveOptions = CaseInsensitiveMap(options)
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
index cbc3aed3607d..4018eee6b01e 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
@@ -72,11 +72,14 @@ class GlutenClickHouseHiveTableSuite
   }
 
   private val txt_table_name = "hive_txt_test"
+  private val txt_upper_table_name = "hive_txt_upper_test"
   private val txt_user_define_input = "hive_txt_user_define_input"
   private val json_table_name = "hive_json_test"
   private val parquet_table_name = "hive_parquet_test"
 
   private val txt_table_create_sql = genTableCreateSql(txt_table_name, "textfile")
+  private val txt_upper_create_sql = genTableCreateUpperSql(txt_upper_table_name, "textfile")
+
   private val parquet_table_create_sql = genTableCreateSql(parquet_table_name, "parquet")
   private val json_table_create_sql = "create table if not exists %s (".format(json_table_name) +
     "string_field string," +
@@ -136,6 +139,24 @@ class GlutenClickHouseHiveTableSuite
       "map_field map," +
       "map_field_with_null map) stored as %s".format(fileFormat)
 
+  def genTableCreateUpperSql(tableName: String, fileFormat: String): String =
+    "create table if not exists %s (".format(tableName) +
+      "STRING_FIELD string," +
+      "INT_FIELD int," +
+      "LONG_FIELD long," +
+      "FLOAT_FIELD float," +
+      "DOUBLE_FIELD double," +
+      "SHORT_FIELD short," +
+      "BYTE_FIELD byte," +
+      "BOOL_FIELD boolean," +
+      "DECIMAL_FIELD decimal(23, 12)," +
+      "DATE_FIELD date," +
+      "TIMESTAMP_FIELD timestamp," +
+      "ARRAY_FIELD array," +
+      "ARRAY_FIELD_WITH_NULL array," +
+      "MAP_FIELD map," +
+      "MAP_FIELD_WITH_NULL map) stored as %s".format(fileFormat)
+
   protected def initializeTable(
       table_name: String,
       table_create_sql: String,
@@ -161,6 +182,7 @@ class GlutenClickHouseHiveTableSuite
   override def beforeAll(): Unit = {
     super.beforeAll()
     initializeTable(txt_table_name, txt_table_create_sql, null)
+    initializeTable(txt_upper_table_name, txt_upper_create_sql, null)
    initializeTable(txt_user_define_input, txt_table_user_define_create_sql, null)
     initializeTable(
       json_table_name,
@@ -1214,8 +1236,9 @@ class GlutenClickHouseHiveTableSuite
                              |select
                              | string_field,
                              | int_field,
-                             | long_field
-                             | from $txt_user_define_input
+                             | long_field,
+                             | date_field
+                             | from $txt_table_name
                              |""".stripMargin)
 
     sourceDF.write
@@ -1224,6 +1247,50 @@ class GlutenClickHouseHiveTableSuite
       .option("clickhouse.bucketColumnNames", "STRING_FIELD")
       .mode(SaveMode.Overwrite)
       .save(dataPath)
+
+    assert(new File(dataPath).listFiles().nonEmpty)
+
+    val dataPath2 = s"$basePath/lineitem_mergetree_bucket2"
+    val df2 = spark.sql(s"""
+                           |select
+                           | string_field STRING_FIELD,
+                           | int_field INT_FIELD,
+                           | long_field LONG_FIELD,
+                           | date_field DATE_FIELD
+                           | from $txt_table_name
+                           |""".stripMargin)
+
+    df2.write
+      .format("clickhouse")
+      .partitionBy("DATE_FIELD")
+      .option("clickhouse.numBuckets", "1")
+      .option("clickhouse.bucketColumnNames", "STRING_FIELD")
+      .option("clickhouse.orderByKey", "INT_FIELD,LONG_FIELD")
+      .option("clickhouse.primaryKey", "INT_FIELD")
+      .mode(SaveMode.Overwrite)
+      .save(dataPath2)
+    assert(new File(dataPath2).listFiles().nonEmpty)
+
+    val dataPath3 = s"$basePath/lineitem_mergetree_bucket3"
+    val df3 = spark.sql(s"""
+                           |select
+                           | string_field,
+                           | int_field,
+                           | long_field,
+                           | date_field
+                           | from $txt_upper_table_name
+                           |""".stripMargin)
+
+    df3.write
+      .format("clickhouse")
+      .partitionBy("date_field")
+      .option("clickhouse.numBuckets", "1")
+      .option("clickhouse.bucketColumnNames", "string_field")
+      .option("clickhouse.orderByKey", "int_field,LONG_FIELD")
+      .option("clickhouse.primaryKey", "INT_FIELD")
+      .mode(SaveMode.Overwrite)
+      .save(dataPath3)
+    assert(new File(dataPath3).listFiles().nonEmpty)
   }
 
   test("GLUTEN-6506: Orc read time zone") {
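
For context, the hunks above lean on two helpers that are defined outside this patch: normalizeColName and getCommaSeparatedColumns. The sketch below is a minimal, self-contained illustration of how such helpers could behave, assuming lower-casing as the normalization rule; the signatures and bodies here are assumptions for illustration, not the PR's actual implementation (in the PR, getCommaSeparatedColumns reads the Delta table properties directly).

// Hypothetical sketch, not code from this PR.
object ColumnNameNormalizationSketch {

  // Assumption: names are compared after lower-casing, so "STRING_FIELD" and
  // "string_field" resolve to the same column.
  def normalizeColName(col: String): String = col.toLowerCase

  // Assumed shape of the shared helper: split a comma-separated table property,
  // trim each entry, reject nested columns, and normalize the casing.
  def getCommaSeparatedColumns(
      tableProperties: Map[String, String],
      keyName: String): Option[Seq[String]] =
    tableProperties.get(keyName).filter(_.nonEmpty).map {
      value =>
        val keys = value.split(",").map(_.trim).toSeq
        keys.foreach {
          key => require(!key.contains("."), s"$keyName $key can not contain '.'")
        }
        keys.map(normalizeColName)
    }

  def main(args: Array[String]): Unit = {
    val props = Map("orderByKey" -> "INT_FIELD,LONG_FIELD", "primaryKey" -> "INT_FIELD")
    val partitionColumns = Seq("DATE_FIELD").map(normalizeColName)
    val orderByKeys = getCommaSeparatedColumns(props, "orderByKey").get
    val primaryKeys = getCommaSeparatedColumns(props, "primaryKey").get
    // With both sides normalized, the partition-column and primary-key-prefix
    // checks behave the same regardless of how the user cased the option values.
    assert(orderByKeys.intersect(partitionColumns).isEmpty)
    assert(orderByKeys.mkString(",").startsWith(primaryKeys.mkString(",")))
    println(orderByKeys.mkString(", ")) // int_field, long_field
  }
}

This mirrors the intent of the new test cases, where "clickhouse.orderByKey" mixes cases ("int_field,LONG_FIELD") and the write is still expected to succeed.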