Skip to content

Commit

Permalink
update default conf value
Browse files (browse the repository at this point in the history)
fix coredump

modify log level

fix

format
  • Loading branch information
wangxinshuo.db committed Jul 1, 2024
1 parent 1975659 commit 2731284
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 11 deletions.
1 change: 1 addition & 0 deletions cpp/core/config/GlutenConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ const std::string kGzipWindowSize4k = "4096";
const std::string kParquetCompressionCodec = "spark.sql.parquet.compression.codec";

const std::string kColumnToRowMemoryThreshold = "spark.gluten.sql.columnToRowMemoryThreshold";
+ const std::string kColumnToRowMemoryDefaultThreshold = "67108864"; // 64MB

const std::string kUGIUserName = "spark.gluten.ugi.username";
const std::string kUGITokens = "spark.gluten.ugi.tokens";
Expand Down
18 changes: 13 additions & 5 deletions cpp/core/jni/JniWrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,19 @@ Java_org_apache_gluten_vectorized_NativeColumnarToRowJniWrapper_nativeColumnarTo
columnarToRowConverter->convert(cb);

auto& conf = ctx->getConfMap();
- int64_t column2RowMemThreshold = 256 * 1024 * 1024;
- if (auto it = conf.find(kColumnToRowMemoryThreshold); it != conf.end()) {
- if (std::all_of(it->second.begin(), it->second.end(), [](unsigned char c) { return std::isdigit(c); })) {
- column2RowMemThreshold = std::stoll(it->second);
- }
+ int64_t column2RowMemThreshold;
+ auto it = conf.find(kColumnToRowMemoryThreshold);
+ bool confIsLeagal =
+ ((it == conf.end()) ? false : std::all_of(it->second.begin(), it->second.end(), [](unsigned char c) {
+ return std::isdigit(c);
+ }));
+ if (confIsLeagal) {
+ column2RowMemThreshold = std::stoll(it->second);
+ } else {
+ LOG(INFO)
+ << "Because the spark.gluten.sql.columnToRowMemoryThreshold configuration item is invalid, the kColumnToRowMemoryDefaultThreshold default value is used, which is "
+ << kColumnToRowMemoryDefaultThreshold << " byte";
+ column2RowMemThreshold = std::stoll(kColumnToRowMemoryDefaultThreshold);
}

columnarToRowConverter->convert(cb, rowId, column2RowMemThreshold);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ void VeloxColumnarToRowConverter::refreshStates(
if (UNLIKELY(totalMemorySize + rowSize > memoryThreshold)) {
if (i == rowId) {
memoryThreshold = rowSize;
- LOG(ERROR) << "spark.gluten.sql.columnarToRowMemoryThreshold(" + velox::succinctBytes(memoryThreshold) +
+ LOG(INFO) << "spark.gluten.sql.columnarToRowMemoryThreshold(" + velox::succinctBytes(memoryThreshold) +
") is too small, it can't hold even one row(" + velox::succinctBytes(rowSize) + ")";
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ public long handle() {

public native long nativeColumnarToRowInit() throws RuntimeException;

- public native NativeColumnarToRowInfo nativeColumnarToRowConvert(long c2rHandle, long batchHandle, long rowId)
- throws RuntimeException;
+ public native NativeColumnarToRowInfo nativeColumnarToRowConvert(
+ long c2rHandle, long batchHandle, long rowId) throws RuntimeException;

public native void nativeClose(long c2rHandle);
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ case class ColumnarBuildSideRelation(output: Seq[Attribute], batches: Array[Arra
val cols = batch.numCols()
val rows = batch.numRows()
var info =
- jniWrapper.nativeColumnarToRowConvert(c2rId, ColumnarBatches.getNativeHandle(batch), 0)
+ jniWrapper.nativeColumnarToRowConvert(
+ c2rId,
+ ColumnarBatches.getNativeHandle(batch),
+ 0)
batch.close()
val columnNames = key.flatMap {
case expression: AttributeReference =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1093,8 +1093,8 @@ object GlutenConfig {
val GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD =
buildConf(GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD_KEY)
.internal()
- .longConf
- .createWithDefault(256 * 1024 * 1024)
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("64MB")

// if not set, use COLUMNAR_MAX_BATCH_SIZE instead
val SHUFFLE_WRITER_BUFFER_SIZE =
Expand Down

0 comments on commit 2731284

Please sign in to comment.