Skip to content

Commit

Permalink
Spill prefix sort related code cleanup plus test improvement (#11508)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #11508

The follow-up is to add a session property in Prestissimo so that this
option can be configured from Presto.

Reviewed By: tanjialiang

Differential Revision: D65791663

fbshipit-source-id: 59c37c543aa763030968fe9dea94e6c6ad820175
  • Loading branch information
xiaoxmeng authored and facebook-github-bot committed Nov 12, 2024
1 parent afa6572 commit d7c9a50
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 22 deletions.
12 changes: 6 additions & 6 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,10 @@ class QueryConfig {
"spill_compression_codec";

/// Enable the prefix sort or fallback to timsort in spill. The prefix sort is
/// faster than timsort but requires the memory to build prefix data, which
/// may cause out of memory.
static constexpr const char* kSpillEnablePrefixSort =
"spill_enable_prefix_sort";
/// faster than std::sort but requires memory to build normalized prefix
/// keys, which risks running out of server memory.
static constexpr const char* kSpillPrefixSortEnabled =
"spill_prefixsort_enabled";

/// Specifies spill write buffer size in bytes. The spiller tries to buffer
/// serialized spill data up to the specified size before write to storage
Expand Down Expand Up @@ -641,8 +641,8 @@ class QueryConfig {
return get<std::string>(kSpillCompressionKind, "none");
}

bool spillEnablePrefixSort() const {
return get<bool>(kSpillEnablePrefixSort, false);
bool spillPrefixSortEnabled() const {
return get<bool>(kSpillPrefixSortEnabled, false);
}

uint64_t spillWriteBufferSize() const {
Expand Down
6 changes: 3 additions & 3 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,11 @@ Spilling
- Specifies the compression algorithm type to compress the spilled data before write to disk to trade CPU for IO
efficiency. The supported compression codecs are: ZLIB, SNAPPY, LZO, ZSTD, LZ4 and GZIP.
NONE means no compression.
* - spill_enable_prefix_sort
* - spill_prefixsort_enabled
- bool
- false
- Enable the prefix sort or fallback to timsort in spill. The prefix sort is faster than timsort but requires the
memory to build prefix data, which might have potential risk of running out of server memory.
- Enable the prefix sort or fall back to timsort in spill. The prefix sort is faster than std::sort but requires
memory to build normalized prefix keys, which risks running out of server memory.
* - spiller_start_partition_bit
- integer
- 29
Expand Down
2 changes: 1 addition & 1 deletion velox/exec/Driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ std::optional<common::SpillConfig> DriverCtx::makeSpillConfig(
queryConfig.maxSpillRunRows(),
queryConfig.writerFlushThresholdBytes(),
queryConfig.spillCompressionKind(),
queryConfig.spillEnablePrefixSort()
queryConfig.spillPrefixSortEnabled()
? std::optional<common::PrefixSortConfig>(prefixSortConfig())
: std::nullopt,
queryConfig.spillFileCreateConfig());
Expand Down
59 changes: 47 additions & 12 deletions velox/exec/tests/SpillTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,30 @@ class TestRuntimeStatWriter : public BaseRuntimeStatWriter {
} // namespace

struct TestParam {
common::CompressionKind compressionKind;
bool enablePrefixSort;
const common::CompressionKind compressionKind;
const bool enablePrefixSort;

TestParam(common::CompressionKind _compressionKind, bool _enablePrefixSort)
: compressionKind(_compressionKind),
enablePrefixSort(_enablePrefixSort) {}

TestParam(uint32_t value)
: compressionKind(static_cast<common::CompressionKind>(value >> 1)),
enablePrefixSort(!!(value & 1)) {}

uint32_t value() const {
return static_cast<uint32_t>(compressionKind) << 1 | enablePrefixSort;
}

std::string toString() const {
return fmt::format(
"compressionKind: {}, enablePrefixSort: {}",
compressionKind,
enablePrefixSort);
}
};

class SpillTest : public ::testing::TestWithParam<common::CompressionKind>,
class SpillTest : public ::testing::TestWithParam<uint32_t>,
public facebook::velox::test::VectorTestBase {
public:
explicit SpillTest()
Expand All @@ -75,13 +90,33 @@ class SpillTest : public ::testing::TestWithParam<common::CompressionKind>,
setThreadLocalRunTimeStatWriter(nullptr);
}

static std::vector<common::CompressionKind> getTestParams() {
std::vector<common::CompressionKind> testParams;
testParams.emplace_back(common::CompressionKind::CompressionKind_NONE);
testParams.emplace_back(common::CompressionKind::CompressionKind_ZLIB);
testParams.emplace_back(common::CompressionKind::CompressionKind_SNAPPY);
testParams.emplace_back(common::CompressionKind::CompressionKind_ZSTD);
testParams.emplace_back(common::CompressionKind::CompressionKind_LZ4);
static std::vector<uint32_t> getTestParams() {
std::vector<uint32_t> testParams;
testParams.emplace_back(
TestParam{common::CompressionKind::CompressionKind_NONE, false}
.value());
testParams.emplace_back(
TestParam{common::CompressionKind::CompressionKind_ZLIB, false}
.value());
testParams.emplace_back(
TestParam{common::CompressionKind::CompressionKind_SNAPPY, false}
.value());
testParams.emplace_back(
TestParam{common::CompressionKind::CompressionKind_ZSTD, false}
.value());
testParams.emplace_back(
TestParam{common::CompressionKind::CompressionKind_LZ4, false}.value());
testParams.emplace_back(
TestParam{common::CompressionKind::CompressionKind_NONE, true}.value());
testParams.emplace_back(
TestParam{common::CompressionKind::CompressionKind_ZLIB, true}.value());
testParams.emplace_back(
TestParam{common::CompressionKind::CompressionKind_SNAPPY, true}
.value());
testParams.emplace_back(
TestParam{common::CompressionKind::CompressionKind_ZSTD, true}.value());
testParams.emplace_back(
TestParam{common::CompressionKind::CompressionKind_LZ4, true}.value());
return testParams;
}

Expand All @@ -103,8 +138,8 @@ class SpillTest : public ::testing::TestWithParam<common::CompressionKind>,
tempDir_ = exec::test::TempDirectoryPath::create();
filesystems::registerLocalFileSystem();
rng_.seed(1);
compressionKind_ = GetParam();
enablePrefixSort_ = true;
compressionKind_ = TestParam{GetParam()}.compressionKind;
enablePrefixSort_ = TestParam{GetParam()}.enablePrefixSort;
}

uint8_t randPartitionBitOffset() {
Expand Down

0 comments on commit d7c9a50

Please sign in to comment.