fix spark32 bucket bug
baibaichen committed Oct 14, 2024
1 parent 0319925 commit 9bf10f1
Showing 1 changed file with 6 additions and 5 deletions.
@@ -88,13 +88,13 @@ object MergeTreeFileFormatWriter extends Logging {
 
     val bucketIdExpression = bucketSpec.map {
       spec =>
-        val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
+        val bucketColumns =
+          spec.bucketColumnNames.map(c => dataColumns.find(_.name.equalsIgnoreCase(c)).get)
         // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
         // guarantee the data distribution is same between shuffle and bucketed data source, which
         // enables us to only shuffle one side when join a bucketed table and a normal one.
         HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
     }
 
     val sortColumns = bucketSpec.toSeq.flatMap {
       spec => spec.sortColumnNames.map(c => dataColumns.find(_.name.equalsIgnoreCase(c)).get)
     }
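
The fix in this hunk is the column lookup: Spark's analyzer is case-insensitive by default, so a bucket spec can carry column names in a different case than the resolved schema attributes. The old exact match (`_.name == c`) then finds nothing and `.get` throws. A minimal standalone sketch of the failure mode (the `Attribute` stand-in below is illustrative, not Catalyst's class):

// Hypothetical stand-in for a resolved column attribute.
final case class Attribute(name: String)

object BucketLookupSketch extends App {
  // Schema attributes as the writer sees them (lower-cased by the analyzer).
  val dataColumns = Seq(Attribute("id"), Attribute("value"))
  // Bucket spec recorded with the user's original casing.
  val bucketColumnNames = Seq("ID")

  // Old lookup: the exact match misses, so .get throws NoSuchElementException.
  println(bucketColumnNames.map(c => dataColumns.find(_.name == c))) // List(None)

  // Fixed lookup: a case-insensitive match resolves the column.
  println(bucketColumnNames.map(c => dataColumns.find(_.name.equalsIgnoreCase(c)).get)) // List(Attribute(id))
}

As the in-code comment says, `HashPartitioning.partitionIdExpression` is used so that writer-side bucket ids agree with shuffle partition ids; in Spark it expands to `Pmod(new Murmur3Hash(expressions), Literal(numPartitions))`.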
@@ -156,9 +156,10 @@ object MergeTreeFileFormatWriter extends Logging {
     if (bucketIdExpression.isDefined) {
       // We need to add the bucket id expression to the output of the sort plan,
       // so that we can use backend to calculate the bucket id for each row.
-      wrapped = ProjectExec(
-        wrapped.output :+ Alias(bucketIdExpression.get, "__bucket_value__")(),
-        wrapped)
+      val bucketValueExpr =
+        bindReferences(Seq(bucketIdExpression.get), finalOutputSpec.outputColumns)
+      wrapped =
+        ProjectExec(wrapped.output :+ Alias(bucketValueExpr.head, "__bucket_value__")(), wrapped)
       // TODO: to optimize, bucket value is computed twice here
     }
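
The Spark 3.2 fix in this hunk stops projecting the raw `bucketIdExpression` and instead binds it to `finalOutputSpec.outputColumns` first. `bindReferences` rewrites each `AttributeReference` in the expression into a positional `BoundReference`, so the projected `__bucket_value__` column can be evaluated by ordinal against the child plan's rows; presumably the unbound expression failed to resolve at evaluation time on Spark 3.2. A minimal sketch of the binding step, assuming spark-catalyst on the classpath (the `Pmod`/`Murmur3Hash` expression stands in for `bucketIdExpression`; the names are illustrative):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.{IntegerType, StringType}
import org.apache.spark.unsafe.types.UTF8String

object BindBucketIdSketch extends App {
  // Child plan output, standing in for finalOutputSpec.outputColumns.
  val id = AttributeReference("id", IntegerType)()
  val value = AttributeReference("value", StringType)()
  val output = Seq(id, value)

  // Stand-in for bucketIdExpression: hash the bucket column modulo numBuckets,
  // the same shape HashPartitioning.partitionIdExpression produces.
  val bucketIdExpr = Pmod(new Murmur3Hash(Seq(id)), Literal(4))

  // bindReferences replaces each AttributeReference with a BoundReference,
  // i.e. an ordinal into the child's output such as input[0, int, true].
  // (Seq[Attribute] converts to AttributeSeq implicitly.)
  val bound = BindReferences.bindReferences(Seq(bucketIdExpr), output)
  println(bound.head)

  // The bound expression now evaluates positionally against concrete rows.
  println(bound.head.eval(InternalRow(7, UTF8String.fromString("x"))))
}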
