From a19be93278a7ccf702a8711b560742bc96e0f64b Mon Sep 17 00:00:00 2001 From: XinShuoWang Date: Mon, 20 May 2024 11:34:17 +0800 Subject: [PATCH] fix --- .../execution/VeloxColumnarToRowExec.scala | 2 +- cpp/core/operators/c2r/ColumnarToRow.h | 3 +++ .../serializer/VeloxColumnarToRowConverter.cc | 17 ++++++++++------- .../serializer/VeloxColumnarToRowConverter.h | 2 +- .../scala/org/apache/gluten/GlutenConfig.scala | 12 ++++++------ 5 files changed, 21 insertions(+), 15 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala index 82c7ea49ac1f8..ea80e21a1c50c 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala @@ -177,7 +177,7 @@ object VeloxColumnarToRowExec { baseLength += info.lengths.length val before = System.currentTimeMillis() info = jniWrapper.nativeColumnarToRowConvert(batchHandle, c2rId, rowId) - convertTime += (System.currentTimeMillis() - beforeConvert) + convertTime += (System.currentTimeMillis() - before) } val (offset, length) = (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength)) diff --git a/cpp/core/operators/c2r/ColumnarToRow.h b/cpp/core/operators/c2r/ColumnarToRow.h index 7e20c8f1a6120..67742703b5822 100644 --- a/cpp/core/operators/c2r/ColumnarToRow.h +++ b/cpp/core/operators/c2r/ColumnarToRow.h @@ -28,6 +28,9 @@ class ColumnarToRowConverter { virtual ~ColumnarToRowConverter() = default; + // Conversion starts at row 'rowId' of 'cb'. A single call consumes at most 'memoryThreshold' bytes for the converted + // rows, so it may convert only part of the batch. The number of rows actually converted is stored in the 'numRows_' + // variable. 
virtual void convert(std::shared_ptr cb = nullptr, int64_t rowId = 0, int64_t memoryThreshold = INT64_MAX) = 0; diff --git a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc index 18b290fe012e7..63339c067e606 100644 --- a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc +++ b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc @@ -28,7 +28,7 @@ using namespace facebook; namespace gluten { -int64_t VeloxColumnarToRowConverter::refreshStates( +void VeloxColumnarToRowConverter::refreshStates( facebook::velox::RowVectorPtr rowVector, int64_t rowId, int64_t memoryThreshold) { @@ -41,22 +41,26 @@ int64_t VeloxColumnarToRowConverter::refreshStates( if (auto fixedRowSize = velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) { GLUTEN_CHECK( memoryThreshold > fixedRowSize.value(), - "Only 1 row exceed Column2RowMemoryThreshold, which is " + velox::succinctBytes(memoryThreshold)); + "spark.gluten.sql.columnarToRowMemoryThreshold(" + velox::succinctBytes(memoryThreshold) + + ") is too small, it can't hold even one row(" + velox::succinctBytes(fixedRowSize.value()) + ")"); auto rowSize = fixedRowSize.value(); numRows_ = std::min(memoryThreshold / rowSize, vectorLength - rowId); totalMemorySize = rowSize * numRows_; } else { - for (auto i = rowId; i < vectorLength; ++i) { + int64_t i = rowId; + for (; i < vectorLength; ++i) { auto rowSize = fast_->rowSize(i); if (UNLIKELY(totalMemorySize + rowSize > memoryThreshold)) { GLUTEN_CHECK( - i >= 1, "Only 1 row exceed Column2RowMemoryThreshold, which is " + velox::succinctBytes(memoryThreshold)); + i >= 1, + "spark.gluten.sql.columnarToRowMemoryThreshold(" + velox::succinctBytes(memoryThreshold) + + ") is too small, it can't hold even one row(" + velox::succinctBytes(rowSize) + ")"); break; } else { totalMemorySize += rowSize; } - numRows_ = i + 1 - rowId; } + numRows_ = i - rowId; } if (veloxBuffers_ == 
nullptr) { @@ -65,7 +69,6 @@ int64_t VeloxColumnarToRowConverter::refreshStates( bufferAddress_ = veloxBuffers_->asMutable(); memset(bufferAddress_, 0, sizeof(int8_t) * totalMemorySize); - return numRows_; } void VeloxColumnarToRowConverter::convert(std::shared_ptr cb, int64_t rowId, int64_t memoryThreshold) { @@ -89,4 +92,4 @@ void VeloxColumnarToRowConverter::convert(std::shared_ptr cb, int } } -} // namespace gluten \ No newline at end of file +} // namespace gluten diff --git a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h index c99ca091742ab..525313ee7cc12 100644 --- a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h +++ b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h @@ -35,7 +35,7 @@ class VeloxColumnarToRowConverter final : public ColumnarToRowConverter { void convert(std::shared_ptr cb, int64_t rowId = 0, int64_t memoryThreshold = INT64_MAX) override; private: - int64_t refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t rowId, int64_t memoryThreshold); + void refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t rowId, int64_t memoryThreshold); std::shared_ptr veloxPool_; std::shared_ptr fast_; diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala index 205823b6a5d6f..5e57e426ebc4d 100644 --- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala +++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala @@ -187,8 +187,8 @@ class GlutenConfig(conf: SQLConf) extends Logging { def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE) - def boltColumn2RowMemThreshold: Long = - conf.getConf(GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD) + def columnarToRowMemThreshold: Long = + conf.getConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD) def shuffleWriterBufferSize: Int = conf .getConf(SHUFFLE_WRITER_BUFFER_SIZE) @@ -517,8 +517,8 @@ object 
GlutenConfig { // Batch size. val GLUTEN_MAX_BATCH_SIZE_KEY = "spark.gluten.sql.columnar.maxBatchSize" - val GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD_KEY = - "spark.gluten.sql.columnToRowMemoryThreshold" + val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY = + "spark.gluten.sql.columnarToRowMemoryThreshold" // Shuffle Writer buffer size. val GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE = "spark.gluten.shuffleWriter.bufferSize" @@ -1038,8 +1038,8 @@ object GlutenConfig { .checkValue(_ > 0, s"$GLUTEN_MAX_BATCH_SIZE_KEY must be positive.") .createWithDefault(4096) - val GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD = - buildConf(GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD_KEY) + val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD = + buildConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY) .internal() .longConf .createWithDefault(256 * 1024 * 1024)