diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
index 26bc4edbd80cd..f1489b86b3a13 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
@@ -165,13 +165,14 @@ object MergeTreeFileFormatWriter extends Logging {
       if (writerBucketSpec.isDefined) {
         // We need to add the bucket id expression to the output of the sort plan,
         // so that we can use backend to calculate the bucket id for each row.
-        wrapped = ProjectExec(
-          wrapped.output :+ Alias(writerBucketSpec.get.bucketIdExpression, "__bucket_value__")(),
-          wrapped)
+        val bucketValueExpr = bindReferences(
+          Seq(writerBucketSpec.get.bucketIdExpression),
+          finalOutputSpec.outputColumns)
+        wrapped =
+          ProjectExec(wrapped.output :+ Alias(bucketValueExpr.head, "__bucket_value__")(), wrapped)
 
         // TODO: to optimize, bucket value is computed twice here
       }
 
-      val nativeFormat = sparkSession.sparkContext.getLocalProperty("nativeFormat")
       (GlutenMergeTreeWriterInjects.getInstance().executeWriterWrappedSparkPlan(wrapped), None)
     }
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
index 4018eee6b01eb..b16ae3c119cc8 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala
@@ -1291,6 +1291,30 @@ class GlutenClickHouseHiveTableSuite
       .mode(SaveMode.Overwrite)
       .save(dataPath3)
     assert(new File(dataPath3).listFiles().nonEmpty)
+
+    val dataPath4 = s"$basePath/lineitem_mergetree_bucket2"
+    val df4 = spark
+      .sql(s"""
+              |select
+              | INT_FIELD ,
+              | STRING_FIELD,
+              | LONG_FIELD ,
+              | DATE_FIELD
+              | from $txt_table_name
+              | order by INT_FIELD
+              |""".stripMargin)
+      .toDF("INT_FIELD", "STRING_FIELD", "LONG_FIELD", "DATE_FIELD")
+
+    df4.write
+      .format("clickhouse")
+      .partitionBy("DATE_FIELD")
+      .option("clickhouse.numBuckets", "3")
+      .option("clickhouse.bucketColumnNames", "STRING_FIELD")
+      .option("clickhouse.orderByKey", "INT_FIELD,LONG_FIELD")
+      .option("clickhouse.primaryKey", "INT_FIELD")
+      .mode(SaveMode.Append)
+      .save(dataPath4)
+    assert(new File(dataPath4).listFiles().nonEmpty)
   }
 
   test("GLUTEN-6506: Orc read time zone") {