Skip to content

Commit

Permalink
Close ledgers in parallel when flag is enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Nov 16, 2024
1 parent e0c5ff1 commit 6d1ce1f
Show file tree
Hide file tree
Showing 66 changed files with 562 additions and 395 deletions.
2 changes: 1 addition & 1 deletion lib/libmedida
8 changes: 3 additions & 5 deletions src/bucket/BucketList.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "crypto/SHA.h"
#include "ledger/LedgerManager.h"
#include "ledger/LedgerTxn.h"
#include "ledger/NetworkConfig.h"
#include "main/Application.h"
#include "util/GlobalChecks.h"
#include "util/Logging.h"
Expand Down Expand Up @@ -56,7 +57,6 @@ BucketLevel::getNext()
void
BucketLevel::setNext(FutureBucket const& fb)
{
releaseAssert(threadIsMain());
mNextCurr = fb;
}

Expand All @@ -75,7 +75,6 @@ BucketLevel::getSnap() const
void
BucketLevel::setCurr(std::shared_ptr<Bucket> b)
{
releaseAssert(threadIsMain());
mNextCurr.clear();
mCurr = b;
}
Expand Down Expand Up @@ -776,7 +775,8 @@ void
BucketList::scanForEvictionLegacy(Application& app, AbstractLedgerTxn& ltx,
uint32_t ledgerSeq,
EvictionCounters& counters,
std::shared_ptr<EvictionStatistics> stats)
std::shared_ptr<EvictionStatistics> stats,
SorobanNetworkConfig& networkConfig)
{
releaseAssert(stats);

Expand All @@ -785,8 +785,6 @@ BucketList::scanForEvictionLegacy(Application& app, AbstractLedgerTxn& ltx,
return iter.isCurrBucket ? level.getCurr() : level.getSnap();
};

auto const& networkConfig =
app.getLedgerManager().getSorobanNetworkConfig();
auto const firstScanLevel =
networkConfig.stateArchivalSettings().startingEvictionScanLevel;
auto evictionIter = networkConfig.evictionIterator();
Expand Down
4 changes: 3 additions & 1 deletion src/bucket/BucketList.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ struct BucketEntryCounters;
class Config;
struct EvictionCounters;
struct InflationWinner;
class SorobanNetworkConfig;

namespace testutil
{
Expand Down Expand Up @@ -478,7 +479,8 @@ class BucketList

void scanForEvictionLegacy(Application& app, AbstractLedgerTxn& ltx,
uint32_t ledgerSeq, EvictionCounters& counters,
std::shared_ptr<EvictionStatistics> stats);
std::shared_ptr<EvictionStatistics> stats,
SorobanNetworkConfig& networkConfig);

// Restart any merges that might be running on background worker threads,
// merging buckets between levels. This needs to be called after forcing a
Expand Down
2 changes: 0 additions & 2 deletions src/bucket/BucketListSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ BucketListSnapshot::BucketListSnapshot(BucketList const& bl,
LedgerHeader header)
: mHeader(std::move(header))
{
releaseAssert(threadIsMain());

for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
{
auto const& level = bl.getLevel(i);
Expand Down
10 changes: 7 additions & 3 deletions src/bucket/BucketManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ struct HistoryArchiveState;
struct InflationWinner;
struct LedgerHeader;
struct MergeKey;
class SorobanNetworkConfig;

// A fine-grained merge-operation-counter structure for tracking various
// events during merges. These are not medida counters because we do not
Expand Down Expand Up @@ -288,12 +289,15 @@ class BucketManager : NonMovableOrCopyable
// pointed to by EvictionIterator. Scans until `maxEntriesToEvict` entries
// have been evicted or maxEvictionScanSize bytes have been scanned.
virtual void scanForEvictionLegacy(AbstractLedgerTxn& ltx,
uint32_t ledgerSeq) = 0;
uint32_t ledgerSeq,
SorobanNetworkConfig& networkConfig) = 0;

virtual void startBackgroundEvictionScan(uint32_t ledgerSeq) = 0;
virtual void startBackgroundEvictionScan(uint32_t ledgerSeq,
bool callFromLedgerClose) = 0;
virtual void
resolveBackgroundEvictionScan(AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys) = 0;
LedgerKeySet const& modifiedKeys,
SorobanNetworkConfig& networkConfig) = 0;

virtual medida::Meter& getBloomMissMeter() const = 0;
virtual medida::Meter& getBloomLookupMeter() const = 0;
Expand Down
83 changes: 42 additions & 41 deletions src/bucket/BucketManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void
BucketManagerImpl::initialize()
{
ZoneScoped;
std::string d = mApp.getConfig().BUCKET_DIR_PATH;
std::string d = mConfig.BUCKET_DIR_PATH;

if (!fs::exists(d))
{
Expand Down Expand Up @@ -122,17 +122,17 @@ BucketManagerImpl::initialize()
mLockedBucketDir = std::make_unique<std::string>(d);
mTmpDirManager = std::make_unique<TmpDirManager>(d + "/tmp");

if (mApp.getConfig().MODE_ENABLES_BUCKETLIST)
if (mConfig.MODE_ENABLES_BUCKETLIST)
{
mBucketList = std::make_unique<BucketList>();

if (mApp.getConfig().isUsingBucketListDB())
if (mConfig.isUsingBucketListDB())
{
mSnapshotManager = std::make_unique<BucketSnapshotManager>(
mApp,
std::make_unique<BucketListSnapshot>(*mBucketList,
LedgerHeader()),
mApp.getConfig().QUERY_SNAPSHOT_LEDGERS);
mConfig.QUERY_SNAPSHOT_LEDGERS);
}
}

Expand All @@ -141,11 +141,10 @@ BucketManagerImpl::initialize()
// BUCKET_DIR_PATH, HISTORY_FILE_TYPE_SCP is persisted to the database
// so create the remaining ledger header, transactions and results
// directories
createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mApp.getConfig());
createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
mApp.getConfig());
createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mApp.getConfig());
HistoryManager::createPublishQueueDir(mApp.getConfig());
createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mConfig);
createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, mConfig);
createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mConfig);
HistoryManager::createPublishQueueDir(mConfig);
}

