From 88ec5fdd426ad4ba45f7792ea185919814bbcfbc Mon Sep 17 00:00:00 2001
From: lgbo
Date: Wed, 21 Aug 2024 15:55:51 +0800
Subject: [PATCH] [GLUTEN-6923][CH] `total_bytes_written` is not updated in
 Celeborn partition writers (#6939)

What changes were proposed in this pull request?

Fixes: #6923

AQE depends on the shuffle written bytes to generate a proper plan, so the
Celeborn partition writers must update `total_bytes_written` whenever they
push partition data.

How was this patch tested?

Manual tests.

---
 .../backendsapi/clickhouse/CHBackend.scala      |  1 -
 .../clickhouse/CHSparkPlanExecApi.scala         | 16 ++++++++++++----
 cpp-ch/local-engine/Shuffle/PartitionWriter.cpp |  2 ++
 cpp-ch/local-engine/local_engine_jni.cpp        |  5 +++++
 4 files changed, 19 insertions(+), 5 deletions(-)

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
index 41ffbdb58354..3c151df7e144 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala
@@ -407,5 +407,4 @@ object CHBackendSettings extends BackendSettingsApi with Logging {
       }
     }
   }
-
 }
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 02b4777e7120..6761269651c1 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.backendsapi.clickhouse
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.backendsapi.{BackendsApiManager, SparkPlanExecApi}
+import org.apache.gluten.exception.GlutenException
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.execution._
 import org.apache.gluten.expression._
@@ -29,7 +30,8 @@ import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode
 import org.apache.gluten.utils.{CHJoinValidateUtil, UnknownJoinStrategy}
 import org.apache.gluten.vectorized.CHColumnarBatchSerializer
 
-import org.apache.spark.{ShuffleDependency, SparkException}
+import org.apache.spark.ShuffleDependency
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper, HashPartitioningWrapper}
@@ -62,7 +64,7 @@ import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-class CHSparkPlanExecApi extends SparkPlanExecApi {
+class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
 
   /** The columnar-batch type this backend is using.
    */
   override def batchType: Convention.BatchType = CHBatch
 
@@ -532,10 +534,16 @@
       CHExecUtil.buildSideRDD(dataSize, newChild).collect
 
     val batches = countsAndBytes.map(_._2)
+    val totalBatchesSize = batches.map(_.length).sum
     val rawSize = dataSize.value
     if (rawSize >= BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) {
-      throw new SparkException(
-        s"Cannot broadcast the table that is larger than 8GB: ${rawSize >> 30} GB")
+      throw new GlutenException(
+        s"Cannot broadcast the table that is larger than 8GB: $rawSize bytes")
+    }
+    if ((rawSize == 0 && totalBatchesSize != 0) || totalBatchesSize < 0) {
+      throw new GlutenException(
+        s"Invalid rawSize ($rawSize) or totalBatchesSize ($totalBatchesSize). Ensure the" +
+          s" shuffle written bytes are correct.")
     }
     val rowCount = countsAndBytes.map(_._1).sum
     numOutputRows += rowCount
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index 2f22d0e24139..79d640d3b2bc 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -469,6 +469,7 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
             celeborn_client->pushPartitionData(cur_partition_id, data.data(), data.size());
             shuffle_writer->split_result.total_io_time += push_time_watch.elapsedNanoseconds();
             shuffle_writer->split_result.partition_lengths[cur_partition_id] += data.size();
+            shuffle_writer->split_result.total_bytes_written += data.size();
         }
         output.restart();
     };
@@ -586,6 +587,7 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t partition_id)
         shuffle_writer->split_result.total_write_time += push_time_watch.elapsedNanoseconds();
         shuffle_writer->split_result.total_io_time += push_time_watch.elapsedNanoseconds();
         shuffle_writer->split_result.total_serialize_time += serialization_time_watch.elapsedNanoseconds();
+        shuffle_writer->split_result.total_bytes_written += written_bytes;
     };
 
     Stopwatch spill_time_watch;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index f27da2f92048..c80379a879f8 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -678,6 +678,11 @@ JNIEXPORT jobject Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_
     const auto * raw_src = reinterpret_cast<const jlong *>(raw_partition_lengths.data());
     env->SetLongArrayRegion(raw_partition_length_arr, 0, raw_partition_lengths.size(), raw_src);
 
+    // AQE depends on total_bytes_written; if the value is wrong, an
+    // inappropriate plan may be generated. Log a warning here as a reminder.
+    if (!result.total_bytes_written)
+        LOG_WARNING(getLogger("CHShuffleSplitterJniWrapper"), "total_bytes_written is 0, something may be wrong");
+
     jobject split_result = env->NewObject(
         split_result_class,
         split_result_constructor,
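
For readers unfamiliar with the writer-side bookkeeping, the following sketch models
the invariant this patch restores: every push of partition data must bump both the
per-partition length and the global byte counter, because the driver only ever sees
the latter. The SplitResult class below is a hypothetical stand-in, not Gluten's
actual C++ struct; only the member names mirror the fields touched in
PartitionWriter.cpp.

// Hypothetical stand-in for the writer-side bookkeeping; only the member
// names (partitionLengths, totalBytesWritten) mirror the diff above.
final class SplitResult(numPartitions: Int) {
  val partitionLengths: Array[Long] = Array.fill(numPartitions)(0L)
  var totalBytesWritten: Long = 0L

  // Every push to the shuffle service must bump both counters. Before this
  // patch, the Celeborn writers updated partitionLengths only, so the driver
  // always saw totalBytesWritten == 0.
  def pushPartitionData(partitionId: Int, bytes: Long): Unit = {
    partitionLengths(partitionId) += bytes
    totalBytesWritten += bytes
  }
}

object SplitResultInvariant extends App {
  val result = new SplitResult(numPartitions = 4)
  result.pushPartitionData(0, 128L)
  result.pushPartitionData(2, 512L)
  result.pushPartitionData(0, 64L)
  // The invariant AQE implicitly relies on: the total equals the sum of the
  // per-partition lengths.
  assert(result.totalBytesWritten == result.partitionLengths.sum)
}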
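
The driver-side guard added in CHSparkPlanExecApi.scala can also be read in
isolation. Below is a standalone sketch of that check, lifted out of Spark so it
runs on its own; the GlutenException stand-in, the validate signature, and the
inlined 8 GB constant (the real code uses
BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES) are simplifications, not Gluten's
actual API.

// Simplified stand-in for org.apache.gluten.exception.GlutenException.
final class GlutenException(message: String) extends RuntimeException(message)

object BroadcastSizeGuard {
  // Mirrors BroadcastExchangeExec.MAX_BROADCAST_TABLE_BYTES (8 GB).
  val MaxBroadcastTableBytes: Long = 8L << 30

  // rawSize comes from the dataSize metric; batches are the serialized
  // build-side blocks collected on the driver.
  def validate(rawSize: Long, batches: Array[Array[Byte]]): Unit = {
    // Int sum on purpose, as in the patch: a negative result signals
    // overflow, which the second guard treats as corrupt bookkeeping.
    val totalBatchesSize = batches.map(_.length).sum
    if (rawSize >= MaxBroadcastTableBytes) {
      throw new GlutenException(
        s"Cannot broadcast the table that is larger than 8GB: $rawSize bytes")
    }
    if ((rawSize == 0 && totalBatchesSize != 0) || totalBatchesSize < 0) {
      throw new GlutenException(
        s"Invalid rawSize ($rawSize) or totalBatchesSize ($totalBatchesSize). " +
          "Ensure the shuffle written bytes are correct.")
    }
  }

  def main(args: Array[String]): Unit = {
    // Non-empty batches with rawSize == 0 reproduce the symptom of #6923:
    // the writers produced data but reported zero bytes written.
    try validate(rawSize = 0L, batches = Array(Array.fill(16)(0: Byte)))
    catch { case e: GlutenException => println(s"caught: ${e.getMessage}") }
  }
}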