Skip to content

Commit

Permalink
feat: Allow fixed random seed in approx_percentile for debug purpose (f…
Browse files Browse the repository at this point in the history
…acebookincubator#11697)

Summary:
Pull Request resolved: facebookincubator#11697

Also reduce memory usage by removing redundant information from the
accumulator.

Reviewed By: amitkdutta

Differential Revision: D66602894

fbshipit-source-id: a55d75c0d0e48c4c47a53c555d7da3fab07a0b9e
  • Loading branch information
Yuhta authored and facebook-github-bot committed Dec 2, 2024
1 parent 036ea7d commit 480f989
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 40 deletions.
13 changes: 13 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,14 @@ class QueryConfig {
static constexpr const char* kDebugDisableExpressionWithLazyInputs =
"debug_disable_expression_with_lazy_inputs";

/// Fix the random seed used to create the data structures used in
/// approx_percentile. This makes the query result deterministic on a single
/// node; multi-node partial aggregation is still subject to non-determinism
/// due to non-deterministic merge order.
static constexpr const char*
kDebugAggregationApproxPercentileFixedRandomSeed =
"debug_aggregation_approx_percentile_fixed_random_seed";

/// Temporary flag to control whether selective Nimble reader should be used
/// in this query or not. Will be removed after the selective Nimble reader
/// is fully rolled out.
Expand All @@ -449,6 +457,11 @@ class QueryConfig {
return get<bool>(kDebugDisableExpressionWithLazyInputs, false);
}

/// Returns the value stored under
/// kDebugAggregationApproxPercentileFixedRandomSeed as an optional seed.
/// NOTE(review): presumably empty when the key is not set -- confirm against
/// the single-argument get<T>() overload.
std::optional<uint32_t> debugAggregationApproxPercentileFixedRandomSeed()
    const {
  std::optional<uint32_t> configuredSeed =
      get<uint32_t>(kDebugAggregationApproxPercentileFixedRandomSeed);
  return configuredSeed;
}

uint64_t queryMaxMemoryPerNode() const {
return config::toCapacity(
get<std::string>(kQueryMaxMemoryPerNode, "0B"),
Expand Down
4 changes: 4 additions & 0 deletions velox/functions/lib/KllSketch.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ struct KllSketch {
return n_;
}

/// Returns the sketch's current k parameter as stored in k_.
uint32_t k() const {
return k_;
}

/// Calculate the size needed for serialization.
size_t serializedByteSize() const;

Expand Down
85 changes: 53 additions & 32 deletions velox/functions/prestosql/aggregates/ApproxPercentileAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,36 @@ using KllSketch = typename KllSketchTypeTraits<T, Allocator>::KllSketchType;
template <typename T>
using KllView = functions::kll::detail::View<T>;

// Chooses the seed for a newly created sketch: the caller-supplied fixed seed
// when one is present, otherwise whatever random::getSeed() returns.
// random::getSeed() is only invoked when no fixed seed was supplied.
unsigned getRandomSeed(std::optional<uint32_t> fixedRandomSeed) {
  if (fixedRandomSeed) {
    return fixedRandomSeed.value();
  }
  return random::getSeed();
}

// Accumulator to buffer large count values in addition to the KLL
// sketch itself.
template <typename T>
struct KllSketchAccumulator {
explicit KllSketchAccumulator(HashStringAllocator* allocator)
: allocator_(allocator),
sketch_(
explicit KllSketchAccumulator(
HashStringAllocator* allocator,
std::optional<uint32_t> fixedRandomSeed)
: sketch_(
functions::kll::kDefaultK,
StlAllocator<T>(allocator),
random::getSeed()),
getRandomSeed(fixedRandomSeed)),
largeCountValues_(StlAllocator<std::pair<T, int64_t>>(allocator)) {}

void setAccuracy(double value) {
k_ = functions::kll::kFromEpsilon(value);
sketch_.setK(k_);
sketch_.setK(functions::kll::kFromEpsilon(value));
}

void append(T value) {
sketch_.insert(value);
}

void append(T value, int64_t count) {
void append(
T value,
int64_t count,
HashStringAllocator* allocator,
std::optional<uint32_t> fixedRandomSeed) {
constexpr size_t kMaxBufferSize = 4096;
constexpr int64_t kMinCountToBuffer = 512;
if (count < kMinCountToBuffer) {
Expand All @@ -102,7 +110,7 @@ struct KllSketchAccumulator {
} else {
largeCountValues_.emplace_back(value, count);
if (largeCountValues_.size() >= kMaxBufferSize) {
flush();
flush(allocator, fixedRandomSeed);
}
}
}
Expand All @@ -121,12 +129,16 @@ struct KllSketchAccumulator {
// during spilling which may run in parallel. HashStringAllocator is not
// thread safe, so merging into/compacting the original KllSketch which
// depends on it can lead to concurrency bugs.
KllSketch<T, std::allocator<T>> compact() const {
KllSketch<T, std::allocator<T>> compact(
std::optional<uint32_t> fixedRandomSeed) const {
KllSketch<T, std::allocator<T>> newSketch =
KllSketch<T, std::allocator<T>>::fromView(
sketch_.toView(), std::allocator<T>(), random::getSeed());
sketch_.toView(),
std::allocator<T>(),
getRandomSeed(fixedRandomSeed));

mergeLargeCountValuesIntoSketch(std::allocator<T>(), newSketch);
mergeLargeCountValuesIntoSketch(
std::allocator<T>(), newSketch, fixedRandomSeed);

newSketch.compact();

Expand All @@ -139,8 +151,11 @@ struct KllSketchAccumulator {

// This must be called before the KllSketch can be used for estimateQuantile()
// or estimateQuantiles().
void flush() {
mergeLargeCountValuesIntoSketch(StlAllocator<T>(allocator_), sketch_);
void flush(
HashStringAllocator* allocator,
std::optional<uint32_t> fixedRandomSeed) {
mergeLargeCountValuesIntoSketch(
StlAllocator<T>(allocator), sketch_, fixedRandomSeed);
largeCountValues_.clear();

sketch_.finish();
Expand All @@ -150,21 +165,20 @@ struct KllSketchAccumulator {
template <typename Allocator, typename Compare>
void mergeLargeCountValuesIntoSketch(
const Allocator& allocator,
functions::kll::KllSketch<T, Allocator, Compare>& sketch) const {
functions::kll::KllSketch<T, Allocator, Compare>& sketch,
std::optional<uint32_t> fixedRandomSeed) const {
if (!largeCountValues_.empty()) {
std::vector<functions::kll::KllSketch<T, Allocator, Compare>> sketches;
sketches.reserve(largeCountValues_.size());
for (auto [x, n] : largeCountValues_) {
sketches.push_back(
functions::kll::KllSketch<T, Allocator, Compare>::fromRepeatedValue(
x, n, k_, allocator, random::getSeed()));
x, n, sketch_.k(), allocator, getRandomSeed(fixedRandomSeed)));
}
sketch.merge(folly::Range(sketches.begin(), sketches.end()));
}
}

uint16_t k_;
HashStringAllocator* allocator_;
KllSketch<T> sketch_;
std::vector<std::pair<T, int64_t>, StlAllocator<std::pair<T, int64_t>>>
largeCountValues_;
Expand All @@ -186,10 +200,12 @@ class ApproxPercentileAggregate : public exec::Aggregate {
ApproxPercentileAggregate(
bool hasWeight,
bool hasAccuracy,
const TypePtr& resultType)
const TypePtr& resultType,
std::optional<uint32_t> fixedRandomSeed)
: exec::Aggregate(resultType),
hasWeight_{hasWeight},
hasAccuracy_(hasAccuracy) {}
hasAccuracy_(hasAccuracy),
fixedRandomSeed_(fixedRandomSeed) {}

int32_t accumulatorFixedWidthSize() const override {
return sizeof(KllSketchAccumulator<T>);
Expand All @@ -202,7 +218,8 @@ class ApproxPercentileAggregate : public exec::Aggregate {
void extractValues(char** groups, int32_t numGroups, VectorPtr* result)
override {
for (auto i = 0; i < numGroups; ++i) {
value<KllSketchAccumulator<T>>(groups[i])->flush();
value<KllSketchAccumulator<T>>(groups[i])->flush(
allocator_, fixedRandomSeed_);
}

VELOX_CHECK(result);
Expand Down Expand Up @@ -262,7 +279,8 @@ class ApproxPercentileAggregate : public exec::Aggregate {
std::vector<KllSketch<T, std::allocator<T>>> sketches;
sketches.reserve(numGroups);
for (auto i = 0; i < numGroups; ++i) {
sketches.push_back(value<KllSketchAccumulator<T>>(groups[i])->compact());
sketches.push_back(
value<KllSketchAccumulator<T>>(groups[i])->compact(fixedRandomSeed_));
}

VELOX_CHECK(result);
Expand Down Expand Up @@ -385,7 +403,7 @@ class ApproxPercentileAggregate : public exec::Aggregate {
auto value = decodedValue_.valueAt<T>(row);
auto weight = decodedWeight_.valueAt<int64_t>(row);
checkWeight(weight);
accumulator->append(value, weight);
accumulator->append(value, weight, allocator_, fixedRandomSeed_);
});
} else {
if (decodedValue_.mayHaveNulls()) {
Expand Down Expand Up @@ -433,7 +451,7 @@ class ApproxPercentileAggregate : public exec::Aggregate {
auto value = decodedValue_.valueAt<T>(row);
auto weight = decodedWeight_.valueAt<int64_t>(row);
checkWeight(weight);
accumulator->append(value, weight);
accumulator->append(value, weight, allocator_, fixedRandomSeed_);
});
} else {
if (decodedValue_.mayHaveNulls()) {
Expand Down Expand Up @@ -467,7 +485,8 @@ class ApproxPercentileAggregate : public exec::Aggregate {
exec::Aggregate::setAllNulls(groups, indices);
for (auto i : indices) {
auto group = groups[i];
new (group + offset_) KllSketchAccumulator<T>(allocator_);
new (group + offset_)
KllSketchAccumulator<T>(allocator_, fixedRandomSeed_);
}
}

Expand Down Expand Up @@ -666,6 +685,7 @@ class ApproxPercentileAggregate : public exec::Aggregate {
static constexpr double kMissingNormalizedValue = -1;
const bool hasWeight_;
const bool hasAccuracy_;
const std::optional<uint32_t> fixedRandomSeed_;
std::optional<Percentiles> percentiles_;
double accuracy_{kMissingNormalizedValue};
DecodedVector decodedValue_;
Expand Down Expand Up @@ -885,12 +905,13 @@ void registerApproxPercentileAggregate(
core::AggregationNode::Step step,
const std::vector<TypePtr>& argTypes,
const TypePtr& resultType,
const core::QueryConfig& /*config*/)
-> std::unique_ptr<exec::Aggregate> {
const core::QueryConfig& config) -> std::unique_ptr<exec::Aggregate> {
auto isRawInput = exec::isRawInput(step);
auto hasWeight =
argTypes.size() >= 2 && argTypes[1]->kind() == TypeKind::BIGINT;
bool hasAccuracy = argTypes.size() == (hasWeight ? 4 : 3);
auto fixedRandomSeed =
config.debugAggregationApproxPercentileFixedRandomSeed();

if (isRawInput) {
VELOX_USER_CHECK_EQ(
Expand Down Expand Up @@ -943,22 +964,22 @@ void registerApproxPercentileAggregate(
switch (type->kind()) {
case TypeKind::TINYINT:
return std::make_unique<ApproxPercentileAggregate<int8_t>>(
hasWeight, hasAccuracy, resultType);
hasWeight, hasAccuracy, resultType, fixedRandomSeed);
case TypeKind::SMALLINT:
return std::make_unique<ApproxPercentileAggregate<int16_t>>(
hasWeight, hasAccuracy, resultType);
hasWeight, hasAccuracy, resultType, fixedRandomSeed);
case TypeKind::INTEGER:
return std::make_unique<ApproxPercentileAggregate<int32_t>>(
hasWeight, hasAccuracy, resultType);
hasWeight, hasAccuracy, resultType, fixedRandomSeed);
case TypeKind::BIGINT:
return std::make_unique<ApproxPercentileAggregate<int64_t>>(
hasWeight, hasAccuracy, resultType);
hasWeight, hasAccuracy, resultType, fixedRandomSeed);
case TypeKind::REAL:
return std::make_unique<ApproxPercentileAggregate<float>>(
hasWeight, hasAccuracy, resultType);
hasWeight, hasAccuracy, resultType, fixedRandomSeed);
case TypeKind::DOUBLE:
return std::make_unique<ApproxPercentileAggregate<double>>(
hasWeight, hasAccuracy, resultType);
hasWeight, hasAccuracy, resultType, fixedRandomSeed);
default:
VELOX_USER_FAIL(
"Unsupported input type for {} aggregation {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ class ApproxPercentileTest : public AggregationTestBase {
// Runs the base fixture setup, resets the process-wide random seed to 0, and
// records a fixed approx_percentile seed of "0" in queryConfig_.
void SetUp() override {
  AggregationTestBase::SetUp();
  random::setSeed(0);
  const auto* fixedSeedKey =
      core::QueryConfig::kDebugAggregationApproxPercentileFixedRandomSeed;
  queryConfig_[fixedSeedKey] = "0";
}

template <typename T>
Expand Down Expand Up @@ -142,12 +145,14 @@ class ApproxPercentileTest : public AggregationTestBase {
{rows},
{},
{functionCall(false, weights.get(), percentile, accuracy, -1)},
expected);
expected,
queryConfig_);
testAggregations(
{rows},
{},
{functionCall(false, weights.get(), percentile, accuracy, 3)},
expectedArray);
expectedArray,
queryConfig_);

// Companion functions of approx_percentile do not support test streaming
// because intermediate results are KLL that has non-deterministic shape.
Expand All @@ -159,15 +164,17 @@ class ApproxPercentileTest : public AggregationTestBase {
{functionCall(false, weights.get(), percentile, accuracy, -1)},
{getArgTypes(values->type(), weights.get(), accuracy, -1)},
{},
expected);
expected,
queryConfig_);
testAggregationsWithCompanion(
{rows},
[](auto& /*builder*/) {},
{},
{functionCall(false, weights.get(), percentile, accuracy, 3)},
{getArgTypes(values->type(), weights.get(), accuracy, 3)},
{},
expectedArray);
expectedArray,
queryConfig_);
}

template <typename T>
Expand Down Expand Up @@ -208,7 +215,8 @@ class ApproxPercentileTest : public AggregationTestBase {
{rows},
{"c0"},
{functionCall(true, weights.get(), percentile, accuracy, -1)},
{expectedResult});
{expectedResult},
queryConfig_);

// Companion functions of approx_percentile do not support test streaming
// because intermediate results are KLL that has non-deterministic shape.
Expand All @@ -220,7 +228,8 @@ class ApproxPercentileTest : public AggregationTestBase {
{functionCall(true, weights.get(), percentile, accuracy, -1)},
{getArgTypes(values->type(), weights.get(), accuracy, -1)},
{},
{expectedResult});
{expectedResult},
queryConfig_);

{
SCOPED_TRACE("Percentile array");
Expand Down Expand Up @@ -264,7 +273,8 @@ class ApproxPercentileTest : public AggregationTestBase {
{rows},
{"c0"},
{functionCall(true, weights.get(), percentile, accuracy, 3)},
{expected});
{expected},
queryConfig_);

// Companion functions of approx_percentile do not support test streaming
// because intermediate results are KLL that has non-deterministic shape.
Expand All @@ -276,9 +286,13 @@ class ApproxPercentileTest : public AggregationTestBase {
{functionCall(true, weights.get(), percentile, accuracy, 3)},
{getArgTypes(values->type(), weights.get(), accuracy, 3)},
{},
{expected});
{expected},
queryConfig_);
}
}

private:
std::unordered_map<std::string, std::string> queryConfig_;
};

TEST_F(ApproxPercentileTest, globalAgg) {
Expand Down

0 comments on commit 480f989

Please sign in to comment.