Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Use conf to control C2R occupied memory #5952

Merged
merged 30 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
0591ac5
Use conf to control C2R occupied memory
XinShuoWang May 17, 2024
6b9b4d1
update default conf value
Jun 20, 2024
dbe1624
fix
Jul 4, 2024
6757787
Update JniWrapper.cc
FelixYBW Jul 20, 2024
c865da0
fix style
FelixYBW Jul 20, 2024
fe012f3
fixup
zhztheplayer Jul 26, 2024
2ea3961
fixup
zhztheplayer Jul 26, 2024
5acd195
fixup
zhztheplayer Jul 26, 2024
e1a2a26
Update VeloxColumnarToRowConverter.cc
FelixYBW Aug 1, 2024
a18fdd7
Update JniWrapper.cc
FelixYBW Aug 1, 2024
05a7763
Update ColumnarToRow.h
FelixYBW Aug 1, 2024
a140b8f
Update VeloxColumnarToRowConverter.cc
FelixYBW Aug 1, 2024
f6d36e2
Update VeloxColumnarToRowConverter.h
FelixYBW Aug 1, 2024
5a9b104
Update VeloxRuntime.cc
FelixYBW Aug 1, 2024
96e6948
Update VeloxRuntime.h
FelixYBW Aug 1, 2024
01f0755
Update Runtime.h
FelixYBW Aug 1, 2024
95dc709
Update RuntimeTest.cc
FelixYBW Aug 1, 2024
68fc73c
Update ColumnarToRow.h
FelixYBW Aug 1, 2024
8d97833
Update VeloxColumnarToRowConverter.cc
FelixYBW Aug 1, 2024
70fd1ec
set rowsize in benchmark to 64M
FelixYBW Aug 1, 2024
6802015
Update ColumnarToRowBenchmark.cc
FelixYBW Aug 1, 2024
c4f22f6
Update ColumnarToRowBenchmark.cc
FelixYBW Aug 1, 2024
d223d27
Update VeloxColumnarToRowTest.cc
FelixYBW Aug 1, 2024
d27136d
Update VeloxRowToColumnarTest.cc
FelixYBW Aug 1, 2024
bf1c97a
Merge branch 'main' into c2r_oom
FelixYBW Aug 1, 2024
3eac3b3
Update VeloxRowToColumnarTest.cc
FelixYBW Aug 2, 2024
1bcda04
Update VeloxColumnarToRowTest.cc
FelixYBW Aug 2, 2024
345d1d9
fixup
zhztheplayer Aug 2, 2024
d47adaa
allocate smaller memory if batch is small
FelixYBW Aug 3, 2024
6bd1fe9
Merge branch 'main' into c2r_oom
zhztheplayer Aug 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,29 @@ object VeloxColumnarToRowExec {
val rows = batch.numRows()
val beforeConvert = System.currentTimeMillis()
val batchHandle = ColumnarBatches.getNativeHandle(batch)
val info =
jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle)
var info =
jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)

convertTime += (System.currentTimeMillis() - beforeConvert)

