Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
XinShuoWang committed May 20, 2024
1 parent cd18ef0 commit a19be93
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ object VeloxColumnarToRowExec {
baseLength += info.lengths.length
val before = System.currentTimeMillis()
info = jniWrapper.nativeColumnarToRowConvert(batchHandle, c2rId, rowId)
convertTime += (System.currentTimeMillis() - beforeConvert)
convertTime += (System.currentTimeMillis() - before)
}
val (offset, length) =
(info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
Expand Down
3 changes: 3 additions & 0 deletions cpp/core/operators/c2r/ColumnarToRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ class ColumnarToRowConverter {

virtual ~ColumnarToRowConverter() = default;

// Converts the rows of 'cb' starting from row 'rowId'. The rows converted by a single call occupy at most
// 'memoryThreshold' bytes in total, so one call may convert only part of the batch. The number of rows successfully
// converted is stored in the 'numRows_' variable.
virtual void
convert(std::shared_ptr<ColumnarBatch> cb = nullptr, int64_t rowId = 0, int64_t memoryThreshold = INT64_MAX) = 0;

Expand Down
17 changes: 10 additions & 7 deletions cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ using namespace facebook;

namespace gluten {

int64_t VeloxColumnarToRowConverter::refreshStates(
void VeloxColumnarToRowConverter::refreshStates(
facebook::velox::RowVectorPtr rowVector,
int64_t rowId,
int64_t memoryThreshold) {
Expand All @@ -41,22 +41,26 @@ int64_t VeloxColumnarToRowConverter::refreshStates(
if (auto fixedRowSize = velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
GLUTEN_CHECK(
memoryThreshold > fixedRowSize.value(),
"Only 1 row exceed Column2RowMemoryThreshold, which is " + velox::succinctBytes(memoryThreshold));
"spark.gluten.sql.columnarToRowMemoryThreshold(" + velox::succinctBytes(memoryThreshold) +
") is too small, it can't hold even one row(" + velox::succinctBytes(fixedRowSize.value()) + ")");
auto rowSize = fixedRowSize.value();
numRows_ = std::min<int64_t>(memoryThreshold / rowSize, vectorLength - rowId);
totalMemorySize = rowSize * numRows_;
} else {
for (auto i = rowId; i < vectorLength; ++i) {
int64_t i = rowId;
for (; i < vectorLength; ++i) {
auto rowSize = fast_->rowSize(i);
if (UNLIKELY(totalMemorySize + rowSize > memoryThreshold)) {
GLUTEN_CHECK(
i >= 1, "Only 1 row exceed Column2RowMemoryThreshold, which is " + velox::succinctBytes(memoryThreshold));
i >= 1,
"spark.gluten.sql.columnarToRowMemoryThreshold(" + velox::succinctBytes(memoryThreshold) +
") is too small, it can't hold even one row(" + velox::succinctBytes(rowSize) + ")");
break;
} else {
totalMemorySize += rowSize;
}
numRows_ = i + 1 - rowId;
}
numRows_ = i - rowId;
}

if (veloxBuffers_ == nullptr) {
Expand All @@ -65,7 +69,6 @@ int64_t VeloxColumnarToRowConverter::refreshStates(

bufferAddress_ = veloxBuffers_->asMutable<uint8_t>();
memset(bufferAddress_, 0, sizeof(int8_t) * totalMemorySize);
return numRows_;
}

void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb, int64_t rowId, int64_t memoryThreshold) {
Expand All @@ -89,4 +92,4 @@ void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb, int
}
}

} // namespace gluten
} // namespace gluten
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class VeloxColumnarToRowConverter final : public ColumnarToRowConverter {
void convert(std::shared_ptr<ColumnarBatch> cb, int64_t rowId = 0, int64_t memoryThreshold = INT64_MAX) override;

private:
int64_t refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t rowId, int64_t memoryThreshold);
void refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t rowId, int64_t memoryThreshold);

std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
std::shared_ptr<facebook::velox::row::UnsafeRowFast> fast_;
Expand Down
12 changes: 6 additions & 6 deletions shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ class GlutenConfig(conf: SQLConf) extends Logging {

def maxBatchSize: Int = conf.getConf(COLUMNAR_MAX_BATCH_SIZE)

def boltColumn2RowMemThreshold: Long =
conf.getConf(GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD)
def columnarToRowMemThreshold: Long =
conf.getConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD)

def shuffleWriterBufferSize: Int = conf
.getConf(SHUFFLE_WRITER_BUFFER_SIZE)
Expand Down Expand Up @@ -517,8 +517,8 @@ object GlutenConfig {
// Batch size.
val GLUTEN_MAX_BATCH_SIZE_KEY = "spark.gluten.sql.columnar.maxBatchSize"

val GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD_KEY =
"spark.gluten.sql.columnToRowMemoryThreshold"
val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY =
"spark.gluten.sql.columnarToRowMemoryThreshold"

// Shuffle Writer buffer size.
val GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE = "spark.gluten.shuffleWriter.bufferSize"
Expand Down Expand Up @@ -1038,8 +1038,8 @@ object GlutenConfig {
.checkValue(_ > 0, s"$GLUTEN_MAX_BATCH_SIZE_KEY must be positive.")
.createWithDefault(4096)

val GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD =
buildConf(GLUTEN_COLUMN_TO_ROW_MEM_THRESHOLD_KEY)
val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD =
buildConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY)
.internal()
.longConf
.createWithDefault(256 * 1024 * 1024)
Expand Down

0 comments on commit a19be93

Please sign in to comment.