[GLUTEN-6923][CH] total_bytes_written is not updated in celeborn partition writers (apache#6939)

What changes were proposed in this pull request?

Fixes: apache#6923

Update `total_bytes_written` in the Celeborn partition writers (`MemorySortCelebornPartitionWriter::evictPartitions` and `CelebornPartitionWriter::evictSinglePartition`) so that the shuffle written bytes are reported correctly, log a warning from the JNI wrapper when the counter comes back as 0, and validate in the broadcast build path that the accumulated `dataSize` is consistent with the collected batch sizes. AQE depends on the shuffle written bytes to generate a proper plan.
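As an illustration only (not code from this patch), the sketch below mimics the shape of Spark's per-partition map-output statistics to show why a written-bytes counter stuck at 0 misleads size-based planning. `ShuffleSizeHeuristic` and `looksSmall` are hypothetical names:

```scala
// Illustrative sketch, not Gluten or Spark code. bytesByPartitionId mirrors the
// shape of Spark's map-output statistics: one written-bytes value per reduce partition.
object ShuffleSizeHeuristic {
  // Hypothetical AQE-style rule: treat the exchange as "small" only when the
  // reported map-output size is positive and under a threshold.
  def looksSmall(bytesByPartitionId: Array[Long], thresholdBytes: Long): Boolean = {
    val totalWritten = bytesByPartitionId.sum
    // If the native writer never accumulates total_bytes_written, totalWritten
    // stays 0 and a genuinely large shuffle would be misclassified as small.
    totalWritten > 0 && totalWritten <= thresholdBytes
  }
}
```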

How was this patch tested?

Manual tests.
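One hedged way to run the manual check is to attach a `SparkListener` and watch per-task shuffle write metrics, assuming the native `total_bytes_written` counter ultimately feeds Spark's `ShuffleWriteMetrics`; the workload at the end is only a placeholder for the real query under test:

```scala
// Sketch of a manual verification (not part of this patch): flag tasks that wrote
// shuffle records but reported 0 shuffle bytes, i.e. a broken byte counter.
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

object ShuffleBytesProbe {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("shuffle-bytes-probe").getOrCreate()
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        val m = taskEnd.taskMetrics
        if (m != null &&
          m.shuffleWriteMetrics.recordsWritten > 0 &&
          m.shuffleWriteMetrics.bytesWritten == 0) {
          println(s"Suspicious task ${taskEnd.taskInfo.taskId}: shuffle records written but 0 bytes reported")
        }
      }
    })
    // Placeholder workload that forces a shuffle; replace with the real query under test.
    spark.range(0, 1000000).selectExpr("id % 100 as k", "id as v").groupBy("k").count().collect()
    spark.stop()
  }
}
```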

lgbo-ustc authored and shamirchen committed Oct 14, 2024
1 parent dd8be04 commit 88ec5fd
Showing 4 changed files with 19 additions and 5 deletions.
@@ -407,5 +407,4 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
}
}
}

}
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse

import org.apache.gluten.GlutenConfig
import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.execution._
import org.apache.gluten.expression._
@@ -29,7 +30,8 @@ import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode
import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
import org.apache.gluten.vectorized.CHColumnarBatchSerializer

import org.apache.spark.{ShuffleDependency, SparkException}
import org.apache.spark.ShuffleDependency
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
@@ -62,7 +64,7 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

class CHSparkPlanExecApi extends SparkPlanExecApi {
class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {

/** The columnar-batch type this backend is using. */
override def batchType: Convention.BatchType = CHBatch
@@ -532,10 +534,16 @@ class CHSparkPlanExecApi extends SparkPlanExecApi {
CHExecUtil.buildSideRDD(dataSize, newChild).collect

val batches = countsAndBytes.map(_._2)
val totalBatchesSize = batches.map(_.length).sum
val rawSize = dataSize.value
if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
throw new SparkException(
s"Cannot broadcast the table that is larger than 8GB: ${rawSize >> 30} GB")
throw new GlutenException(
s"Cannot broadcast the table that is larger than 8GB: $rawSize bytes")
}
if ((rawSize == 0 && totalBatchesSize != 0) || totalBatchesSize < 0) {
throw new GlutenException(
s"Invalid rawSize($rawSize) or totalBatchesSize ($totalBatchesSize). Ensure the shuffle" +
s" written bytes is correct.")
}
val rowCount = countsAndBytes.map(_._1).sum
numOutputRows += rowCount
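For readability, the consistency check added in the hunk above can be read as a standalone invariant between the raw byte accumulator and the summed batch lengths; the helper below is an illustrative sketch, not part of the codebase:

```scala
// Illustrative restatement of the invariant enforced above; not a Gluten API.
object BroadcastSizeInvariant {
  // rawSize: bytes reported by the shuffle-written-bytes accumulator (dataSize).
  // totalBatchesSize: Int sum of the collected batch lengths; since each length is
  // non-negative, a negative sum can only come from Int overflow, so it is rejected too.
  def validate(rawSize: Long, totalBatchesSize: Int): Option[String] =
    if ((rawSize == 0 && totalBatchesSize != 0) || totalBatchesSize < 0)
      Some(s"Invalid rawSize($rawSize) or totalBatchesSize($totalBatchesSize): " +
        "the shuffle written bytes metric looks wrong.")
    else None
}
```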
2 changes: 2 additions & 0 deletions cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -469,6 +469,7 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
celeborn_client->pushPartitionData(cur_partition_id, data.data(), data.size());
shuffle_writer->split_result.total_io_time += push_time_watch.elapsedNanoseconds();
shuffle_writer->split_result.partition_lengths[cur_partition_id] += data.size();
shuffle_writer->split_result.total_bytes_written += data.size();
}
output.restart();
};
@@ -586,6 +587,7 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t partition_id)
shuffle_writer->split_result.total_write_time += push_time_watch.elapsedNanoseconds();
shuffle_writer->split_result.total_io_time += push_time_watch.elapsedNanoseconds();
shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds();
shuffle_writer->split_result.total_bytes_written += written_bytes;
};

Stopwatch spill_time_watch;
5 changes: 5 additions & 0 deletions cpp-ch/local-engine/local_engine_jni.cpp
@@ -678,6 +678,11 @@ JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_
const auto * raw_src = reinterpret_cast<const jlong *>(raw_partition_lengths.data());
env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src);

// AQE depends on total_bytes_written; if the value is wrong, it will generate an inappropriate plan.
// Add a log here as a reminder.
if (!result.total_bytes_written)
LOG_WARNING(getLogger("CHShuffleSplitterJniWrapper"), "total_bytes_written is 0, something may be wrong");

jobject split_result = env->NewObject(
split_result_class,
split_result_constructor,
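Taken together, the counter repaired in the C++ writers is handed back through the JNI split result constructed above and consumed by the JVM-side shuffle writer. The sketch below is hypothetical: `NativeSplitResult` and `reportToSpark` are illustrative names, and the assumption that the counter feeds Spark's `ShuffleWriteMetrics` follows the AQE dependency described in the PR text rather than a documented Gluten API:

```scala
// Hypothetical sketch of how the repaired counter could be consumed on the JVM side.
final case class NativeSplitResult(
    totalBytesWritten: Long,       // now non-zero for the Celeborn partition writers
    partitionLengths: Array[Long])

object SplitResultReporter {
  // incBytesWritten stands in for Spark's ShuffleWriteMetrics callback (assumption).
  def reportToSpark(result: NativeSplitResult, incBytesWritten: Long => Unit): Unit = {
    require(result.totalBytesWritten >= 0, "negative byte counter from native writer")
    incBytesWritten(result.totalBytesWritten)
  }
}
```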