new Iterator[InternalRow] {
var rowId = 0
var baseLength = 0
val row = new UnsafeRow(cols)

override def hasNext: Boolean = {
rowId < rows
}

override def next: UnsafeRow = {
val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
if (rowId == baseLength + info.lengths.length) {
baseLength += info.lengths.length
val before = System.currentTimeMillis()
info = jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, rowId)
convertTime += (System.currentTimeMillis() - before)
}
val (offset, length) =
(info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
row.pointTo(null, info.memoryAddress + offset, length)
rowId += 1
row
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.gluten.execution

import org.apache.gluten.GlutenConfig

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, Row, TestUtils}
import org.apache.spark.sql.execution.FormattedMode
Expand Down Expand Up @@ -253,6 +255,7 @@ class VeloxTPCHDistinctSpillSuite extends VeloxTPCHTableSupport {
super.sparkConf
.set("spark.memory.offHeap.size", "50m")
.set("spark.gluten.memory.overAcquiredMemoryRatio", "0.9") // to trigger distinct spill early
.set(GlutenConfig.GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY, "8k")
}

test("distinct spill") {
Expand Down
2 changes: 1 addition & 1 deletion cpp/core/compute/Runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class Runtime : public std::enable_shared_from_this<Runtime> {

/// This function is used to create certain converter from the format used by
/// the backend to Spark unsafe row.
virtual std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter() = 0;
virtual std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t column2RowMemThreshold) = 0;

virtual std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(struct ArrowSchema* cSchema) = 0;

Expand Down
3 changes: 3 additions & 0 deletions cpp/core/config/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ const std::string kGzipWindowSize4k = "4096";

const std::string kParquetCompressionCodec = "spark.sql.parquet.compression.codec";

const std::string kColumnarToRowMemoryThreshold = "spark.gluten.sql.columnarToRowMemoryThreshold";
const std::string kColumnarToRowMemoryDefaultThreshold = "67108864"; // 64MB

const std::string kUGIUserName = "spark.gluten.ugi.username";
const std::string kUGITokens = "spark.gluten.ugi.tokens";

Expand Down
29 changes: 25 additions & 4 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/

#include <jni.h>
#include <algorithm>
#include <cstdint>
#include <filesystem>

#include "compute/Runtime.h"
Expand All @@ -27,6 +29,7 @@

#include <arrow/c/bridge.h>
#include <optional>
#include <string>
#include "memory/AllocationListener.h"
#include "operators/serializer/ColumnarBatchSerializer.h"
#include "shuffle/LocalPartitionWriter.h"
Expand Down Expand Up @@ -528,8 +531,24 @@ Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarTo
JNI_METHOD_START
auto ctx = gluten::getRuntime(env, wrapper);

auto& conf = ctx->getConfMap();
int64_t column2RowMemThreshold;
auto it = conf.find(kColumnarToRowMemoryThreshold);
bool confIsLegal =
((it == conf.end()) ? false : std::all_of(it->second.begin(), it->second.end(), [](unsigned char c) {
return std::isdigit(c);
}));
if (confIsLegal) {
column2RowMemThreshold = std::stoll(it->second);
} else {
LOG(INFO)
<< "Because the spark.gluten.sql.columnarToRowMemoryThreshold configuration item is invalid, the kColumnarToRowMemoryDefaultThreshold default value is used, which is "
<< kColumnarToRowMemoryDefaultThreshold << " byte";
column2RowMemThreshold = std::stoll(kColumnarToRowMemoryDefaultThreshold);
}

// Convert the native batch to Spark unsafe row.
return ctx->saveObject(ctx->createColumnar2RowConverter());
return ctx->saveObject(ctx->createColumnar2RowConverter(column2RowMemThreshold));
JNI_METHOD_END(kInvalidObjectHandle)
}

Expand All @@ -538,16 +557,18 @@ Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarTo
JNIEnv* env,
jobject wrapper,
jlong c2rHandle,
jlong batchHandle) {
jlong batchHandle,
jlong startRow) {
JNI_METHOD_START
auto columnarToRowConverter = ObjectStore::retrieve<ColumnarToRowConverter>(c2rHandle);
auto cb = ObjectStore::retrieve<ColumnarBatch>(batchHandle);
columnarToRowConverter->convert(cb);

columnarToRowConverter->convert(cb, startRow);

const auto& offsets = columnarToRowConverter->getOffsets();
const auto& lengths = columnarToRowConverter->getLengths();

auto numRows = cb->numRows();
auto numRows = columnarToRowConverter->numRows();

auto offsetsArr = env->NewIntArray(numRows);
auto offsetsSrc = reinterpret_cast<const jint*>(offsets.data());
Expand Down
10 changes: 9 additions & 1 deletion cpp/core/operators/c2r/ColumnarToRow.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <cstdint>
#include "memory/ColumnarBatch.h"

namespace gluten {
Expand All @@ -27,7 +28,14 @@ class ColumnarToRowConverter {

virtual ~ColumnarToRowConverter() = default;

virtual void convert(std::shared_ptr<ColumnarBatch> cb = nullptr) = 0;
// We will start conversion from the 'rowId' row of 'cb'. The maximum memory consumption during the grabbing and
// swapping process is 'memoryThreshold' bytes. The number of rows successfully converted is stored in the 'numRows_'
// variable.
virtual void convert(std::shared_ptr<ColumnarBatch> cb = nullptr, int64_t startRow = 0) = 0;

virtual int32_t numRows() {
return numRows_;
}

uint8_t* getBufferAddress() const {
return bufferAddress_;
Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ class GoogleBenchmarkColumnarToRowCacheScanBenchmark : public GoogleBenchmarkCol
for (auto _ : state) {
for (const auto& vector : vectors) {
auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
auto columnarToRowConverter = std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool);
auto columnarToRowConverter = std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
auto cb = std::make_shared<VeloxColumnarBatch>(row);
TIME_NANO_START(writeTime);
columnarToRowConverter->convert(cb);
Expand Down Expand Up @@ -212,7 +212,7 @@ class GoogleBenchmarkColumnarToRowIterateScanBenchmark : public GoogleBenchmarkC
numBatches += 1;
numRows += recordBatch->num_rows();
auto vector = recordBatch2RowVector(*recordBatch);
auto columnarToRowConverter = std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool);
auto columnarToRowConverter = std::make_shared<gluten::VeloxColumnarToRowConverter>(ctxPool, 64 << 20);
auto row = std::dynamic_pointer_cast<velox::RowVector>(vector);
auto cb = std::make_shared<VeloxColumnarBatch>(row);
TIME_NANO_START(writeTime);
Expand Down
4 changes: 2 additions & 2 deletions cpp/velox/compute/VeloxRuntime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ std::shared_ptr<ResultIterator> VeloxRuntime::createResultIterator(
return std::make_shared<ResultIterator>(std::move(wholestageIter), this);
}

std::shared_ptr<ColumnarToRowConverter> VeloxRuntime::createColumnar2RowConverter() {
std::shared_ptr<ColumnarToRowConverter> VeloxRuntime::createColumnar2RowConverter(int64_t column2RowMemThreshold) {
auto veloxPool = vmm_->getLeafMemoryPool();
return std::make_shared<VeloxColumnarToRowConverter>(veloxPool);
return std::make_shared<VeloxColumnarToRowConverter>(veloxPool, column2RowMemThreshold);
}

std::shared_ptr<ColumnarBatch> VeloxRuntime::createOrGetEmptySchemaBatch(int32_t numRows) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/compute/VeloxRuntime.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class VeloxRuntime final : public Runtime {
const std::vector<std::shared_ptr<ResultIterator>>& inputs = {},
const std::unordered_map<std::string, std::string>& sessionConf = {}) override;

std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter() override;
std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t column2RowMemThreshold) override;

std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) override;

Expand Down
51 changes: 33 additions & 18 deletions cpp/velox/operators/serializer/VeloxColumnarToRowConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,61 @@
*/

#include "VeloxColumnarToRowConverter.h"
#include <velox/common/base/SuccinctPrinter.h>
#include <cstdint>

#include "memory/VeloxColumnarBatch.h"
#include "utils/exception.h"
#include "velox/row/UnsafeRowDeserializers.h"
#include "velox/row/UnsafeRowFast.h"

using namespace facebook;

namespace gluten {

void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr rowVector) {
numRows_ = rowVector->size();
void VeloxColumnarToRowConverter::refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t startRow) {
auto vectorLength = rowVector->size();
numCols_ = rowVector->childrenSize();

fast_ = std::make_unique<velox::row::UnsafeRowFast>(rowVector);

size_t totalMemorySize = 0;
int64_t totalMemorySize;

if (auto fixedRowSize = velox::row::UnsafeRowFast::fixedRowSize(velox::asRowType(rowVector->type()))) {
totalMemorySize += fixedRowSize.value() * numRows_;
auto rowSize = fixedRowSize.value();
// make sure it has at least one row
numRows_ = std::max<int32_t>(1, std::min<int64_t>(memThreshold_ / rowSize, vectorLength - startRow));
totalMemorySize = numRows_ * rowSize;
} else {
for (auto i = 0; i < numRows_; ++i) {
totalMemorySize += fast_->rowSize(i);
// Calculate the first row size
totalMemorySize = fast_->rowSize(startRow);

auto endRow = startRow + 1;
for (; endRow < vectorLength; ++endRow) {
auto rowSize = fast_->rowSize(endRow);
if (UNLIKELY(totalMemorySize + rowSize > memThreshold_)) {
break;
} else {
totalMemorySize += rowSize;
}
}
// Make sure the threshold is larger than the first row size
numRows_ = endRow - startRow;
}

if (veloxBuffers_ == nullptr) {
// First allocate memory
if (nullptr == veloxBuffers_) {
veloxBuffers_ = velox::AlignedBuffer::allocate<uint8_t>(totalMemorySize, veloxPool_.get());
}

if (veloxBuffers_->capacity() < totalMemorySize) {
} else if (veloxBuffers_->capacity() < totalMemorySize) {
velox::AlignedBuffer::reallocate<uint8_t>(&veloxBuffers_, totalMemorySize);
}

bufferAddress_ = veloxBuffers_->asMutable<uint8_t>();
memset(bufferAddress_, 0, sizeof(int8_t) * totalMemorySize);
}

void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb) {
void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb, int64_t startRow) {
auto veloxBatch = VeloxColumnarBatch::from(veloxPool_.get(), cb);
refreshStates(veloxBatch->getRowVector());
refreshStates(veloxBatch->getRowVector(), startRow);

// Initialize the offsets_ , lengths_
lengths_.clear();
Expand All @@ -64,11 +79,11 @@ void VeloxColumnarToRowConverter::convert(std::shared_ptr<ColumnarBatch> cb) {
offsets_.resize(numRows_, 0);

size_t offset = 0;
for (auto rowIdx = 0; rowIdx < numRows_; ++rowIdx) {
auto rowSize = fast_->serialize(rowIdx, (char*)(bufferAddress_ + offset));
lengths_[rowIdx] = rowSize;
if (rowIdx > 0) {
offsets_[rowIdx] = offsets_[rowIdx - 1] + lengths_[rowIdx - 1];
for (auto i = 0; i < numRows_; ++i) {
auto rowSize = fast_->serialize(startRow + i, (char*)(bufferAddress_ + offset));
lengths_[i] = rowSize;
if (i > 0) {
offsets_[i] = offsets_[i - 1] + lengths_[i - 1];
}
offset += rowSize;
}
Expand Down
11 changes: 7 additions & 4 deletions cpp/velox/operators/serializer/VeloxColumnarToRowConverter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,20 @@ namespace gluten {

class VeloxColumnarToRowConverter final : public ColumnarToRowConverter {
public:
explicit VeloxColumnarToRowConverter(std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool)
: ColumnarToRowConverter(), veloxPool_(veloxPool) {}
explicit VeloxColumnarToRowConverter(
std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool,
int64_t memThreshold)
: ColumnarToRowConverter(), veloxPool_(veloxPool), memThreshold_(memThreshold) {}

void convert(std::shared_ptr<ColumnarBatch> cb) override;
void convert(std::shared_ptr<ColumnarBatch> cb, int64_t startRow = 0) override;

private:
void refreshStates(facebook::velox::RowVectorPtr rowVector);
void refreshStates(facebook::velox::RowVectorPtr rowVector, int64_t startRow);

std::shared_ptr<facebook::velox::memory::MemoryPool> veloxPool_;
std::shared_ptr<facebook::velox::row::UnsafeRowFast> fast_;
facebook::velox::BufferPtr veloxBuffers_;
int64_t memThreshold_;
};

} // namespace gluten
2 changes: 1 addition & 1 deletion cpp/velox/tests/RuntimeTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class DummyRuntime final : public Runtime {
std::shared_ptr<ColumnarBatch> createOrGetEmptySchemaBatch(int32_t numRows) override {
throw GlutenException("Not yet implemented");
}
std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter() override {
std::shared_ptr<ColumnarToRowConverter> createColumnar2RowConverter(int64_t column2RowMemThreshold) override {
throw GlutenException("Not yet implemented");
}
std::shared_ptr<RowToColumnarConverter> createRow2ColumnarConverter(struct ArrowSchema* cSchema) override {
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/tests/VeloxColumnarToRowTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class VeloxColumnarToRowTest : public ::testing::Test, public test::VectorTestBa
}

void testRowBufferAddr(velox::RowVectorPtr vector, uint8_t* expectArr, int32_t expectArrSize) {
auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(pool_);
auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(pool_, 64 << 10);

auto cb = std::make_shared<VeloxColumnarBatch>(vector);
columnarToRowConverter->convert(cb);
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/tests/VeloxRowToColumnarTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class VeloxRowToColumnarTest : public ::testing::Test, public test::VectorTestBa
}

void testRowVectorEqual(velox::RowVectorPtr vector) {
auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(pool_);
auto columnarToRowConverter = std::make_shared<VeloxColumnarToRowConverter>(pool_, 64 << 10);

auto columnarBatch = std::make_shared<VeloxColumnarBatch>(vector);
columnarToRowConverter->convert(columnarBatch);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public long handle() {

public native long nativeColumnarToRowInit() throws RuntimeException;

public native NativeColumnarToRowInfo nativeColumnarToRowConvert(long c2rHandle, long batchHandle)
throws RuntimeException;
public native NativeColumnarToRowInfo nativeColumnarToRowConvert(
long c2rHandle, long batchHandle, long rowId) throws RuntimeException;

public native void nativeClose(long c2rHandle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,11 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
} else {
val cols = batch.numCols()
val rows = batch.numRows()
val info =
jniWrapper.nativeColumnarToRowConvert(c2rId, ColumnarBatches.getNativeHandle(batch))
var info =
jniWrapper.nativeColumnarToRowConvert(
c2rId,
ColumnarBatches.getNativeHandle(batch),
0)
batch.close()
val columnNames = key.flatMap {
case expression: AttributeReference =>
Expand Down Expand Up @@ -183,6 +186,7 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra

new Iterator[InternalRow] {
var rowId = 0
var baseLength = 0
val row = new UnsafeRow(cols)

override def hasNext: Boolean = {
Expand All @@ -191,8 +195,12 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra

override def next: UnsafeRow = {
if (rowId >= rows) throw new NoSuchElementException

val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
if (rowId == baseLength + info.lengths.length) {
baseLength += info.lengths.length
info = jniWrapper.nativeColumnarToRowConvert(batchHandle, c2rId, rowId)
}
val (offset, length) =
(info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
row.pointTo(null, info.memoryAddress + offset, length.toInt)
rowId += 1
row
Expand Down
Loading
Loading