Skip to content

Commit

Permalink
QueryConfig clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
tanjialiang committed Nov 2, 2024
1 parent 6c7bcd7 commit 894efb0
Showing 1 changed file with 38 additions and 71 deletions.
109 changes: 38 additions & 71 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,20 @@ class QueryConfig {
static constexpr const char* kAbandonPartialTopNRowNumberMinPct =
"abandon_partial_topn_row_number_min_pct";

/// The maximum number of bytes to buffer in PartitionedOutput operator to
/// avoid creating tiny SerializedPages.
///
/// For PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator
/// would buffer up to that number of bytes / number of destinations for each
/// destination before producing a SerializedPage.
static constexpr const char* kMaxPartitionedOutputBufferSize =
"max_page_partitioning_buffer_size";

/// The maximum size in bytes for the task's buffered output.
///
/// The producer Drivers are blocked when the buffered size exceeds
/// this. The Drivers are resumed when the buffered size goes below
/// OutputBufferManager::kContinuePct % of this.
static constexpr const char* kMaxOutputBufferSize = "max_output_buffer_size";

/// Preferred size of batches in bytes to be returned by operators from
Expand Down Expand Up @@ -203,7 +214,8 @@ class QueryConfig {
static constexpr const char* kWindowSpillEnabled = "window_spill_enabled";

/// If true, the memory arbitrator will reclaim memory from table writer by
/// flushing its buffered data to disk.
/// flushing its buffered data to disk. only applies if "spill_enabled" flag
/// is set.
static constexpr const char* kWriterSpillEnabled = "writer_spill_enabled";

/// RowNumber spilling flag, only applies if "spill_enabled" flag is set.
Expand Down Expand Up @@ -260,21 +272,35 @@ class QueryConfig {
static constexpr const char* kSpillFileCreateConfig =
"spill_file_create_config";

/// Default offset spill start partition bit.
/// Default offset spill start partition bit. It is used with
/// 'kJoinSpillPartitionBits' or 'kAggregationSpillPartitionBits' together to
/// calculate the spilling partition number for join spill or aggregation
/// spill.
static constexpr const char* kSpillStartPartitionBit =
"spiller_start_partition_bit";

/// Default number of spill partition bits.
/// Default number of spill partition bits. It is the number of bits used to
/// calculate the spill partition number for hash join and RowNumber. The
/// number of spill partitions will be power of two.
///
/// NOTE: as for now, we only support up to 8-way spill partitioning.
static constexpr const char* kSpillNumPartitionBits =
"spiller_num_partition_bits";

/// !!! DEPRECATED: do not use.
static constexpr const char* kJoinSpillPartitionBits =
"join_spiller_partition_bits";

/// The minimal available spillable memory reservation in percentage of the
/// current memory usage. Suppose the current memory usage size of M,
/// available memory reservation size of N and min reservation percentage of
/// P, if M * P / 100 > N, then spiller operator needs to grow the memory
/// reservation with percentage of spillableReservationGrowthPct(). This
/// ensures we have sufficient amount of memory reservation to process the
/// large input outlier.
static constexpr const char* kMinSpillableReservationPct =
"min_spillable_reservation_pct";

/// The spillable memory reservation growth percentage of the previous memory
/// reservation size. 10 means exponential growth along a series of integer
/// powers of 11/10. The reservation grows by this much until it no longer
/// can, after which it starts spilling.
static constexpr const char* kSpillableReservationGrowthPct =
"spillable_reservation_growth_pct";

Expand All @@ -287,7 +313,7 @@ class QueryConfig {
static constexpr const char* kPrestoArrayAggIgnoreNulls =
"presto.array_agg.ignore_nulls";

// The default number of expected items for the bloomfilter.
/// The default number of expected items for the bloomfilter.
static constexpr const char* kSparkBloomFilterExpectedNumItems =
"spark.bloom_filter.expected_num_items";

Expand Down Expand Up @@ -341,10 +367,10 @@ class QueryConfig {
static constexpr const char* kEnableExpressionEvaluationCache =
"enable_expression_evaluation_cache";

// For a given shared subexpression, the maximum distinct sets of inputs we
// cache results for. Lambdas can call the same expression with different
// inputs many times, causing the results we cache to explode in size. Putting
// a limit contains the memory usage.
/// For a given shared subexpression, the maximum distinct sets of inputs we
/// cache results for. Lambdas can call the same expression with different
/// inputs many times, causing the results we cache to explode in size.
/// Putting a limit contains the memory usage.
static constexpr const char* kMaxSharedSubexprResultsCached =
"max_shared_subexpr_results_cached";

Expand Down Expand Up @@ -474,22 +500,11 @@ class QueryConfig {
return get<uint64_t>(kMaxSpillBytes, kDefault);
}

/// Returns the maximum number of bytes to buffer in PartitionedOutput
/// operator to avoid creating tiny SerializedPages.
///
/// For PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator
/// would buffer up to that number of bytes / number of destinations for each
/// destination before producing a SerializedPage.
uint64_t maxPartitionedOutputBufferSize() const {
static constexpr uint64_t kDefault = 32UL << 20;
return get<uint64_t>(kMaxPartitionedOutputBufferSize, kDefault);
}

/// Returns the maximum size in bytes for the task's buffered output.
///
/// The producer Drivers are blocked when the buffered size exceeds
/// this. The Drivers are resumed when the buffered size goes below
/// OutputBufferManager::kContinuePct % of this.
uint64_t maxOutputBufferSize() const {
static constexpr uint64_t kDefault = 32UL << 20;
return get<uint64_t>(kMaxOutputBufferSize, kDefault);
Expand Down Expand Up @@ -580,49 +595,34 @@ class QueryConfig {
return get<bool>(kExprEvalSimplified, false);
}

/// Returns true if spilling is enabled.
bool spillEnabled() const {
return get<bool>(kSpillEnabled, false);
}

/// Returns 'is aggregation spilling enabled' flag. Must also check the
/// spillEnabled()!g
bool aggregationSpillEnabled() const {
return get<bool>(kAggregationSpillEnabled, true);
}

/// Returns 'is join spilling enabled' flag. Must also check the
/// spillEnabled()!
bool joinSpillEnabled() const {
return get<bool>(kJoinSpillEnabled, true);
}

/// Returns 'is orderby spilling enabled' flag. Must also check the
/// spillEnabled()!
bool orderBySpillEnabled() const {
return get<bool>(kOrderBySpillEnabled, true);
}

/// Returns true if spilling is enabled for Window operator. Must also
/// check the spillEnabled()!
bool windowSpillEnabled() const {
return get<bool>(kWindowSpillEnabled, true);
}

/// Returns 'is writer spilling enabled' flag. Must also check the
/// spillEnabled()!
bool writerSpillEnabled() const {
return get<bool>(kWriterSpillEnabled, true);
}

/// Returns true if spilling is enabled for RowNumber operator. Must also
/// check the spillEnabled()!
bool rowNumberSpillEnabled() const {
return get<bool>(kRowNumberSpillEnabled, true);
}

/// Returns true if spilling is enabled for TopNRowNumber operator. Must also
/// check the spillEnabled()!
bool topNRowNumberSpillEnabled() const {
return get<bool>(kTopNRowNumberSpillEnabled, true);
}
Expand All @@ -631,32 +631,11 @@ class QueryConfig {
return get<int32_t>(kMaxSpillLevel, 1);
}

/// Returns the start partition bit which is used with
/// 'kJoinSpillPartitionBits' or 'kAggregationSpillPartitionBits' together to
/// calculate the spilling partition number for join spill or aggregation
/// spill.
uint8_t spillStartPartitionBit() const {
constexpr uint8_t kDefaultStartBit = 48;
return get<uint8_t>(kSpillStartPartitionBit, kDefaultStartBit);
}

/// Returns the number of bits used to calculate the spill partition number
/// for hash join. The number of spill partitions will be power of two.
///
/// NOTE: as for now, we only support up to 8-way spill partitioning.
///
/// DEPRECATED.
uint8_t joinSpillPartitionBits() const {
constexpr uint8_t kDefaultBits = 3;
constexpr uint8_t kMaxBits = 3;
return std::min(
kMaxBits, get<uint8_t>(kJoinSpillPartitionBits, kDefaultBits));
}

/// Returns the number of bits used to calculate the spill partition number
/// for hash join and RowNumber. The number of spill partitions will be power
/// of two.
/// NOTE: as for now, we only support up to 8-way spill partitioning.
uint8_t spillNumPartitionBits() const {
constexpr uint8_t kDefaultBits = 3;
constexpr uint8_t kMaxBits = 3;
Expand Down Expand Up @@ -691,28 +670,16 @@ class QueryConfig {
return get<std::string>(kSpillFileCreateConfig, "");
}

/// Returns the minimal available spillable memory reservation in percentage
/// of the current memory usage. Suppose the current memory usage size of M,
/// available memory reservation size of N and min reservation percentage of
/// P, if M * P / 100 > N, then spiller operator needs to grow the memory
/// reservation with percentage of spillableReservationGrowthPct(). This
/// ensures we have sufficient amount of memory reservation to process the
/// large input outlier.
int32_t minSpillableReservationPct() const {
constexpr int32_t kDefaultPct = 5;
return get<int32_t>(kMinSpillableReservationPct, kDefaultPct);
}

/// Returns the spillable memory reservation growth percentage of the previous
/// memory reservation size. 10 means exponential growth along a series of
/// integer powers of 11/10. The reservation grows by this much until it no
/// longer can, after which it starts spilling.
int32_t spillableReservationGrowthPct() const {
constexpr int32_t kDefaultPct = 10;
return get<int32_t>(kSpillableReservationGrowthPct, kDefaultPct);
}

/// Returns true if query tracing is enabled.
bool queryTraceEnabled() const {
return get<bool>(kQueryTraceEnabled, false);
}
Expand Down

0 comments on commit 894efb0

Please sign in to comment.