From 9bf10f1fe470fb5bb9cff1b56744633d887cea93 Mon Sep 17 00:00:00 2001
From: Chang Chen
Date: Mon, 14 Oct 2024 15:45:58 +0800
Subject: [PATCH] fix spark32 bucket bug

---
 .../v1/clickhouse/MergeTreeFileFormatWriter.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
index faef5c7d57d14..012a26812e27b 100644
--- a/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
+++ b/backends-clickhouse/src/main/delta-20/org/apache/spark/sql/execution/datasources/v1/clickhouse/MergeTreeFileFormatWriter.scala
@@ -88,13 +88,13 @@ object MergeTreeFileFormatWriter extends Logging {
 
     val bucketIdExpression = bucketSpec.map {
       spec =>
-        val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
+        val bucketColumns =
+          spec.bucketColumnNames.map(c => dataColumns.find(_.name.equalsIgnoreCase(c)).get)
         // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
         // guarantee the data distribution is same between shuffle and bucketed data source, which
         // enables us to only shuffle one side when join a bucketed table and a normal one.
         HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
     }
-
     val sortColumns = bucketSpec.toSeq.flatMap {
       spec => spec.sortColumnNames.map(c => dataColumns.find(_.name.equalsIgnoreCase(c)).get)
     }
@@ -156,9 +156,10 @@ object MergeTreeFileFormatWriter extends Logging {
     if (bucketIdExpression.isDefined) {
       // We need to add the bucket id expression to the output of the sort plan,
       // so that we can use backend to calculate the bucket id for each row.
-      wrapped = ProjectExec(
-        wrapped.output :+ Alias(bucketIdExpression.get, "__bucket_value__")(),
-        wrapped)
+      val bucketValueExpr =
+        bindReferences(Seq(bucketIdExpression.get), finalOutputSpec.outputColumns)
+      wrapped =
+        ProjectExec(wrapped.output :+ Alias(bucketValueExpr.head, "__bucket_value__")(), wrapped)
       // TODO: to optimize, bucket value is computed twice here
     }
 
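
Note on the first hunk: the old lookup compared bucket column names
case-sensitively (_.name == c), even though the sortColumns lookup just
below already used equalsIgnoreCase and Spark resolves column identifiers
case-insensitively by default. When the BucketSpec records a name whose
case differs from the attribute in dataColumns, the case-sensitive
find(...).get throws NoSuchElementException and the bucketed write fails.
A minimal standalone sketch of the failure mode; the column names are made
up for illustration, and Column here is a stand-in, not Spark's class:

    case class Column(name: String)

    object BucketLookupSketch extends App {
      val dataColumns = Seq(Column("UserId"), Column("amount"))
      // A bucket spec may record the column with different casing, e.g. "userid".
      val bucketColumnNames = Seq("userid")

      // Old, case-sensitive lookup: find returns None, so .get throws
      // NoSuchElementException.
      // bucketColumnNames.map(c => dataColumns.find(_.name == c).get)

      // Fixed lookup, mirroring the existing sortColumns path:
      val bucketColumns =
        bucketColumnNames.map(c => dataColumns.find(_.name.equalsIgnoreCase(c)).get)
      println(bucketColumns) // List(Column(UserId))
    }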
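
Note on the second hunk: previously the raw bucketIdExpression was projected
as __bucket_value__ while still holding AttributeReference nodes, which have
to be resolved (by exprId) against the sort plan's output at execution time;
presumably that resolution is what broke on Spark 3.2. Binding it first via
BindReferences.bindReferences (org.apache.spark.sql.catalyst.expressions)
rewrites those references into ordinal-based BoundReferences against
finalOutputSpec.outputColumns, so the value is computed from stable column
positions. A sketch of what the binding does, assuming spark-catalyst on the
classpath; the attribute names and bucket count are made up, and the
expression mirrors the shape of HashPartitioning.partitionIdExpression,
i.e. pmod(murmur3hash(cols), numBuckets):

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.types.IntegerType

    object BindReferencesSketch extends App {
      val a = AttributeReference("a", IntegerType)()
      val b = AttributeReference("b", IntegerType)()

      // HashPartitioning(Seq(b), 4).partitionIdExpression expands to
      // pmod(murmur3hash(b), 4):
      val bucketIdExpr = Pmod(new Murmur3Hash(Seq(b)), Literal(4))

      // Binding rewrites AttributeReference(b) into a BoundReference with
      // ordinal 1, so evaluation uses the column position instead of
      // name/exprId resolution.
      val bound = BindReferences.bindReferences(Seq(bucketIdExpr), AttributeSeq(Seq(a, b)))
      println(bound.head) // the reference to b now prints as input[1, int, true]
    }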