Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Sep 5, 2024
1 parent 1c0146a commit 2a1385c
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 4 deletions.
3 changes: 3 additions & 0 deletions cpp/velox/shuffle/VeloxSortShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
}

arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
VELOX_CHECK(begin < end);
// Count copy row time into sortTime_.
Timer sortTime{};
// Serialize [begin, end)
Expand Down Expand Up @@ -303,6 +304,7 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
RETURN_NOT_OK(evictPartition0(partitionId, 0, buffer + bytes, rawLength));
bytes += rawLength;
}
begin++;
sortTime.start();
} else {
// Copy small rows.
Expand All @@ -313,6 +315,7 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
}
sortTime.stop();
if (offset > 0) {
VELOX_CHECK(index > begin);
RETURN_NOT_OK(evictPartition0(partitionId, index - begin, rawBuffer_, offset));
}
sortTime_ += sortTime.realTimeUsed();
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};

for (const auto& compression : compressions) {
for (const auto compressionBufferSize : {1, 56, 32 * 1024}) {
for (const auto compressionBufferSize : {4, 56, 32 * 1024}) {
for (auto useRadixSort : {true, false}) {
params.push_back(ShuffleTestParams{
.shuffleWriterType = ShuffleWriterType::kSortShuffle,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,13 @@ object GlutenShuffleUtils {

def getCompressionBufferSize(conf: SparkConf, codec: String): Int = {
if ("lz4" == codec) {
conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt
Math.max(
conf.get(IO_COMPRESSION_LZ4_BLOCKSIZE).toInt,
GlutenConfig.GLUTEN_SHUFFLE_COMPRESSION_BUFFER_MIN_SIZE)
} else if ("zstd" == codec) {
conf.get(IO_COMPRESSION_ZSTD_BUFFERSIZE).toInt
Math.max(
conf.get(IO_COMPRESSION_ZSTD_BUFFERSIZE).toInt,
GlutenConfig.GLUTEN_SHUFFLE_COMPRESSION_BUFFER_MIN_SIZE)
} else {
32 * 1024
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -588,8 +588,8 @@ object GlutenConfig {

// Shuffle Writer buffer size.
val GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE = "spark.gluten.shuffleWriter.bufferSize"

val GLUTEN_SHUFFLE_WRITER_MERGE_THRESHOLD = "spark.gluten.sql.columnar.shuffle.merge.threshold"
val GLUTEN_SHUFFLE_COMPRESSION_BUFFER_MIN_SIZE = 64

// Controls whether to load DLL from jars. User can get dependent native libs packed into a jar
// by executing dev/package.sh. Then, with that jar configured, Gluten can load the native libs
Expand Down

0 comments on commit 2a1385c

Please sign in to comment.