diff --git a/velox/common/base/tests/StatsReporterTest.cpp b/velox/common/base/tests/StatsReporterTest.cpp
index bba41daf0c43..10a2df6165fa 100644
--- a/velox/common/base/tests/StatsReporterTest.cpp
+++ b/velox/common/base/tests/StatsReporterTest.cpp
@@ -324,6 +324,10 @@ class TestMemoryPool : public memory::MemoryPool {
     return 0;
   }
 
+  int64_t releasableReservation() const override {
+    return 0;
+  }
+
   int64_t reservedBytes() const override {
     return 0;
   }
diff --git a/velox/common/memory/MemoryPool.cpp b/velox/common/memory/MemoryPool.cpp
index 186b16a380ee..8c4d3504522b 100644
--- a/velox/common/memory/MemoryPool.cpp
+++ b/velox/common/memory/MemoryPool.cpp
@@ -703,6 +703,23 @@ int64_t MemoryPoolImpl::usedBytes() const {
   return usedBytes;
 }
 
+int64_t MemoryPoolImpl::releasableReservation() const {
+  if (isLeaf()) {
+    std::lock_guard<std::mutex> l(mutex_);
+    return std::max<int64_t>(
+        0, reservationBytes_ - quantizedSize(usedReservationBytes_));
+  }
+  if (reservedBytes() == 0) {
+    return 0;
+  }
+  int64_t releasableBytes{0};
+  visitChildren([&](MemoryPool* pool) {
+    releasableBytes += pool->releasableReservation();
+    return true;
+  });
+  return releasableBytes;
+}
+
 std::shared_ptr<MemoryPool> MemoryPoolImpl::genChild(
     std::shared_ptr<MemoryPool> parent,
     const std::string& name,
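The new MemoryPoolImpl::releasableReservation() computes, for a leaf pool, how much of the current reservation sits above the quantized size of what is actually used; for an aggregate pool it sums the values over its children. Below is a minimal sketch of the quantization arithmetic this relies on. The rounding thresholds mirror MemoryPool::quantizedSize() (1MB steps under 16MB, 4MB steps under 64MB, 8MB steps above); treat them as an assumption rather than a contract.

    #include <algorithm>
    #include <cstdint>

    // Round 'size' up to the reservation quantum, as a leaf pool does when it
    // reserves memory (thresholds assumed from MemoryPool::quantizedSize()).
    int64_t quantizedSize(int64_t size) {
      auto roundUp = [](int64_t v, int64_t unit) {
        return (v + unit - 1) / unit * unit;
      };
      if (size < (16LL << 20)) {
        return roundUp(size, 1LL << 20);
      }
      if (size < (64LL << 20)) {
        return roundUp(size, 4LL << 20);
      }
      return roundUp(size, 8LL << 20);
    }

    // Reservation that release() could hand back without dropping below the
    // quantized size of the bytes actually in use.
    int64_t releasableReservation(int64_t reservationBytes, int64_t usedBytes) {
      return std::max<int64_t>(0, reservationBytes - quantizedSize(usedBytes));
    }

For example, a leaf pool holding a 16MB reservation with 1.5MB in use reports 14.5MB via availableReservation(), but only 14MB as releasable: the 1.5MB in use pins a 2MB quantum that release() cannot give back.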
diff --git a/velox/common/memory/MemoryPool.h b/velox/common/memory/MemoryPool.h
index ce087c7616a9..b37cc4816399 100644
--- a/velox/common/memory/MemoryPool.h
+++ b/velox/common/memory/MemoryPool.h
@@ -208,11 +208,11 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
     return threadSafe_;
   }
 
-  /// Invoked to traverse the memory pool subtree rooted at this, and calls
-  /// 'visitor' on each visited child memory pool with the parent pool's
-  /// 'poolMutex_' reader lock held. The 'visitor' must not access the
-  /// parent memory pool to avoid the potential recursive locking issues. Note
-  /// that the traversal stops if 'visitor' returns false.
+  /// Invoked to visit the memory pool's direct children, and calls 'visitor' on
+  /// each visited child memory pool with the parent pool's 'poolMutex_' reader
+  /// lock held. The 'visitor' must not access the parent memory pool to avoid
+  /// the potential recursive locking issues. Note that the traversal stops if
+  /// 'visitor' returns false.
   virtual void visitChildren(
       const std::function<bool(MemoryPool*)>& visitor) const;
 
@@ -328,15 +328,22 @@ class MemoryPool : public std::enable_shared_from_this<MemoryPool> {
   /// Returns the peak memory usage in bytes of this memory pool.
   virtual int64_t peakBytes() const = 0;
 
-  /// Returns the reserved but not used memory reservation in bytes of this
-  /// memory pool.
+  /// Returns the reserved but not used memory in bytes of this memory pool.
   ///
-  /// NOTE: this is always zero for non-leaf memory pool as it only aggregate
+  /// NOTE: this is always zero for non-leaf memory pool as it only aggregates
   /// the memory reservations from its child memory pools but not
   /// differentiating whether the aggregated reservations have been actually
   /// used in child pools or not.
   virtual int64_t availableReservation() const = 0;
 
+  /// Returns the reserved but not used memory in bytes that can be released by
+  /// calling 'release()'. This might be different from 'availableReservation()'
+  /// because a leaf memory pool makes quantized memory reservations.
+  ///
+  /// NOTE: for a non-leaf memory pool, it returns the aggregated releasable
+  /// memory reservations from all its leaf memory pools.
+  virtual int64_t releasableReservation() const = 0;
+
   /// Returns the reserved memory reservation in bytes including both used and
   /// unused reservations.
   virtual int64_t reservedBytes() const = 0;
@@ -632,6 +639,8 @@ class MemoryPoolImpl : public MemoryPool {
     return availableReservationLocked();
   }
 
+  int64_t releasableReservation() const override;
+
   int64_t reservedBytes() const override {
     return reservationBytes_;
   }
@@ -1002,7 +1011,7 @@ class MemoryPoolImpl : public MemoryPool {
   // name matches the specified regular expression 'debugPoolNameRegex_'.
   const std::string debugPoolNameRegex_;
 
-  // Serializes updates on 'grantedReservationBytes_', 'usedReservationBytes_'
+  // Serializes updates on 'reservationBytes_', 'usedReservationBytes_'
   // and 'minReservationBytes_' to make reservation decision on a consistent
   // read/write of those counters. incrementReservation()/decrementReservation()
   // work based on atomic 'reservationBytes_' without mutex as children updating
diff --git a/velox/common/memory/tests/MemoryPoolTest.cpp b/velox/common/memory/tests/MemoryPoolTest.cpp
index a3494a093813..6cc65bf4a103 100644
--- a/velox/common/memory/tests/MemoryPoolTest.cpp
+++ b/velox/common/memory/tests/MemoryPoolTest.cpp
@@ -524,6 +524,49 @@ TEST_P(MemoryPoolTest, grow) {
   leaf->free(buf, 1 * MB);
 }
 
+TEST_P(MemoryPoolTest, releasableMemory) {
+  struct TestParam {
+    int64_t usedBytes;
+    int64_t reservedBytes;
+  };
+  std::vector<TestParam> testParams{
+      {2345, 98760},
+      {1, 1024},
+      {4096, 4096},
+      {1 * MB, 16 * MB},
+      {6 * MB, 7 * MB},
+      {123 * MB, 200 * MB},
+      {100 * MB, 50 * MB}};
+  auto root = getMemoryManager()->addRootPool("releasableMemory", 4 * GB);
+  for (auto i = 0; i < testParams.size() - 1; i++) {
+    auto leaf0 = root->addLeafChild("leafPool-0");
+    leaf0->maybeReserve(testParams[i].reservedBytes);
+    void* buffer0 = leaf0->allocate(testParams[i].usedBytes);
+    const auto reservedBytes0 = leaf0->reservedBytes();
+    const auto releasableBytes0 = leaf0->releasableReservation();
+
+    auto leaf1 = root->addLeafChild("leafPool-1");
+    leaf1->maybeReserve(testParams[i + 1].reservedBytes);
+    void* buffer1 = leaf1->allocate(testParams[i + 1].usedBytes);
+    const auto reservedBytes1 = leaf1->reservedBytes();
+    const auto releasableBytes1 = leaf1->releasableReservation();
+
+    const auto releasableBytesRoot = root->releasableReservation();
+    const auto reservedBytesRoot = root->reservedBytes();
+    ASSERT_EQ(releasableBytesRoot, releasableBytes0 + releasableBytes1);
+
+    leaf0->release();
+    ASSERT_EQ(reservedBytes0 - leaf0->reservedBytes(), releasableBytes0);
+    ASSERT_EQ(reservedBytesRoot - root->reservedBytes(), releasableBytes0);
+    leaf1->release();
+    ASSERT_EQ(reservedBytes1 - leaf1->reservedBytes(), releasableBytes1);
+    ASSERT_EQ(reservedBytesRoot - root->reservedBytes(), releasableBytesRoot);
+
+    leaf0->free(buffer0, testParams[i].usedBytes);
+    leaf1->free(buffer1, testParams[i + 1].usedBytes);
+  }
+}
+
 TEST_P(MemoryPoolTest, ReallocTestSameSize) {
   auto manager = getMemoryManager();
   auto root = manager->addRootPool();
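The test runs each {usedBytes, reservedBytes} pair across two sibling leaves, then checks that the root aggregates exactly the per-leaf values and that release() shrinks reservedBytes() by precisely the reported releasable amount. A hypothetical walk-through of one pair, assuming the rounding rule sketched earlier:

    // {usedBytes = 6 MB, reservedBytes = 7 MB}:
    //   maybeReserve(7 MB) -> reservation held at 7 MB (1 MB quanta below 16 MB)
    //   allocate(6 MB)     -> used 6 MB, quantized used = 6 MB
    //   releasableReservation() = 7 MB - 6 MB = 1 MB
    //   release()          -> reservedBytes() drops by exactly 1 MB

The last pair, {100 MB, 50 MB}, covers the case where the allocation outgrows the explicit reserve, so the reservation is driven by the allocation itself and only quantization slack remains releasable.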
"velox/exec/tests/utils/TempDirectoryPath.h" #include "velox/vector/fuzzer/VectorFuzzer.h" @@ -621,7 +622,7 @@ TEST_F(HiveDataSinkTest, abort) { } } -TEST_F(HiveDataSinkTest, memoryReclaim) { +DEBUG_ONLY_TEST_F(HiveDataSinkTest, memoryReclaim) { const int numBatches = 200; auto vectors = createVectors(500, 200); @@ -644,7 +645,7 @@ TEST_F(HiveDataSinkTest, memoryReclaim) { expectedWriterReclaimed); } } testSettings[] = { - // {dwio::common::FileFormat::DWRF, true, true, 1 << 30, true, true}, + {dwio::common::FileFormat::DWRF, true, true, 1 << 30, true, true}, {dwio::common::FileFormat::DWRF, true, true, 1, true, true}, {dwio::common::FileFormat::DWRF, true, false, 1 << 30, false, false}, {dwio::common::FileFormat::DWRF, true, false, 1, false, false}, @@ -664,6 +665,13 @@ TEST_F(HiveDataSinkTest, memoryReclaim) { {dwio::common::FileFormat::PARQUET, false, false, 1, false, false} #endif }; + SCOPED_TESTVALUE_SET( + "facebook::velox::dwrf::Writer::MemoryReclaimer::reclaimableBytes", + std::function([&](dwrf::Writer* writer) { + // Release before reclaim to make it not able to reclaim from reserved + // memory. + writer->getContext().releaseMemoryReservation(); + })); for (const auto& testData : testSettings) { SCOPED_TRACE(testData.debugString()); setupMemoryPools(); diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp index 93f7463c340f..0ade64b93477 100644 --- a/velox/dwio/dwrf/test/E2EWriterTest.cpp +++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp @@ -2040,7 +2040,14 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimDuringInit) { } } -TEST_F(E2EWriterTest, memoryReclaimThreshold) { +DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimThreshold) { + SCOPED_TESTVALUE_SET( + "facebook::velox::dwrf::Writer::MemoryReclaimer::reclaimableBytes", + std::function([&](dwrf::Writer* writer) { + // Release before reclaim to make it not able to reclaim from reserved + // memory. 
diff --git a/velox/dwio/dwrf/test/E2EWriterTest.cpp b/velox/dwio/dwrf/test/E2EWriterTest.cpp
index 93f7463c340f..0ade64b93477 100644
--- a/velox/dwio/dwrf/test/E2EWriterTest.cpp
+++ b/velox/dwio/dwrf/test/E2EWriterTest.cpp
@@ -2040,7 +2040,14 @@ DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimDuringInit) {
   }
 }
 
-TEST_F(E2EWriterTest, memoryReclaimThreshold) {
+DEBUG_ONLY_TEST_F(E2EWriterTest, memoryReclaimThreshold) {
+  SCOPED_TESTVALUE_SET(
+      "facebook::velox::dwrf::Writer::MemoryReclaimer::reclaimableBytes",
+      std::function<void(dwrf::Writer*)>([&](dwrf::Writer* writer) {
+        // Release before reclaim to make it not able to reclaim from reserved
+        // memory.
+        writer->getContext().releaseMemoryReservation();
+      }));
   const auto type = ROW(
       {{"int_val", INTEGER()},
        {"string_val", VARCHAR()},
diff --git a/velox/dwio/dwrf/test/WriterFlushTest.cpp b/velox/dwio/dwrf/test/WriterFlushTest.cpp
index a964e75a6382..76ef171717b9 100644
--- a/velox/dwio/dwrf/test/WriterFlushTest.cpp
+++ b/velox/dwio/dwrf/test/WriterFlushTest.cpp
@@ -57,10 +57,14 @@ class MockMemoryPool : public velox::memory::MemoryPool {
     VELOX_NYI("{} unsupported", __FUNCTION__);
   }
 
-  int64_t reservedBytes() const override {
+  int64_t releasableReservation() const override {
     VELOX_NYI("{} unsupported", __FUNCTION__);
   }
 
+  int64_t reservedBytes() const override {
+    return localMemoryUsage_;
+  }
+
   bool maybeReserve(uint64_t size) override {
     VELOX_NYI("{} unsupported", __FUNCTION__);
   }
diff --git a/velox/dwio/dwrf/writer/Writer.cpp b/velox/dwio/dwrf/writer/Writer.cpp
index 1b1807faaaf2..11ef5be99050 100644
--- a/velox/dwio/dwrf/writer/Writer.cpp
+++ b/velox/dwio/dwrf/writer/Writer.cpp
@@ -289,11 +289,11 @@ bool Writer::maybeReserveMemory(
   auto& context = getContext();
   auto& pool = context.getMemoryPool(memoryUsageCategory);
   const uint64_t availableReservation = pool.availableReservation();
-  const uint64_t usedReservationBytes = pool.usedBytes();
+  const uint64_t usedBytes = pool.usedBytes();
   const uint64_t minReservationBytes =
-      usedReservationBytes * spillConfig_->minSpillableReservationPct / 100;
+      usedBytes * spillConfig_->minSpillableReservationPct / 100;
   const uint64_t estimatedIncrementBytes =
-      usedReservationBytes * estimatedMemoryGrowthRatio;
+      usedBytes * estimatedMemoryGrowthRatio;
   if ((availableReservation > minReservationBytes) &&
       (availableReservation > 2 * estimatedIncrementBytes)) {
     return true;
@@ -301,15 +301,15 @@ bool Writer::maybeReserveMemory(
 
   const uint64_t bytesToReserve = std::max(
       estimatedIncrementBytes * 2,
-      usedReservationBytes * spillConfig_->spillableReservationGrowthPct / 100);
+      usedBytes * spillConfig_->spillableReservationGrowthPct / 100);
   return pool.maybeReserve(bytesToReserve);
 }
 
-void Writer::releaseMemory() {
+int64_t Writer::releaseMemory() {
   if (!canReclaim()) {
-    return;
+    return 0;
   }
-  getContext().releaseMemoryReservation();
+  return getContext().releaseMemoryReservation();
 }
 
 uint64_t Writer::flushTimeMemoryUsageEstimate(
@@ -406,7 +406,7 @@ void Writer::flushStripe(bool close) {
     createRowIndexEntry();
   }
 
-  const auto preFlushMem = context.getTotalMemoryUsage();
+  const auto preFlushTotalMemBytes = context.getTotalMemoryUsage();
   ensureStripeFlushFits();
   // NOTE: ensureStripeFlushFits() might trigger memory arbitration that have
   // flushed the current stripe.
@@ -434,7 +434,7 @@ void Writer::flushStripe(bool close) {
   metrics.flushOverhead = static_cast<int64_t>(flushOverhead);
   context.recordFlushOverhead(metrics.flushOverhead);
 
-  const auto postFlushMem = context.getTotalMemoryUsage();
+  const auto postFlushTotalMemBytes = context.getTotalMemoryUsage();
 
   auto& sink = writerBase_->getSink();
   auto stripeOffset = sink.size();
@@ -559,8 +559,8 @@ void Writer::flushStripe(bool close) {
       metrics.stripeIndex,
       metrics.flushOverhead,
       metrics.stripeSize,
-      preFlushMem,
-      postFlushMem,
+      preFlushTotalMemBytes,
+      postFlushTotalMemBytes,
       metrics.close);
   addThreadLocalRuntimeStat(
       "stripeSize",
@@ -721,11 +721,18 @@ bool Writer::MemoryReclaimer::reclaimableBytes(
   if (!writer_->canReclaim()) {
     return false;
   }
-  const uint64_t memoryUsage = writer_->getContext().getTotalMemoryUsage();
-  if (memoryUsage < writer_->spillConfig_->writerFlushThresholdSize) {
+  TestValue::adjust(
+      "facebook::velox::dwrf::Writer::MemoryReclaimer::reclaimableBytes",
+      writer_);
+  const auto& context = writer_->getContext();
+  const auto usedBytes = context.getTotalMemoryUsage();
+  const auto releasableBytes = context.releasableMemoryReservation();
+  const bool flushable =
+      usedBytes >= writer_->spillConfig_->writerFlushThresholdSize;
+  if (releasableBytes == 0 && !flushable) {
     return false;
   }
-  reclaimableBytes = memoryUsage;
+  reclaimableBytes = (flushable ? usedBytes : 0) + releasableBytes;
   return true;
 }
 
@@ -741,7 +748,8 @@ uint64_t Writer::MemoryReclaimer::reclaim(
   if (*writer_->nonReclaimableSection_) {
     RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
     LOG(WARNING)
-        << "Can't reclaim from dwrf writer which is under non-reclaimable section: "
+        << "Can't reclaim from dwrf writer which is under non-reclaimable "
+           "section: "
        << pool->name();
     ++stats.numNonReclaimableAttempts;
     return 0;
@@ -752,24 +760,34 @@ uint64_t Writer::MemoryReclaimer::reclaim(
     ++stats.numNonReclaimableAttempts;
     return 0;
   }
-  const uint64_t memoryUsage = writer_->getContext().getTotalMemoryUsage();
-  if (memoryUsage < writer_->spillConfig_->writerFlushThresholdSize) {
-    RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
-    LOG(WARNING)
-        << "Can't reclaim memory from dwrf writer pool " << pool->name()
-        << " which doesn't have sufficient memory to flush, writer memory usage: "
-        << succinctBytes(memoryUsage) << ", writer flush memory threshold: "
-        << succinctBytes(writer_->spillConfig_->writerFlushThresholdSize);
-    ++stats.numNonReclaimableAttempts;
-    return 0;
-  }
 
   return memory::MemoryReclaimer::run(
       [&]() {
         int64_t reclaimedBytes{0};
         {
           memory::ScopedReclaimedBytesRecorder recorder(pool, &reclaimedBytes);
-          writer_->flushInternal(false);
+          const auto& context = writer_->getContext();
+          const auto usedBytes = context.getTotalMemoryUsage();
+          const auto releasedBytes = writer_->releaseMemory();
+          if (releasedBytes == 0 &&
+              usedBytes < writer_->spillConfig_->writerFlushThresholdSize) {
+            RECORD_METRIC_VALUE(kMetricMemoryNonReclaimableCount);
+            LOG(WARNING)
+                << "Can't reclaim memory from dwrf writer pool " << pool->name()
+                << " which doesn't have sufficient memory to release or flush, "
+                   "writer memory usage: "
+                << succinctBytes(usedBytes)
+                << ", writer memory available reservation: "
+                << succinctBytes(context.availableMemoryReservation())
+                << ", writer flush memory threshold: "
+                << succinctBytes(
+                       writer_->spillConfig_->writerFlushThresholdSize);
+            ++stats.numNonReclaimableAttempts;
+          } else {
+            if (usedBytes >= writer_->spillConfig_->writerFlushThresholdSize) {
+              writer_->flushInternal(false);
+            }
+          }
         }
         return reclaimedBytes;
       },
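Taken together, reclaimableBytes() and reclaim() now implement a two-tier policy: unused quantized reservation is always released, while a stripe flush is attempted only once total usage has reached writerFlushThresholdSize. A condensed sketch of the decision with hypothetical helper names; the threshold comes from SpillConfig:

    #include <cstdint>

    struct ReclaimPlan {
      bool release{false}; // hand back unused quantized reservation
      bool flush{false};   // flush the stripe to free used memory
      int64_t estimate{0}; // what reclaimableBytes() reports
    };

    ReclaimPlan planReclaim(
        int64_t usedBytes,
        int64_t releasableBytes,
        int64_t writerFlushThresholdSize) {
      ReclaimPlan plan;
      plan.flush = usedBytes >= writerFlushThresholdSize;
      plan.release = releasableBytes > 0;
      plan.estimate = (plan.flush ? usedBytes : 0) + releasableBytes;
      return plan;
    }

The previous code treated a writer below the flush threshold as entirely non-reclaimable; now such a writer can still surrender its reservation slack, and the non-reclaimable warning is logged only when both tiers come up empty.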
diff --git a/velox/dwio/dwrf/writer/Writer.h b/velox/dwio/dwrf/writer/Writer.h
index 4e5529d7d025..d342fce1bf64 100644
--- a/velox/dwio/dwrf/writer/Writer.h
+++ b/velox/dwio/dwrf/writer/Writer.h
@@ -188,8 +188,9 @@ class Writer : public dwio::common::Writer {
       MemoryUsageCategory memoryUsageCategory,
       double estimatedMemoryGrowthRatio);
 
-  // Releases the unused memory reservations after we flush a stripe.
-  void releaseMemory();
+  // Releases the unused memory reservations after we flush a stripe. Returns
+  // the total number of released bytes.
+  int64_t releaseMemory();
 
   // Create a new stripe. No-op if there is no data written.
   void flushInternal(bool close = false);
diff --git a/velox/dwio/dwrf/writer/WriterContext.cpp b/velox/dwio/dwrf/writer/WriterContext.cpp
index 2be8bee0bce9..68cb916d55e2 100644
--- a/velox/dwio/dwrf/writer/WriterContext.cpp
+++ b/velox/dwio/dwrf/writer/WriterContext.cpp
@@ -127,9 +127,8 @@ int64_t WriterContext::getMemoryUsage(
 }
 
 int64_t WriterContext::getTotalMemoryUsage() const {
-  return getMemoryUsage(MemoryUsageCategory::OUTPUT_STREAM) +
-      getMemoryUsage(MemoryUsageCategory::DICTIONARY) +
-      getMemoryUsage(MemoryUsageCategory::GENERAL);
+  return generalPool_->usedBytes() + dictionaryPool_->usedBytes() +
+      outputStreamPool_->usedBytes();
 }
 
 int64_t WriterContext::availableMemoryReservation() const {
@@ -138,10 +137,20 @@ int64_t WriterContext::availableMemoryReservation() const {
       generalPool_->availableReservation();
 }
 
-void WriterContext::releaseMemoryReservation() {
+int64_t WriterContext::releasableMemoryReservation() const {
+  return generalPool_->parent()->releasableReservation();
+}
+
+int64_t WriterContext::releaseMemoryReservation() {
+  const auto* aggregatePool = dictionaryPool_->parent();
+  const auto beforeTotalReservation = aggregatePool->reservedBytes();
   dictionaryPool_->release();
   outputStreamPool_->release();
   generalPool_->release();
+  const auto releasedMemory =
+      beforeTotalReservation - aggregatePool->reservedBytes();
+  VELOX_CHECK_GE(releasedMemory, 0);
+  return releasedMemory;
 }
 
 void WriterContext::abort() {
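releaseMemoryReservation() measures what it returned by diffing the aggregate parent's reservedBytes() around the three release() calls; since dictionaryPool_, outputStreamPool_, and generalPool_ are all leaves under the same aggregate pool, a single before/after read captures the total. A minimal sketch of this delta-based pattern, with an illustrative signature rather than the Velox API:

    #include <cstdint>
    #include <vector>

    #include "velox/common/memory/MemoryPool.h"

    using facebook::velox::memory::MemoryPool;

    // Release each leaf's unused reservation and report the aggregate delta.
    int64_t releaseAll(
        const MemoryPool& parent,
        const std::vector<MemoryPool*>& leaves) {
      const int64_t before = parent.reservedBytes();
      for (auto* leaf : leaves) {
        leaf->release(); // drops the leaf's unused quantized reservation
      }
      return before - parent.reservedBytes(); // >= 0 by construction
    }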
diff --git a/velox/dwio/dwrf/writer/WriterContext.h b/velox/dwio/dwrf/writer/WriterContext.h
index ca8037f42fe9..4c93409d8d03 100644
--- a/velox/dwio/dwrf/writer/WriterContext.h
+++ b/velox/dwio/dwrf/writer/WriterContext.h
@@ -192,12 +192,15 @@ class WriterContext : public CompressionBufferPool {
     return pool_->maxCapacity();
   }
 
-  /// Returns the available memory reservations aggregated from all the memory
-  /// pools.
+  /// Returns the available memory reservations from all the memory pools.
   int64_t availableMemoryReservation() const;
 
-  /// Releases unused memory reservation aggregated from all the memory pools.
-  void releaseMemoryReservation();
+  /// Returns the amount of unused memory reservation that could be released.
+  int64_t releasableMemoryReservation() const;
+
+  /// Releases unused memory reservation from all the memory pools. Returns
+  /// the total released bytes.
+  int64_t releaseMemoryReservation();
 
   const encryption::EncryptionHandler& getEncryptionHandler() const {
     return *handler_;
@@ -335,7 +338,9 @@ class WriterContext : public CompressionBufferPool {
   /// O(k * raw data size). The actual coefficient k can differ
   /// from encoding to encoding, and thus should be schema aware.
   size_t getEstimatedFlushOverhead(size_t dataRawSize) const {
-    return ceil(flushOverheadRatioTracker_.getEstimatedRatio() * dataRawSize);
+    return ceil(
+        flushOverheadRatioTracker_.getEstimatedRatio() *
+        (dataRawSize + getMemoryUsage(MemoryUsageCategory::DICTIONARY)));
   }
 
   /// We currently use previous stripe raw size as the proxy for the expected
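Folding the dictionary footprint into the flush-overhead estimate matters because the dictionary has to be materialized at flush time, so estimating from raw data size alone systematically under-reserves for dictionary-heavy schemas. With hypothetical numbers:

    // ratio = 0.2, dataRawSize = 100 MB, dictionary usage = 40 MB
    //   old estimate: ceil(0.2 * 100 MB)        = 20 MB
    //   new estimate: ceil(0.2 * (100 + 40) MB) = 28 MB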
diff --git a/velox/exec/tests/TableWriteTest.cpp b/velox/exec/tests/TableWriteTest.cpp
index 2f9c372726a2..24cb3ebb5bdf 100644
--- a/velox/exec/tests/TableWriteTest.cpp
+++ b/velox/exec/tests/TableWriteTest.cpp
@@ -3643,10 +3643,17 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) {
   createVectors(numBatches, rowType_, options);
   createDuckDbTable(vectors);
 
-  const std::vector<uint64_t> writerFlushThresholds{0, 1UL << 30};
-  for (uint64_t writerFlushThreshold : writerFlushThresholds) {
+  struct TestParam {
+    uint64_t bytesToReserve{0};
+    uint64_t writerFlushThreshold{0};
+  };
+  const std::vector<TestParam> testParams{
+      {0, 0}, {0, 1UL << 30}, {64UL << 20, 1UL << 30}};
+  for (const auto& testParam : testParams) {
     SCOPED_TRACE(fmt::format(
-        "writerFlushThreshold: {}", succinctBytes(writerFlushThreshold)));
+        "bytesToReserve: {}, writerFlushThreshold: {}",
+        succinctBytes(testParam.bytesToReserve),
+        succinctBytes(testParam.writerFlushThreshold)));
 
     auto memoryManager = createMemoryManager();
     auto arbitrator = memoryManager->arbitrator();
@@ -3654,6 +3661,17 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) {
         newQueryCtx(memoryManager.get(), executor_.get(), kMemoryCapacity);
     ASSERT_EQ(queryCtx->pool()->capacity(), kMemoryPoolInitCapacity);
 
+    memory::MemoryPool* compressionPool{nullptr};
+    SCOPED_TESTVALUE_SET(
+        "facebook::velox::dwrf::Writer::write",
+        std::function<void(dwrf::Writer*)>([&](dwrf::Writer* writer) {
+          if (testParam.bytesToReserve == 0 || compressionPool != nullptr) {
+            return;
+          }
+          compressionPool = &(writer->getContext().getMemoryPool(
+              dwrf::MemoryUsageCategory::OUTPUT_STREAM));
+        }));
+
     std::atomic<int> numInputs{0};
     SCOPED_TESTVALUE_SET(
         "facebook::velox::exec::Driver::runInternal::addInput",
@@ -3665,14 +3683,17 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) {
             return;
           }
 
+          if (testParam.bytesToReserve > 0) {
+            ASSERT_TRUE(compressionPool != nullptr);
+            compressionPool->maybeReserve(testParam.bytesToReserve);
+          }
+
           const auto fakeAllocationSize = arbitrator->stats().maxCapacityBytes -
-              op->pool()->parent()->reservedBytes();
-          if (writerFlushThreshold == 0) {
+              op->pool()->parent()->usedBytes();
+          if (testParam.writerFlushThreshold == 0) {
             auto* buffer = op->pool()->allocate(fakeAllocationSize);
             op->pool()->free(buffer, fakeAllocationSize);
           } else {
-            // The injected memory allocation fail if we set very high
-            // memory flush threshold.
             VELOX_ASSERT_THROW(
                 op->pool()->allocate(fakeAllocationSize), "Exceeded memory pool");
           }
@@ -3699,16 +3720,18 @@ DEBUG_ONLY_TEST_F(TableWriterArbitrationTest, writerFlushThreshold) {
         .config(core::QueryConfig::kSpillEnabled, true)
         .config(core::QueryConfig::kWriterSpillEnabled, true)
         .config(
-            core::QueryConfig::kWriterFlushThresholdBytes, writerFlushThreshold)
+            core::QueryConfig::kWriterFlushThresholdBytes,
+            testParam.writerFlushThreshold)
         .plan(std::move(writerPlan))
         .assertResults(fmt::format("SELECT {}", numRows));
 
     ASSERT_EQ(
-
-        arbitrator->stats().numFailures, writerFlushThreshold == 0 ? 0 : 1);
+        arbitrator->stats().numFailures,
+        testParam.writerFlushThreshold == 0 ? 0 : 1);
     // We don't trigger reclaim on a writer if it doesn't meet the writer flush
     // threshold.
     ASSERT_EQ(arbitrator->stats().numNonReclaimableAttempts, 0);
+    ASSERT_GE(arbitrator->stats().numReclaimedBytes, testParam.bytesToReserve);
     waitForAllTasksToBeDeleted(3'000'000);
     queryCtx.reset();
     ASSERT_EQ(arbitrator->stats().numReserves, 1);
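The new third parameter set, {64 MB to reserve, 1 GB flush threshold}, pins down the reservation-release tier end to end. The injected allocation is sized so that granting it forces the arbitrator to take memory back from the writer; a back-of-the-envelope reading of the test logic, using names from the diff above:

    // fakeAllocationSize = maxCapacityBytes - usedBytes(root)
    //   writerFlushThreshold == 0    -> arbitration succeeds by flushing the
    //                                   writer, so the allocation is granted
    //   writerFlushThreshold == 1 GB -> the writer is never flushable; the
    //                                   allocation throws "Exceeded memory
    //                                   pool", and reclaim can only release
    //                                   the compression pool's reserved slack

With the high threshold, the only memory the writer can surrender is the compression pool's maybeReserve(bytesToReserve) slack, which is what the final ASSERT_GE(numReclaimedBytes, bytesToReserve) verifies.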