void
Expand Down Expand Up @@ -178,6 +177,7 @@ EvictionCounters::EvictionCounters(Application& app)

BucketManagerImpl::BucketManagerImpl(Application& app)
: mApp(app)
, mConfig(app.getConfig())
, mBucketList(nullptr)
, mSnapshotManager(nullptr)
, mTmpDirManager(nullptr)
Expand Down Expand Up @@ -299,7 +299,7 @@ void
BucketManagerImpl::deleteEntireBucketDir()
{
ZoneScoped;
std::string d = mApp.getConfig().BUCKET_DIR_PATH;
std::string d = mConfig.BUCKET_DIR_PATH;
if (fs::exists(d))
{
// First clean out the contents of the tmpdir, as usual.
Expand Down Expand Up @@ -332,7 +332,7 @@ BucketManagerImpl::deleteTmpDirAndUnlockBucketDir()
// Then delete the lockfile $BUCKET_DIR_PATH/stellar-core.lock
if (mLockedBucketDir)
{
std::string d = mApp.getConfig().BUCKET_DIR_PATH;
std::string d = mConfig.BUCKET_DIR_PATH;
std::string lock = d + "/" + kLockFilename;
releaseAssert(fs::exists(lock));
fs::unlockFile(lock);
Expand All @@ -343,14 +343,14 @@ BucketManagerImpl::deleteTmpDirAndUnlockBucketDir()
BucketList&
BucketManagerImpl::getBucketList()
{
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
return *mBucketList;
}

BucketSnapshotManager&
BucketManagerImpl::getBucketSnapshotManager() const
{
releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB());
releaseAssertOrThrow(mConfig.isUsingBucketListDB());
releaseAssert(mSnapshotManager);
return *mSnapshotManager;
}
Expand Down Expand Up @@ -476,7 +476,7 @@ BucketManagerImpl::renameBucketDirFile(std::filesystem::path const& src,
std::filesystem::path const& dst)
{
ZoneScoped;
if (mApp.getConfig().DISABLE_XDR_FSYNC)
if (mConfig.DISABLE_XDR_FSYNC)
{
return rename(src.string().c_str(), dst.string().c_str()) == 0;
}
Expand All @@ -492,7 +492,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
std::unique_ptr<BucketIndex const> index)
{
ZoneScoped;
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);

if (mergeKey)
Expand Down Expand Up @@ -566,7 +566,7 @@ BucketManagerImpl::adoptFileAsBucket(std::string const& filename,
void
BucketManagerImpl::noteEmptyMergeOutput(MergeKey const& mergeKey)
{
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);

// We _do_ want to remove the mergeKey from mLiveFutures, both so that that
// map does not grow without bound and more importantly so that we drop the
Expand Down Expand Up @@ -681,7 +681,7 @@ BucketManagerImpl::putMergeFuture(
MergeKey const& key, std::shared_future<std::shared_ptr<Bucket>> wp)
{
ZoneScoped;
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);
std::lock_guard<std::recursive_mutex> lock(mBucketMutex);
CLOG_TRACE(
Bucket,
Expand All @@ -704,7 +704,7 @@ BucketManagerImpl::getBucketListReferencedBuckets() const
{
ZoneScoped;
std::set<Hash> referenced;
if (!mApp.getConfig().MODE_ENABLES_BUCKETLIST)
if (!mConfig.MODE_ENABLES_BUCKETLIST)
{
return referenced;
}
Expand Down Expand Up @@ -743,7 +743,7 @@ BucketManagerImpl::getAllReferencedBuckets() const
{
ZoneScoped;
auto referenced = getBucketListReferencedBuckets();
if (!mApp.getConfig().MODE_ENABLES_BUCKETLIST)
if (!mConfig.MODE_ENABLES_BUCKETLIST)
{
return referenced;
}
Expand Down Expand Up @@ -788,7 +788,7 @@ void
BucketManagerImpl::cleanupStaleFiles()
{
ZoneScoped;
if (mApp.getConfig().DISABLE_BUCKET_GC)
if (mConfig.DISABLE_BUCKET_GC)
{
return;
}
Expand Down Expand Up @@ -867,7 +867,7 @@ BucketManagerImpl::forgetUnreferencedBuckets()
CLOG_TRACE(Bucket,
"BucketManager::forgetUnreferencedBuckets dropping {}",
filename);
if (!filename.empty() && !mApp.getConfig().DISABLE_BUCKET_GC)
if (!filename.empty() && !mConfig.DISABLE_BUCKET_GC)
{
CLOG_TRACE(Bucket, "removing bucket file: {}", filename);
std::filesystem::remove(filename);
Expand Down Expand Up @@ -974,7 +974,7 @@ BucketManagerImpl::snapshotLedger(LedgerHeader& currentHeader)
{
ZoneScoped;
Hash hash;
if (mApp.getConfig().MODE_ENABLES_BUCKETLIST)
if (mConfig.MODE_ENABLES_BUCKETLIST)
{
hash = mBucketList->getHash();
}
Expand Down Expand Up @@ -1005,25 +1005,29 @@ BucketManagerImpl::maybeSetIndex(std::shared_ptr<Bucket> b,

void
BucketManagerImpl::scanForEvictionLegacy(AbstractLedgerTxn& ltx,
uint32_t ledgerSeq)
uint32_t ledgerSeq,
SorobanNetworkConfig& networkConfig)
{
ZoneScoped;
releaseAssert(protocolVersionStartsFrom(ltx.getHeader().ledgerVersion,
SOROBAN_PROTOCOL_VERSION));
mBucketList->scanForEvictionLegacy(
mApp, ltx, ledgerSeq, mBucketListEvictionCounters, mEvictionStatistics);
mBucketList->scanForEvictionLegacy(mApp, ltx, ledgerSeq,
mBucketListEvictionCounters,
mEvictionStatistics, networkConfig);
}

void
BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq)
BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq,
bool callFromLedgerClose)
{
releaseAssert(mApp.getConfig().isUsingBucketListDB());
releaseAssert(!threadIsMain() || !mConfig.parallelLedgerClose());
releaseAssert(mConfig.isUsingBucketListDB());
releaseAssert(mSnapshotManager);
releaseAssert(!mEvictionFuture.valid());
releaseAssert(mEvictionStatistics);

auto searchableBL = mSnapshotManager->copySearchableBucketListSnapshot();
auto const& cfg = mApp.getLedgerManager().getSorobanNetworkConfig();
auto cfg = mApp.getLedgerManager().getSorobanNetworkConfig();
auto const& sas = cfg.stateArchivalSettings();

using task_t = std::packaged_task<EvictionResult()>;
Expand All @@ -1045,28 +1049,25 @@ BucketManagerImpl::startBackgroundEvictionScan(uint32_t ledgerSeq)
void
BucketManagerImpl::resolveBackgroundEvictionScan(
AbstractLedgerTxn& ltx, uint32_t ledgerSeq,
LedgerKeySet const& modifiedKeys)
LedgerKeySet const& modifiedKeys, SorobanNetworkConfig& networkConfig)
{
ZoneScoped;
releaseAssert(threadIsMain());
releaseAssert(!threadIsMain() || !mConfig.parallelLedgerClose());
releaseAssert(mEvictionStatistics);

if (!mEvictionFuture.valid())
{
startBackgroundEvictionScan(ledgerSeq);
startBackgroundEvictionScan(ledgerSeq, false);
}

auto evictionCandidates = mEvictionFuture.get();

auto const& networkConfig =
mApp.getLedgerManager().getSorobanNetworkConfig();

// If eviction related settings changed during the ledger, we have to
// restart the scan
if (!evictionCandidates.isValid(ledgerSeq,
networkConfig.stateArchivalSettings()))
{
startBackgroundEvictionScan(ledgerSeq);
startBackgroundEvictionScan(ledgerSeq, false);
evictionCandidates = mEvictionFuture.get();
}

Expand Down Expand Up @@ -1176,7 +1177,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,
uint32_t maxProtocolVersion, bool restartMerges)
{
ZoneScoped;
releaseAssertOrThrow(mApp.getConfig().MODE_ENABLES_BUCKETLIST);
releaseAssertOrThrow(mConfig.MODE_ENABLES_BUCKETLIST);

for (uint32_t i = 0; i < BucketList::kNumLevels; ++i)
{
Expand All @@ -1203,7 +1204,7 @@ BucketManagerImpl::assumeState(HistoryArchiveState const& has,

// Buckets on the BucketList should always be indexed when
// BucketListDB enabled
if (mApp.getConfig().isUsingBucketListDB())
if (mConfig.isUsingBucketListDB())
{
releaseAssert(curr->isEmpty() || curr->isIndexed());
releaseAssert(snap->isEmpty() || snap->isIndexed());
Expand Down Expand Up @@ -1328,7 +1329,7 @@ BucketManagerImpl::mergeBuckets(HistoryArchiveState const& has)
BucketMetadata meta;
MergeCounters mc;
auto& ctx = mApp.getClock().getIOContext();
meta.ledgerVersion = mApp.getConfig().LEDGER_PROTOCOL_VERSION;
meta.ledgerVersion = mConfig.LEDGER_PROTOCOL_VERSION;
BucketOutputIterator out(getTmpDir(), /*keepDeadEntries=*/false, meta, mc,
ctx, /*doFsync=*/true);
for (auto const& pair : ledgerMap)
Expand Down Expand Up @@ -1539,13 +1540,13 @@ BucketManagerImpl::scheduleVerifyReferencedBucketsWork()
Config const&
BucketManagerImpl::getConfig() const
{
return mApp.getConfig();
return mConfig;
}

std::shared_ptr<SearchableBucketListSnapshot>
BucketManagerImpl::getSearchableBucketListSnapshot()
{
releaseAssert(mApp.getConfig().isUsingBucketListDB());
releaseAssert(mConfig.isUsingBucketListDB());
// Any other threads must maintain their own snapshot
releaseAssert(threadIsMain());
if (!mSearchableBucketListSnapshot)
Expand All @@ -1560,7 +1561,7 @@ BucketManagerImpl::getSearchableBucketListSnapshot()
void
BucketManagerImpl::reportBucketEntryCountMetrics()
{
if (!mApp.getConfig().isUsingBucketListDB())
if (!mConfig.isUsingBucketListDB())
{
return;
}
Expand Down
Loading

0 comments on commit 6d1ce1f

Please sign in to comment.