diff --git a/velox/core/QueryConfig.h b/velox/core/QueryConfig.h index 3910b5879466..b39df5397910 100644 --- a/velox/core/QueryConfig.h +++ b/velox/core/QueryConfig.h @@ -146,9 +146,20 @@ class QueryConfig { static constexpr const char* kAbandonPartialTopNRowNumberMinPct = "abandon_partial_topn_row_number_min_pct"; + /// The maximum number of bytes to buffer in PartitionedOutput operator to + /// avoid creating tiny SerializedPages. + /// + /// For PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator + /// would buffer up to that number of bytes / number of destinations for each + /// destination before producing a SerializedPage. static constexpr const char* kMaxPartitionedOutputBufferSize = "max_page_partitioning_buffer_size"; + /// The maximum size in bytes for the task's buffered output. + /// + /// The producer Drivers are blocked when the buffered size exceeds + /// this. The Drivers are resumed when the buffered size goes below + /// OutputBufferManager::kContinuePct % of this. static constexpr const char* kMaxOutputBufferSize = "max_output_buffer_size"; /// Preferred size of batches in bytes to be returned by operators from @@ -203,7 +214,8 @@ class QueryConfig { static constexpr const char* kWindowSpillEnabled = "window_spill_enabled"; /// If true, the memory arbitrator will reclaim memory from table writer by - /// flushing its buffered data to disk. + /// flushing its buffered data to disk. only applies if "spill_enabled" flag + /// is set. static constexpr const char* kWriterSpillEnabled = "writer_spill_enabled"; /// RowNumber spilling flag, only applies if "spill_enabled" flag is set. @@ -260,21 +272,35 @@ class QueryConfig { static constexpr const char* kSpillFileCreateConfig = "spill_file_create_config"; - /// Default offset spill start partition bit. + /// Default offset spill start partition bit. It is used with + /// 'kJoinSpillPartitionBits' or 'kAggregationSpillPartitionBits' together to + /// calculate the spilling partition number for join spill or aggregation + /// spill. static constexpr const char* kSpillStartPartitionBit = "spiller_start_partition_bit"; - /// Default number of spill partition bits. + /// Default number of spill partition bits. It is the number of bits used to + /// calculate the spill partition number for hash join and RowNumber. The + /// number of spill partitions will be power of two. + /// + /// NOTE: as for now, we only support up to 8-way spill partitioning. static constexpr const char* kSpillNumPartitionBits = "spiller_num_partition_bits"; - /// !!! DEPRECATED: do not use. - static constexpr const char* kJoinSpillPartitionBits = - "join_spiller_partition_bits"; - + /// The minimal available spillable memory reservation in percentage of the + /// current memory usage. Suppose the current memory usage size of M, + /// available memory reservation size of N and min reservation percentage of + /// P, if M * P / 100 > N, then spiller operator needs to grow the memory + /// reservation with percentage of spillableReservationGrowthPct(). This + /// ensures we have sufficient amount of memory reservation to process the + /// large input outlier. static constexpr const char* kMinSpillableReservationPct = "min_spillable_reservation_pct"; + /// The spillable memory reservation growth percentage of the previous memory + /// reservation size. 10 means exponential growth along a series of integer + /// powers of 11/10. The reservation grows by this much until it no longer + /// can, after which it starts spilling. static constexpr const char* kSpillableReservationGrowthPct = "spillable_reservation_growth_pct"; @@ -287,7 +313,7 @@ class QueryConfig { static constexpr const char* kPrestoArrayAggIgnoreNulls = "presto.array_agg.ignore_nulls"; - // The default number of expected items for the bloomfilter. + /// The default number of expected items for the bloomfilter. static constexpr const char* kSparkBloomFilterExpectedNumItems = "spark.bloom_filter.expected_num_items"; @@ -341,10 +367,10 @@ class QueryConfig { static constexpr const char* kEnableExpressionEvaluationCache = "enable_expression_evaluation_cache"; - // For a given shared subexpression, the maximum distinct sets of inputs we - // cache results for. Lambdas can call the same expression with different - // inputs many times, causing the results we cache to explode in size. Putting - // a limit contains the memory usage. + /// For a given shared subexpression, the maximum distinct sets of inputs we + /// cache results for. Lambdas can call the same expression with different + /// inputs many times, causing the results we cache to explode in size. + /// Putting a limit contains the memory usage. static constexpr const char* kMaxSharedSubexprResultsCached = "max_shared_subexpr_results_cached"; @@ -474,22 +500,11 @@ class QueryConfig { return get(kMaxSpillBytes, kDefault); } - /// Returns the maximum number of bytes to buffer in PartitionedOutput - /// operator to avoid creating tiny SerializedPages. - /// - /// For PartitionedOutputNode::Kind::kPartitioned, PartitionedOutput operator - /// would buffer up to that number of bytes / number of destinations for each - /// destination before producing a SerializedPage. uint64_t maxPartitionedOutputBufferSize() const { static constexpr uint64_t kDefault = 32UL << 20; return get(kMaxPartitionedOutputBufferSize, kDefault); } - /// Returns the maximum size in bytes for the task's buffered output. - /// - /// The producer Drivers are blocked when the buffered size exceeds - /// this. The Drivers are resumed when the buffered size goes below - /// OutputBufferManager::kContinuePct % of this. uint64_t maxOutputBufferSize() const { static constexpr uint64_t kDefault = 32UL << 20; return get(kMaxOutputBufferSize, kDefault); @@ -580,49 +595,34 @@ class QueryConfig { return get(kExprEvalSimplified, false); } - /// Returns true if spilling is enabled. bool spillEnabled() const { return get(kSpillEnabled, false); } - /// Returns 'is aggregation spilling enabled' flag. Must also check the - /// spillEnabled()!g bool aggregationSpillEnabled() const { return get(kAggregationSpillEnabled, true); } - /// Returns 'is join spilling enabled' flag. Must also check the - /// spillEnabled()! bool joinSpillEnabled() const { return get(kJoinSpillEnabled, true); } - /// Returns 'is orderby spilling enabled' flag. Must also check the - /// spillEnabled()! bool orderBySpillEnabled() const { return get(kOrderBySpillEnabled, true); } - /// Returns true if spilling is enabled for Window operator. Must also - /// check the spillEnabled()! bool windowSpillEnabled() const { return get(kWindowSpillEnabled, true); } - /// Returns 'is writer spilling enabled' flag. Must also check the - /// spillEnabled()! bool writerSpillEnabled() const { return get(kWriterSpillEnabled, true); } - /// Returns true if spilling is enabled for RowNumber operator. Must also - /// check the spillEnabled()! bool rowNumberSpillEnabled() const { return get(kRowNumberSpillEnabled, true); } - /// Returns true if spilling is enabled for TopNRowNumber operator. Must also - /// check the spillEnabled()! bool topNRowNumberSpillEnabled() const { return get(kTopNRowNumberSpillEnabled, true); } @@ -631,32 +631,11 @@ class QueryConfig { return get(kMaxSpillLevel, 1); } - /// Returns the start partition bit which is used with - /// 'kJoinSpillPartitionBits' or 'kAggregationSpillPartitionBits' together to - /// calculate the spilling partition number for join spill or aggregation - /// spill. uint8_t spillStartPartitionBit() const { constexpr uint8_t kDefaultStartBit = 48; return get(kSpillStartPartitionBit, kDefaultStartBit); } - /// Returns the number of bits used to calculate the spill partition number - /// for hash join. The number of spill partitions will be power of two. - /// - /// NOTE: as for now, we only support up to 8-way spill partitioning. - /// - /// DEPRECATED. - uint8_t joinSpillPartitionBits() const { - constexpr uint8_t kDefaultBits = 3; - constexpr uint8_t kMaxBits = 3; - return std::min( - kMaxBits, get(kJoinSpillPartitionBits, kDefaultBits)); - } - - /// Returns the number of bits used to calculate the spill partition number - /// for hash join and RowNumber. The number of spill partitions will be power - /// of two. - /// NOTE: as for now, we only support up to 8-way spill partitioning. uint8_t spillNumPartitionBits() const { constexpr uint8_t kDefaultBits = 3; constexpr uint8_t kMaxBits = 3; @@ -691,28 +670,16 @@ class QueryConfig { return get(kSpillFileCreateConfig, ""); } - /// Returns the minimal available spillable memory reservation in percentage - /// of the current memory usage. Suppose the current memory usage size of M, - /// available memory reservation size of N and min reservation percentage of - /// P, if M * P / 100 > N, then spiller operator needs to grow the memory - /// reservation with percentage of spillableReservationGrowthPct(). This - /// ensures we have sufficient amount of memory reservation to process the - /// large input outlier. int32_t minSpillableReservationPct() const { constexpr int32_t kDefaultPct = 5; return get(kMinSpillableReservationPct, kDefaultPct); } - /// Returns the spillable memory reservation growth percentage of the previous - /// memory reservation size. 10 means exponential growth along a series of - /// integer powers of 11/10. The reservation grows by this much until it no - /// longer can, after which it starts spilling. int32_t spillableReservationGrowthPct() const { constexpr int32_t kDefaultPct = 10; return get(kSpillableReservationGrowthPct, kDefaultPct); } - /// Returns true if query tracing is enabled. bool queryTraceEnabled() const { return get(kQueryTraceEnabled, false); }