From 6fffb625f23b9e64e1440609bf19451d7193f837 Mon Sep 17 00:00:00 2001 From: marta-lokhova Date: Thu, 21 Nov 2024 15:40:00 -0800 Subject: [PATCH 1/3] Gracefully handle enabling publishing after genesis --- src/history/CheckpointBuilder.cpp | 36 ++++++++++++++++++++---------- src/history/CheckpointBuilder.h | 3 ++- src/history/HistoryManagerImpl.cpp | 8 +++---- src/main/CommandLine.cpp | 1 + 4 files changed, 31 insertions(+), 17 deletions(-) diff --git a/src/history/CheckpointBuilder.cpp b/src/history/CheckpointBuilder.cpp index edbd6fda7d..8d2aa47c5e 100644 --- a/src/history/CheckpointBuilder.cpp +++ b/src/history/CheckpointBuilder.cpp @@ -7,7 +7,7 @@ namespace stellar { -void +bool CheckpointBuilder::ensureOpen(uint32_t ledgerSeq) { ZoneScoped; @@ -17,6 +17,15 @@ CheckpointBuilder::ensureOpen(uint32_t ledgerSeq) releaseAssert(!mTxResults); releaseAssert(!mTxs); releaseAssert(!mLedgerHeaders); + // Don't start writing checkpoint until proper checkpoint boundary + // This can occur if a node enabled publish mid-checkpoint + if (mPublishWasDisabled && + !mApp.getHistoryManager().isFirstLedgerInCheckpoint(ledgerSeq)) + { + return false; + } + + mPublishWasDisabled = false; auto checkpoint = mApp.getHistoryManager().checkpointContainingLedger(ledgerSeq); @@ -41,6 +50,7 @@ CheckpointBuilder::ensureOpen(uint32_t ledgerSeq) mLedgerHeaders->open(ledger.localPath_nogz_dirty()); mOpen = true; } + return true; } void @@ -124,7 +134,11 @@ CheckpointBuilder::appendTransactionSet(uint32_t ledgerSeq, { throw std::runtime_error("Startup validation not performed"); } - ensureOpen(ledgerSeq); + + if (!ensureOpen(ledgerSeq)) + { + return; + } if (!resultSet.results.empty()) { @@ -147,7 +161,11 @@ CheckpointBuilder::appendLedgerHeader(LedgerHeader const& header, { throw std::runtime_error("Startup validation not performed"); } - ensureOpen(header.ledgerSeq); + + if (!ensureOpen(header.ledgerSeq)) + { + return; + } LedgerHeaderHistoryEntry lhe; lhe.header = header; @@ -225,17 +243,11 @@ CheckpointBuilder::cleanup(uint32_t lcl) if (!fs::exists(ft.localPath_nogz_dirty())) { - // No dirty file exists, nothing to do (this can only happen on a - // checkpoint boundary) - if (!mApp.getHistoryManager().isLastLedgerInCheckpoint(lcl)) - { - throw std::runtime_error( - fmt::format("Missing dirty checkpoint file {}", - ft.localPath_nogz_dirty())); - } CLOG_INFO(History, - "Skipping recovery of file {}, does not exist yet", + "Skipping recovery of file {}, does not exist. This can " + "occur if publish was previously disabled.", ft.localPath_nogz_dirty()); + mPublishWasDisabled = true; return; } diff --git a/src/history/CheckpointBuilder.h b/src/history/CheckpointBuilder.h index 9225e96ab0..3178bd6e49 100644 --- a/src/history/CheckpointBuilder.h +++ b/src/history/CheckpointBuilder.h @@ -57,8 +57,9 @@ class CheckpointBuilder std::unique_ptr mLedgerHeaders; bool mOpen{false}; bool mStartupValidationComplete{false}; + bool mPublishWasDisabled{false}; - void ensureOpen(uint32_t ledgerSeq); + bool ensureOpen(uint32_t ledgerSeq); public: CheckpointBuilder(Application& app); diff --git a/src/history/HistoryManagerImpl.cpp b/src/history/HistoryManagerImpl.cpp index b7d799dc4c..6c20c0d207 100644 --- a/src/history/HistoryManagerImpl.cpp +++ b/src/history/HistoryManagerImpl.cpp @@ -80,13 +80,13 @@ HistoryManager::createPublishQueueDir(Config const& cfg) std::filesystem::path publishQueueFileName(uint32_t seq) { - return fs::hexStr(seq) + ".json"; + return fs::hexStr(seq) + ".checkpoint"; } std::filesystem::path publishQueueTmpFileName(uint32_t seq) { - return fs::hexStr(seq) + ".json.dirty"; + return fs::hexStr(seq) + ".checkpoint.dirty"; } void @@ -271,7 +271,7 @@ HistoryManagerImpl::logAndUpdatePublishStatus() bool isPublishFile(std::string const& name) { - std::regex re("^[a-z0-9]{8}\\.json$"); + std::regex re("^[a-z0-9]{8}\\.checkpoint$"); auto a = regex_match(name, re); return a; } @@ -279,7 +279,7 @@ isPublishFile(std::string const& name) bool isPublishTmpFile(std::string const& name) { - std::regex re("^[a-z0-9]{8}\\.json.dirty$"); + std::regex re("^[a-z0-9]{8}\\.checkpoint.dirty$"); auto a = regex_match(name, re); return a; } diff --git a/src/main/CommandLine.cpp b/src/main/CommandLine.cpp index be58b992c2..e3c0610d24 100644 --- a/src/main/CommandLine.cpp +++ b/src/main/CommandLine.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #ifdef BUILD_TESTS #include "simulation/ApplyLoad.h" From 5c63e7b84b31edf9298c551ace60b4b896caac88 Mon Sep 17 00:00:00 2001 From: marta-lokhova Date: Fri, 22 Nov 2024 12:09:29 -0800 Subject: [PATCH 2/3] Harden durability of publish files --- src/history/HistoryManagerImpl.cpp | 37 ++++++++++++++++++---- src/main/CommandLine.cpp | 22 +++++++++++++ src/main/dumpxdr.cpp | 2 +- src/util/BufferedAsioCerealOutputArchive.h | 1 + src/util/XDRStream.h | 15 +++++++-- 5 files changed, 66 insertions(+), 11 deletions(-) diff --git a/src/history/HistoryManagerImpl.cpp b/src/history/HistoryManagerImpl.cpp index 6c20c0d207..50c1153333 100644 --- a/src/history/HistoryManagerImpl.cpp +++ b/src/history/HistoryManagerImpl.cpp @@ -12,6 +12,10 @@ #include "crypto/Hex.h" #include "crypto/SHA.h" #include "herder/HerderImpl.h" +#include +#include +#include + #include "history/HistoryArchive.h" #include "history/HistoryArchiveManager.h" #include "history/HistoryManagerImpl.h" @@ -30,6 +34,7 @@ #include "overlay/StellarXDR.h" #include "process/ProcessManager.h" #include "transactions/TransactionSQL.h" +#include "util/BufferedAsioCerealOutputArchive.h" #include "util/GlobalChecks.h" #include "util/Logging.h" #include "util/Math.h" @@ -97,7 +102,13 @@ writeCheckpointFile(Application& app, HistoryArchiveState const& has, app.getHistoryManager().isLastLedgerInCheckpoint(has.currentLedger)); auto filename = publishQueueFileName(has.currentLedger); auto tmpOut = app.getHistoryManager().getTmpDir() / filename; - has.save(tmpOut.string()); + { + OutputFileStream out(app.getClock().getIOContext(), + /* fsyncOnClose */ true); + out.open(tmpOut.string()); + cereal::BufferedAsioOutputArchive ar(out); + has.serialize(ar); + } // Immediately produce a final checkpoint JSON (suitable for confirmed // ledgers) @@ -467,6 +478,22 @@ HistoryManagerImpl::takeSnapshotAndPublish(HistoryArchiveState const& has) "delay-publishing-to-archive", delayTimeout, publishWork); } +HistoryArchiveState +loadCheckpointHAS(std::string const& filename) +{ + HistoryArchiveState has; + std::ifstream in(filename, std::ios::binary); + if (!in) + { + throw std::runtime_error( + fmt::format(FMT_STRING("Error opening file {}"), filename)); + } + in.exceptions(std::ios::badbit); + cereal::BinaryInputArchive ar(in); + has.serialize(ar); + return has; +} + size_t HistoryManagerImpl::publishQueuedHistory() { @@ -485,17 +512,14 @@ HistoryManagerImpl::publishQueuedHistory() #endif ZoneScoped; - HistoryArchiveState has; auto seq = getMinLedgerQueuedToPublish(); - if (seq == std::numeric_limits::max()) { return 0; } auto file = publishQueuePath(mApp.getConfig()) / publishQueueFileName(seq); - has.load(file.string()); - takeSnapshotAndPublish(has); + takeSnapshotAndPublish(loadCheckpointHAS(file.string())); return 1; } @@ -541,8 +565,7 @@ HistoryManagerImpl::getPublishQueueStates() HistoryArchiveState has; auto fullPath = publishQueuePath(mApp.getConfig()) / f; - has.load(fullPath.string()); - states.push_back(has); + states.push_back(loadCheckpointHAS(fullPath)); }); return states; } diff --git a/src/main/CommandLine.cpp b/src/main/CommandLine.cpp index e3c0610d24..c35bce6d37 100644 --- a/src/main/CommandLine.cpp +++ b/src/main/CommandLine.cpp @@ -1385,6 +1385,26 @@ getSettingsUpgradeTransactions(CommandLineArgs const& args) }); } +int +runPrintPublishQueue(CommandLineArgs const& args) +{ + CommandLine::ConfigOption configOption; + + return runWithHelp(args, {configurationParser(configOption)}, [&] { + auto cfg = configOption.getConfig(); + VirtualClock clock(VirtualClock::REAL_TIME); + cfg.setNoListen(); + Application::pointer app = Application::create(clock, cfg, false); + cereal::JSONOutputArchive archive(std::cout); + archive.makeArray(); + for (auto const& has : app->getHistoryManager().getPublishQueueStates()) + { + has.serialize(archive); + } + return 0; + }); +} + int runCheckQuorumIntersection(CommandLineArgs const& args) { @@ -2058,6 +2078,8 @@ handleCommandLine(int argc, char* const* argv) "check that a given network specified as a JSON file enjoys a quorum " "intersection", runCheckQuorumIntersection}, + {"print-publish-queue", "print all checkpoints scheduled for publish", + runPrintPublishQueue}, #ifdef BUILD_TESTS {"load-xdr", "load an XDR bucket file, for testing", runLoadXDR}, {"rebuild-ledger-from-buckets", diff --git a/src/main/dumpxdr.cpp b/src/main/dumpxdr.cpp index 5c59f783a7..d077501d65 100644 --- a/src/main/dumpxdr.cpp +++ b/src/main/dumpxdr.cpp @@ -66,7 +66,7 @@ void dumpXdrStream(std::string const& filename, bool compact) { std::regex rx( - R"(.*\b(debug-tx-set|(?:(ledger|bucket|transactions|results|meta-debug|scp)-.+))\.xdr$)"); + R"(.*\b(debug-tx-set|(?:(ledger|bucket|transactions|results|meta-debug|scp)-.+))\.xdr(?:\.dirty)?$)"); std::smatch sm; if (std::regex_match(filename, sm, rx)) { diff --git a/src/util/BufferedAsioCerealOutputArchive.h b/src/util/BufferedAsioCerealOutputArchive.h index f79913ddfc..6c32b0a7cc 100644 --- a/src/util/BufferedAsioCerealOutputArchive.h +++ b/src/util/BufferedAsioCerealOutputArchive.h @@ -3,6 +3,7 @@ #include "util/XDRStream.h" #include #include +#include namespace cereal { diff --git a/src/util/XDRStream.h b/src/util/XDRStream.h index 69ab747057..583dd211c4 100644 --- a/src/util/XDRStream.h +++ b/src/util/XDRStream.h @@ -224,8 +224,16 @@ class XDRInputFileStream } }; -// OutputFileStream needs access to a file descriptor to do fsync, so we use -// asio's synchronous stream types here rather than fstreams. +/* +IMPORTANT: some areas of core require durable writes that +are resistant to application and system crashes. If you need durable writes: +1. Use a stream implementation that supports fsync, e.g. OutputFileStream +2. Write to a temp file first. If you don't intent to persist temp files across +runs, fsyncing on close is sufficient. Otherwise, use durableWriteOne to flush +and fsync after every write. +3. Close the temp stream to make sure flush and fsync are called. +4. Rename the temp file to the final location using durableRename. +*/ class OutputFileStream { protected: @@ -237,7 +245,8 @@ class OutputFileStream fs::native_handle_t mHandle; FILE* mOut{nullptr}; #else - // buffered stream + // use buffered stream which supports accessing a file descriptor needed to + // fsync asio::buffered_write_stream mBufferedWriteStream; #endif From 71b31edab7a514e471032f280b49d479f31fc793 Mon Sep 17 00:00:00 2001 From: marta-lokhova Date: Fri, 22 Nov 2024 14:03:43 -0800 Subject: [PATCH 3/3] Allow disabling fsync in tests --- src/history/HistoryManagerImpl.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/history/HistoryManagerImpl.cpp b/src/history/HistoryManagerImpl.cpp index 50c1153333..92196023c5 100644 --- a/src/history/HistoryManagerImpl.cpp +++ b/src/history/HistoryManagerImpl.cpp @@ -103,8 +103,16 @@ writeCheckpointFile(Application& app, HistoryArchiveState const& has, auto filename = publishQueueFileName(has.currentLedger); auto tmpOut = app.getHistoryManager().getTmpDir() / filename; { - OutputFileStream out(app.getClock().getIOContext(), - /* fsyncOnClose */ true); + // Always fsync in prod paths, but allow disabling for tests for + // performance + OutputFileStream out( + app.getClock().getIOContext(), +#ifdef BUILD_TESTS + /* fsyncOnClose */ !app.getConfig().DISABLE_XDR_FSYNC +#else + /* fsyncOnClose */ true +#endif + ); out.open(tmpOut.string()); cereal::BufferedAsioOutputArchive ar(out); has.serialize(ar);