diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
index 2c46893e4576..993a888b91df 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/VeloxColumnarToRowExec.scala
@@ -147,13 +147,14 @@ object VeloxColumnarToRowExec {
     val rows = batch.numRows()
     val beforeConvert = System.currentTimeMillis()
     val batchHandle = ColumnarBatches.getNativeHandle(batch)
-    val info =
-      jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle)
+    var info =
+      jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)
 
     convertTime += (System.currentTimeMillis() - beforeConvert)
 
     new Iterator[InternalRow] {
       var rowId = 0
+      var baseLength = 0
      val row = new UnsafeRow(cols)
 
       override def hasNext: Boolean = {
@@ -161,7 +162,14 @@ object VeloxColumnarToRowExec {
       }
 
       override def next: UnsafeRow = {
-        val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
+        if (rowId == baseLength + info.lengths.length) {
+          baseLength += info.lengths.length
+          val before = System.currentTimeMillis()
+          info = jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, rowId)
+          convertTime += (System.currentTimeMillis() - before)
+        }
+        val (offset, length) =
+          (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
         row.pointTo(null, info.memoryAddress + offset, length)
         rowId += 1
         row
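The rewritten iterator no longer assumes a single native call covers the whole batch: info holds only the offsets and lengths of the current chunk, baseLength records how many rows earlier chunks produced, and a fresh nativeColumnarToRowConvert call is issued exactly when rowId steps past the current chunk. Below is a minimal, self-contained Scala sketch of the same pattern; convertFrom and the string-typed rows are hypothetical stand-ins for the JNI call and the native buffer.

object ChunkedC2RSketch {
  final case class Info(offsets: Array[Int], lengths: Array[Int])

  // Hypothetical stand-in for nativeColumnarToRowConvert(c2rId, batchHandle, startRow):
  // converts at most chunkSize rows starting at startRow and returns the
  // per-row offsets and lengths of just that chunk.
  def convertFrom(rows: Array[String], startRow: Int, chunkSize: Int): Info = {
    val chunk = rows.slice(startRow, math.min(rows.length, startRow + chunkSize))
    val lengths = chunk.map(_.length)
    val offsets = lengths.scanLeft(0)(_ + _).init // offset of each row within the chunk buffer
    Info(offsets, lengths)
  }

  def iterator(rows: Array[String], chunkSize: Int): Iterator[(Int, Int)] =
    new Iterator[(Int, Int)] {
      var rowId = 0
      var baseLength = 0 // rows already consumed by earlier chunks
      var info = convertFrom(rows, 0, chunkSize)

      override def hasNext: Boolean = rowId < rows.length

      override def next(): (Int, Int) = {
        // Current chunk exhausted: request the next one starting at rowId.
        if (rowId == baseLength + info.lengths.length) {
          baseLength += info.lengths.length
          info = convertFrom(rows, rowId, chunkSize)
        }
        val res = (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
        rowId += 1
        res
      }
    }

  def main(args: Array[String]): Unit =
    iterator(Array("a", "bb", "ccc", "dddd", "ee"), chunkSize = 2).foreach(println)
}

Bounding each chunk keeps the conversion buffer near the configured threshold instead of growing with the batch's full row count.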
diff --git a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
index 798cea8cb94f..22f96bbbc4c2 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/execution/VeloxTPCHSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.gluten.execution
 
+import org.apache.gluten.GlutenConfig
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, Row, TestUtils}
 import org.apache.spark.sql.execution.FormattedMode
@@ -253,6 +255,7 @@ class VeloxTPCHDistinctSpillSuite extends VeloxTPCHTableSupport {
     super.sparkConf
       .set("spark.memory.offHeap.size", "50m")
       .set("spark.gluten.memory.overAcquiredMemoryRatio", "0.9") // to trigger distinct spill early
+      .set(GlutenConfig.GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY, "8k")
   }
 
   test("distinct spill") {
diff --git a/cpp/core/compute/Runtime.h b/cpp/core/compute/Runtime.h
index fb501dc9acca..8bdf95cd730c 100644
--- a/cpp/core/compute/Runtime.h
+++ b/cpp/core/compute/Runtime.h
@@ -97,7 +97,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {
 
   /// This function is used to create a converter from the format used by
   /// the backend to Spark unsafe row.
-  virtual std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter() = 0;
+  virtual std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t column2RowMemThreshold) = 0;
 
   virtual std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(struct ArrowSchema* cSchema) = 0;
diff --git a/cpp/core/config/GlutenConfig.h b/cpp/core/config/GlutenConfig.h
index 060bbe111265..e4f5a884b920 100644
--- a/cpp/core/config/GlutenConfig.h
+++ b/cpp/core/config/GlutenConfig.h
@@ -56,6 +56,9 @@ const std::string kGzipWindowSize4k = "4096";
 
 const std::string kParquetCompressionCodec = "spark.sql.parquet.compression.codec";
 
+const std::string kColumnarToRowMemoryThreshold = "spark.gluten.sql.columnarToRowMemoryThreshold";
+const std::string kColumnarToRowMemoryDefaultThreshold = "67108864"; // 64MB
+
 const std::string kUGIUserName = "spark.gluten.ugi.username";
 const std::string kUGITokens = "spark.gluten.ugi.tokens";
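kColumnarToRowMemoryThreshold mirrors the Spark-side key added later in this patch, with a 67108864-byte (64MB) fallback. A sketch of setting it from the Spark side, assuming the value survives the conf plumbing unchanged; a plain digit string is the safest form, since the native validation below accepts only unsigned integers, while a suffixed value such as the test suite's "8k" relies on being resolved to bytes before it reaches the native layer.

import org.apache.spark.SparkConf

object ColumnarToRowThresholdExample {
  def main(args: Array[String]): Unit = {
    // A plain byte count passes the digit-only validation in JniWrapper.cc
    // as-is; 67108864 bytes is the same 64MB as the default.
    val conf = new SparkConf()
      .set("spark.gluten.sql.columnarToRowMemoryThreshold", "67108864")
    println(conf.get("spark.gluten.sql.columnarToRowMemoryThreshold"))
  }
}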
diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index add4aa54d207..60f367fd72d1 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -16,6 +16,8 @@
  */
 
 #include <jni.h>
+#include <algorithm>
+#include <cctype>
 
 #include "compute/Runtime.h"
@@ -27,6 +29,7 @@
 #include
 #include
+#include <string>
 #include "memory/AllocationListener.h"
 #include "operators/serializer/ColumnarBatchSerializer.h"
 #include "shuffle/LocalPartitionWriter.h"
@@ -528,8 +531,24 @@ Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarTo
   JNI_METHOD_START
   auto ctx = gluten::getRuntime(env, wrapper);
 
+  auto& conf = ctx->getConfMap();
+  int64_t column2RowMemThreshold;
+  auto it = conf.find(kColumnarToRowMemoryThreshold);
+  bool confIsLegal =
+      ((it == conf.end()) ? false : std::all_of(it->second.begin(), it->second.end(), [](unsigned char c) {
+        return std::isdigit(c);
+      }));
+  if (confIsLegal) {
+    column2RowMemThreshold = std::stoll(it->second);
+  } else {
+    LOG(INFO) << "Invalid or missing value for " << kColumnarToRowMemoryThreshold
+              << ", falling back to the default threshold of " << kColumnarToRowMemoryDefaultThreshold << " bytes";
+    column2RowMemThreshold = std::stoll(kColumnarToRowMemoryDefaultThreshold);
+  }
+
   // Convert the native batch to Spark unsafe row.
-  return ctx->saveObject(ctx->createColumnar2RowConverter());
+  return ctx->saveObject(ctx->createColumnar2RowConverter(column2RowMemThreshold));
   JNI_METHOD_END(kInvalidObjectHandle)
 }
 
@@ -538,16 +557,18 @@ Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarTo
     JNIEnv* env,
     jobject wrapper,
     jlong c2rHandle,
-    jlong batchHandle) {
+    jlong batchHandle,
+    jlong startRow) {
   JNI_METHOD_START
   auto columnarToRowConverter = ObjectStore::retrieve<ColumnarToRowConverter>(c2rHandle);
   auto cb = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
-  columnarToRowConverter->convert(cb);
+
+  columnarToRowConverter->convert(cb, startRow);
 
   const auto& offsets = columnarToRowConverter->getOffsets();
   const auto& lengths = columnarToRowConverter->getLengths();
-  auto numRows = cb->numRows();
+  auto numRows = columnarToRowConverter->numRows();
 
   auto offsetsArr = env->NewIntArray(numRows);
   auto offsetsSrc = reinterpret_cast<const jint*>(offsets.data());
diff --git a/cpp/core/operators/c2r/ColumnarToRow.h b/cpp/core/operators/c2r/ColumnarToRow.h
index edee312491a3..062a863f734c 100644
--- a/cpp/core/operators/c2r/ColumnarToRow.h
+++ b/cpp/core/operators/c2r/ColumnarToRow.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <cstdint>
 #include "memory/ColumnarBatch.h"
 
 namespace gluten {
@@ -27,7 +28,14 @@ class ColumnarToRowConverter {
 
   virtual ~ColumnarToRowConverter() = default;
 
-  virtual void convert(std::shared_ptr<ColumnarBatch> cb = nullptr) = 0;
+  // Convert rows of 'cb' starting at 'startRow', stopping once the accumulated buffer size would
+  // exceed the converter's memory threshold. The number of rows actually converted is recorded in
+  // 'numRows_' and exposed through numRows().
+  virtual void convert(std::shared_ptr<ColumnarBatch> cb = nullptr, int64_t startRow = 0) = 0;
+
+  virtual int32_t numRows() {
+    return numRows_;
+  }
 
   uint8_t* getBufferAddress() const {
     return bufferAddress_;
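convert(cb, startRow) together with numRows() forms a pull-style contract: each call converts the slice of remaining rows that fits under the threshold and reports how far it got, and numRows() is at least 1 so the caller always makes progress. A runnable Scala sketch of that caller loop; StubConverter is a hypothetical stand-in for the JNI-backed converter.

object ConvertLoopSketch {
  // Hypothetical stand-in for the native converter: converts at most three
  // rows per call, always at least one, mirroring the numRows_ contract.
  final class StubConverter(totalRows: Int) {
    private var converted = 0
    def convert(startRow: Int): Unit =
      converted = math.max(1, math.min(3, totalRows - startRow))
    def numRows(): Int = converted
  }

  def main(args: Array[String]): Unit = {
    val totalRows = 10
    val converter = new StubConverter(totalRows)
    var startRow = 0
    while (startRow < totalRows) {
      converter.convert(startRow)
      // numRows() >= 1 guarantees progress even when a single row alone
      // exceeds the memory threshold.
      startRow += converter.numRows()
      println(s"converted through row $startRow")
    }
  }
}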
diff --git a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
index 5037264e1b0c..8a55050015d0 100644
--- a/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
+++ b/cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
@@ -157,7 +157,7 @@ class GoogleBenchmarkColumnarToRowCacheScanBenchmark : public GoogleBenchmarkCol
     for (auto _ : state) {
       for (const auto& vector : vectors) {
         auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
-        auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(ctxPool);
+        auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
         auto cb = std::make_shared<VeloxColumnarBatch>(row);
         TIME_NANO_START(writeTime);
         columnarToRowConverter->convert(cb);
@@ -212,7 +212,7 @@ class GoogleBenchmarkColumnarToRowIterateScanBenchmark : public GoogleBenchmarkC
       numBatches += 1;
       numRows += recordBatch->num_rows();
       auto vector = recordBatch2RowVector(*recordBatch);
-      auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(ctxPool);
+      auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
       auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
       auto cb = std::make_shared<VeloxColumnarBatch>(row);
       TIME_NANO_START(writeTime);
diff --git a/cpp/velox/compute/VeloxRuntime.cc b/cpp/velox/compute/VeloxRuntime.cc
index c1e8fc860214..cdce781bd528 100644
--- a/cpp/velox/compute/VeloxRuntime.cc
+++ b/cpp/velox/compute/VeloxRuntime.cc
@@ -157,9 +157,9 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
   return std::make_shared<ResultIterator>(std::move(wholestageIter), this);
 }
 
-std::shared_ptr<ColumnarToRowConverter> VeloxRuntime::createColumnar2RowConverter() {
+std::shared_ptr<ColumnarToRowConverter> VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) {
   auto veloxPool = vmm_->getLeafMemoryPool();
-  return std::make_shared<VeloxColumnarToRowConverter>(veloxPool);
+  return std::make_shared<VeloxColumnarToRowConverter>(veloxPool, column2RowMemThreshold);
 }
 
 std::shared_ptr<ColumnarBatch> VeloxRuntime::createOrGetEmptySchemaBatch(int32_t numRows) {
diff --git a/cpp/velox/compute/VeloxRuntime.h b/cpp/velox/compute/VeloxRuntime.h
index 096ecb6fbf13..952a103ed8ad 100644
--- a/cpp/velox/compute/VeloxRuntime.h
+++ b/cpp/velox/compute/VeloxRuntime.h
@@ -49,7 +49,7 @@ class VeloxRuntime final : public Runtime {
       const std::vector<std::shared_ptr<ResultIterator>>& inputs = {},
       const std::unordered_map<std::string, std::string>& sessionConf = {}) override;
 
-  std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter() override;
+  std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t column2RowMemThreshold) override;
 
   std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) override;
diff --git a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
index 52a046ac706b..a609e26d0390 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
+++ b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
@@ -16,8 +16,11 @@
  */
 
 #include "VeloxColumnarToRowConverter.h"
+#include <algorithm>
+#include <cstdint>
 
 #include "memory/VeloxColumnarBatch.h"
+#include "utils/exception.h"
 #include "velox/row/UnsafeRowDeserializers.h"
 #include "velox/row/UnsafeRowFast.h"
@@ -25,27 +28,39 @@ using namespace facebook;
 
 namespace gluten {
 
-void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr rowVector) {
-  numRows_ = rowVector->size();
+void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t startRow) {
+  auto vectorLength = rowVector->size();
   numCols_ = rowVector->childrenSize();
 
   fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVector);
 
-  size_t totalMemorySize = 0;
+  int64_t totalMemorySize;
+
   if (auto fixedRowSize = velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
-    totalMemorySize += fixedRowSize.value() * numRows_;
+    auto rowSize = fixedRowSize.value();
+    // Make sure at least one row is converted.
+    numRows_ = std::max<int64_t>(1, std::min<int64_t>(memThreshold_ / rowSize, vectorLength - startRow));
+    totalMemorySize = numRows_ * rowSize;
   } else {
-    for (auto i = 0; i < numRows_; ++i) {
-      totalMemorySize += fast_->rowSize(i);
+    // The first row is always converted, even if it alone exceeds the threshold.
+    totalMemorySize = fast_->rowSize(startRow);
+
+    auto endRow = startRow + 1;
+    for (; endRow < vectorLength; ++endRow) {
+      auto rowSize = fast_->rowSize(endRow);
+      if (UNLIKELY(totalMemorySize + rowSize > memThreshold_)) {
+        break;
+      } else {
+        totalMemorySize += rowSize;
+      }
     }
+    numRows_ = endRow - startRow;
   }
 
-  if (veloxBuffers_ == nullptr) {
-    // First allocate memory
+  if (nullptr == veloxBuffers_) {
     veloxBuffers_ = velox::AlignedBuffer::allocate<uint8_t>(totalMemorySize, veloxPool_.get());
-  }
-
-  if (veloxBuffers_->capacity() < totalMemorySize) {
+  } else if (veloxBuffers_->capacity() < totalMemorySize) {
     velox::AlignedBuffer::reallocate<uint8_t>(&veloxBuffers_, totalMemorySize);
   }
@@ -53,9 +68,9 @@ void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr ro
   memset(bufferAddress_, 0, sizeof(int8_t) * totalMemorySize);
 }
 
-void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb) {
+void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb, int64_t startRow) {
   auto veloxBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
-  refreshStates(veloxBatch->getRowVector());
+  refreshStates(veloxBatch->getRowVector(), startRow);
 
   // Initialize the offsets_ and lengths_.
   lengths_.clear();
@@ -64,11 +79,11 @@ void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb) {
   offsets_.resize(numRows_, 0);
 
   size_t offset = 0;
-  for (auto rowIdx = 0; rowIdx < numRows_; ++rowIdx) {
-    auto rowSize = fast_->serialize(rowIdx, (char*)(bufferAddress_ + offset));
-    lengths_[rowIdx] = rowSize;
-    if (rowIdx > 0) {
-      offsets_[rowIdx] = offsets_[rowIdx - 1] + lengths_[rowIdx - 1];
+  for (auto i = 0; i < numRows_; ++i) {
+    auto rowSize = fast_->serialize(startRow + i, (char*)(bufferAddress_ + offset));
+    lengths_[i] = rowSize;
+    if (i > 0) {
+      offsets_[i] = offsets_[i - 1] + lengths_[i - 1];
     }
     offset += rowSize;
   }
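For fixed-size rows the chunk length reduces to max(1, min(memThreshold_ / rowSize, remainingRows)); the variable-size branch instead accumulates per-row sizes until the next row would cross the threshold, always keeping the first row. The same arithmetic as a runnable sketch:

object ChunkSizeMath {
  // Mirrors the fixed-row-size branch of refreshStates above.
  def rowsPerChunk(memThreshold: Long, rowSize: Long, remainingRows: Long): Long =
    math.max(1L, math.min(memThreshold / rowSize, remainingRows))

  def main(args: Array[String]): Unit = {
    // 64MB threshold, 24-byte rows: 2796202 rows per call.
    println(rowsPerChunk(64L << 20, 24, Long.MaxValue))
    // A single 100MB row still converts: max(1, ...) keeps progress.
    println(rowsPerChunk(64L << 20, 100L << 20, 10))
  }
}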
diff --git a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
index 833ffa8aad98..540d991a6c65 100644
--- a/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
+++ b/cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
@@ -29,17 +29,20 @@ namespace gluten {
 
 class VeloxColumnarToRowConverter final : public ColumnarToRowConverter {
  public:
-  explicit VeloxColumnarToRowConverter(std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool)
-      : ColumnarToRowConverter(), veloxPool_(veloxPool) {}
+  explicit VeloxColumnarToRowConverter(
+      std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
+      int64_t memThreshold)
+      : ColumnarToRowConverter(), veloxPool_(veloxPool), memThreshold_(memThreshold) {}
 
-  void convert(std::shared_ptr<ColumnarBatch> cb) override;
+  void convert(std::shared_ptr<ColumnarBatch> cb, int64_t startRow = 0) override;
 
  private:
-  void refreshStates(facebook::velox::RowVectorPtr rowVector);
+  void refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t startRow);
 
   std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
   std::shared_ptr<facebook::velox::row::UnsafeRowFast> fast_;
   facebook::velox::BufferPtr veloxBuffers_;
+  int64_t memThreshold_;
 };
 
 } // namespace gluten
diff --git a/cpp/velox/tests/RuntimeTest.cc b/cpp/velox/tests/RuntimeTest.cc
index 563539d7d63e..1a353816f797 100644
--- a/cpp/velox/tests/RuntimeTest.cc
+++ b/cpp/velox/tests/RuntimeTest.cc
@@ -61,7 +61,7 @@ class DummyRuntime final : public Runtime {
   std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) override {
     throw GlutenException("Not yet implemented");
   }
-  std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter() override {
+  std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t column2RowMemThreshold) override {
     throw GlutenException("Not yet implemented");
   }
   std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(struct ArrowSchema* cSchema) override {
diff --git a/cpp/velox/tests/VeloxColumnarToRowTest.cc b/cpp/velox/tests/VeloxColumnarToRowTest.cc
index 2309e6e1cd30..3adacdda9d51 100644
--- a/cpp/velox/tests/VeloxColumnarToRowTest.cc
+++ b/cpp/velox/tests/VeloxColumnarToRowTest.cc
@@ -34,7 +34,7 @@ class VeloxColumnarToRowTest : public ::testing::Test, public test::VectorTestBa
   }
 
   void testRowBufferAddr(velox::RowVectorPtr vector, uint8_t* expectArr, int32_t expectArrSize) {
-    auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(pool_);
+    auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(pool_, 64 << 10);
     auto cb = std::make_shared<VeloxColumnarBatch>(vector);
     columnarToRowConverter->convert(cb);
diff --git a/cpp/velox/tests/VeloxRowToColumnarTest.cc b/cpp/velox/tests/VeloxRowToColumnarTest.cc
index 93f780ca3a38..c784dbd59c34 100644
--- a/cpp/velox/tests/VeloxRowToColumnarTest.cc
+++ b/cpp/velox/tests/VeloxRowToColumnarTest.cc
@@ -33,7 +33,7 @@ class VeloxRowToColumnarTest : public ::testing::Test, public test::VectorTestBa
   }
 
   void testRowVectorEqual(velox::RowVectorPtr vector) {
-    auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(pool_);
+    auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(pool_, 64 << 10);
     auto columnarBatch = std::make_shared<VeloxColumnarBatch>(vector);
     columnarToRowConverter->convert(columnarBatch);
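The unit tests above pass a deliberately small 64 << 10 threshold, while the benchmarks earlier use 64 << 20, matching the config default; spelled out:

object ThresholdSizes {
  def main(args: Array[String]): Unit = {
    println(64 << 10) // 65536 bytes, the 64KiB threshold used in the tests
    println(64 << 20) // 67108864 bytes, the 64MiB benchmark value and config default
  }
}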
diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
index 7f8de78f95ef..ffcb77ad32c8 100644
--- a/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
+++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/NativeColumnarToRowJniWrapper.java
@@ -37,8 +37,8 @@ public long handle() {
 
   public native long nativeColumnarToRowInit() throws RuntimeException;
 
-  public native NativeColumnarToRowInfo nativeColumnarToRowConvert(long c2rHandle, long batchHandle)
-      throws RuntimeException;
+  public native NativeColumnarToRowInfo nativeColumnarToRowConvert(
+      long c2rHandle, long batchHandle, long rowId) throws RuntimeException;
 
   public native void nativeClose(long c2rHandle);
 }
diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
index f7bcfd694d52..9f13ea967a8d 100644
--- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/ColumnarBuildSideRelation.scala
@@ -135,8 +135,11 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
     } else {
       val cols = batch.numCols()
       val rows = batch.numRows()
-      val info =
-        jniWrapper.nativeColumnarToRowConvert(c2rId, ColumnarBatches.getNativeHandle(batch))
+      var info =
+        jniWrapper.nativeColumnarToRowConvert(
+          c2rId,
+          ColumnarBatches.getNativeHandle(batch),
+          0)
       batch.close()
       val columnNames = key.flatMap {
         case expression: AttributeReference =>
@@ -183,6 +186,7 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
 
       new Iterator[InternalRow] {
         var rowId = 0
+        var baseLength = 0
         val row = new UnsafeRow(cols)
 
         override def hasNext: Boolean = {
@@ -191,8 +195,12 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
 
         override def next: UnsafeRow = {
           if (rowId >= rows) throw new NoSuchElementException
-
-          val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
+          if (rowId == baseLength + info.lengths.length) {
+            baseLength += info.lengths.length
+            info = jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, rowId)
+          }
+          val (offset, length) =
+            (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
           row.pointTo(null, info.memoryAddress + offset, length.toInt)
           rowId += 1
           row
diff --git a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
index 77f35ff48fcc..94bdc73a5b50 100644
--- a/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/sql/execution/utils/ExecUtil.scala
@@ -46,11 +46,12 @@ object ExecUtil {
     var info: NativeColumnarToRowInfo = null
     val batchHandle = ColumnarBatches.getNativeHandle(batch)
     val c2rHandle = jniWrapper.nativeColumnarToRowInit()
-    info = jniWrapper.nativeColumnarToRowConvert(c2rHandle, batchHandle)
+    info = jniWrapper.nativeColumnarToRowConvert(c2rHandle, batchHandle, 0)
 
     Iterators
       .wrap(new Iterator[InternalRow] {
         var rowId = 0
+        var baseLength = 0
         val row = new UnsafeRow(batch.numCols())
 
         override def hasNext: Boolean = {
@@ -59,7 +60,12 @@
 
         override def next: UnsafeRow = {
           if (rowId >= batch.numRows()) throw new NoSuchElementException
-          val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
+          if (rowId == baseLength + info.lengths.length) {
+            baseLength += info.lengths.length
+            info = jniWrapper.nativeColumnarToRowConvert(c2rHandle, batchHandle, rowId)
+          }
+          val (offset, length) =
+            (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
           row.pointTo(null, info.memoryAddress + offset, length.toInt)
           rowId += 1
           row
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index e3f6f1d984ed..ed7a81192994 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -190,6 +190,9 @@ class GlutenConfig(conf: SQLConf) extends Logging {
   // FIXME: Not clear: MIN or MAX ?
   def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)
 
+  def columnarToRowMemThreshold: Long =
+    conf.getConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD)
+
   def shuffleWriterBufferSize: Int = conf
     .getConf(SHUFFLE_WRITER_BUFFER_SIZE)
     .getOrElse(maxBatchSize)
@@ -578,11 +581,14 @@ object GlutenConfig {
   val GLUTEN_SORT_SHUFFLE_WRITER = "sort"
   val GLUTEN_RSS_SORT_SHUFFLE_WRITER = "rss_sort"
 
-  // Shuffle writer buffer size.
+  // Shuffle Writer buffer size.
   val GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE = "spark.gluten.shuffleWriter.bufferSize"
 
   val GLUTEN_SHUFFLE_WRITER_MERGE_THRESHOLD = "spark.gluten.sql.columnar.shuffle.merge.threshold"
 
+  // Columnar to row memory threshold.
+  val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY = "spark.gluten.sql.columnarToRowMemoryThreshold"
+
   // Controls whether to load DLL from jars. User can get dependent native libs packed into a jar
   // by executing dev/package.sh. Then, with that jar configured, Gluten can load the native libs
   // at runtime. This config is just for velox backend. And it is NOT applicable to the situation
@@ -647,6 +653,7 @@ object GlutenConfig {
       GLUTEN_SAVE_DIR,
       GLUTEN_TASK_OFFHEAP_SIZE_IN_BYTES_KEY,
       GLUTEN_MAX_BATCH_SIZE_KEY,
+      GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY,
       GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE,
       SQLConf.SESSION_LOCAL_TIMEZONE.key,
       GLUTEN_DEFAULT_SESSION_TIMEZONE_KEY,
@@ -1114,6 +1121,12 @@ object GlutenConfig {
       .checkValue(_ > 0, s"$GLUTEN_MAX_BATCH_SIZE_KEY must be positive.")
       .createWithDefault(4096)
 
+  val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD =
+    buildConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY)
+      .internal()
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("64MB")
+
   // if not set, use COLUMNAR_MAX_BATCH_SIZE instead
   val SHUFFLE_WRITER_BUFFER_SIZE =
     buildConf(GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE)
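bytesConf(ByteUnit.BYTE) with createWithDefaultString("64MB") accepts human-readable sizes, which is how the test suite's "8k" resolves to 8192 bytes. A small sketch using JavaUtils.byteStringAsBytes, the parser Spark's byte configs are built on:

import org.apache.spark.network.util.JavaUtils

object ByteStringMath {
  def main(args: Array[String]): Unit = {
    println(JavaUtils.byteStringAsBytes("64MB")) // 67108864
    println(JavaUtils.byteStringAsBytes("8k"))   // 8192
  }
}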