From 62efd1f903412cb3a1d8f3b761da86deef3b0266 Mon Sep 17 00:00:00 2001 From: Garand Tyson Date: Wed, 20 Nov 2024 11:17:45 -0800 Subject: [PATCH] fsync on BucketIndex writes --- src/bucket/BucketIndex.h | 7 +- src/bucket/BucketIndexImpl.cpp | 29 ++++--- src/bucket/BucketIndexImpl.h | 6 +- src/bucket/BucketOutputIterator.cpp | 4 +- src/bucket/BucketOutputIterator.h | 1 + src/catchup/IndexBucketsWork.cpp | 26 ++++-- src/catchup/IndexBucketsWork.h | 1 + src/util/BufferedAsioCerealOutputArchive.h | 81 ++++++++++++++++++ src/util/XDRStream.h | 99 +++++++++++++--------- 9 files changed, 191 insertions(+), 63 deletions(-) create mode 100644 src/util/BufferedAsioCerealOutputArchive.h diff --git a/src/bucket/BucketIndex.h b/src/bucket/BucketIndex.h index 83f42424c0..ee15031fc1 100644 --- a/src/bucket/BucketIndex.h +++ b/src/bucket/BucketIndex.h @@ -15,6 +15,11 @@ #include +namespace asio +{ +class io_context; +} + namespace stellar { @@ -87,7 +92,7 @@ class BucketIndex : public NonMovableOrCopyable template static std::unique_ptr createIndex(BucketManager& bm, std::filesystem::path const& filename, - Hash const& hash); + Hash const& hash, asio::io_context& ctx); // Loads index from given file. If file does not exist or if saved // index does not have same parameters as current config, return null diff --git a/src/bucket/BucketIndexImpl.cpp b/src/bucket/BucketIndexImpl.cpp index eef7638fed..762d481687 100644 --- a/src/bucket/BucketIndexImpl.cpp +++ b/src/bucket/BucketIndexImpl.cpp @@ -12,6 +12,7 @@ #include "ledger/LedgerTypeUtils.h" #include "main/Config.h" #include "util/BinaryFuseFilter.h" +#include "util/BufferedAsioCerealOutputArchive.h" #include "util/Fs.h" #include "util/LogSlowExecution.h" #include "util/Logging.h" @@ -74,6 +75,7 @@ BucketIndexImpl::BucketIndexImpl(BucketManager& bm, std::filesystem::path const& filename, std::streamoff pageSize, Hash const& hash, + asio::io_context& ctx, BucketEntryT const& typeTag) : mBloomMissMeter(bm.getBloomMissMeter()) , mBloomLookupMeter(bm.getBloomLookupMeter()) @@ -104,7 +106,7 @@ BucketIndexImpl::BucketIndexImpl(BucketManager& bm, std::streamoff pageUpperBound = 0; BucketEntryT be; size_t iter = 0; - size_t count = 0; + [[maybe_unused]] size_t count = 0; std::vector keyHashes; auto seed = shortHash::getShortHashInitKey(); @@ -233,7 +235,7 @@ BucketIndexImpl::BucketIndexImpl(BucketManager& bm, if (bm.getConfig().isPersistingBucketListDBIndexes()) { - saveToDisk(bm, hash); + saveToDisk(bm, hash, ctx); } } @@ -242,14 +244,14 @@ BucketIndexImpl::BucketIndexImpl(BucketManager& bm, template <> void BucketIndexImpl::saveToDisk( - BucketManager& bm, Hash const& hash) const + BucketManager& bm, Hash const& hash, asio::io_context& ctx) const { } template <> void -BucketIndexImpl::saveToDisk(BucketManager& bm, - Hash const& hash) const +BucketIndexImpl::saveToDisk( + BucketManager& bm, Hash const& hash, asio::io_context& ctx) const { ZoneScoped; releaseAssert(bm.getConfig().isPersistingBucketListDBIndexes()); @@ -263,10 +265,9 @@ BucketIndexImpl::saveToDisk(BucketManager& bm, tmpFilename); { - std::ofstream out; - out.exceptions(std::ios::failbit | std::ios::badbit); - out.open(tmpFilename, std::ios_base::binary | std::ios_base::trunc); - cereal::BinaryOutputArchive ar(out); + OutputFileStream out(ctx, !bm.getConfig().DISABLE_XDR_FSYNC); + out.open(tmpFilename); + cereal::BufferedAsioOutputArchive ar(out); ar(mData); } @@ -360,7 +361,7 @@ template std::unique_ptr BucketIndex::createIndex(BucketManager& bm, std::filesystem::path const& filename, - Hash const& hash) + Hash const& hash, asio::io_context& ctx) { BUCKET_TYPE_ASSERT(BucketT); @@ -380,7 +381,7 @@ BucketIndex::createIndex(BucketManager& bm, filename); return std::unique_ptr const>( new BucketIndexImpl( - bm, filename, 0, hash, typename BucketT::EntryT{})); + bm, filename, 0, hash, ctx, typename BucketT::EntryT{})); } else { @@ -391,6 +392,7 @@ BucketIndex::createIndex(BucketManager& bm, pageSize, filename); return std::unique_ptr const>( new BucketIndexImpl(bm, filename, pageSize, hash, + ctx, typename BucketT::EntryT{})); } } @@ -642,8 +644,9 @@ BucketIndexImpl::getBucketEntryCounters() const template std::unique_ptr BucketIndex::createIndex(BucketManager& bm, std::filesystem::path const& filename, - Hash const& hash); + Hash const& hash, asio::io_context& ctx); template std::unique_ptr BucketIndex::createIndex( - BucketManager& bm, std::filesystem::path const& filename, Hash const& hash); + BucketManager& bm, std::filesystem::path const& filename, Hash const& hash, + asio::io_context& ctx); } diff --git a/src/bucket/BucketIndexImpl.h b/src/bucket/BucketIndexImpl.h index 4a42c2eb46..52630a70e6 100644 --- a/src/bucket/BucketIndexImpl.h +++ b/src/bucket/BucketIndexImpl.h @@ -10,6 +10,7 @@ #include "util/BinaryFuseFilter.h" #include "xdr/Stellar-types.h" +#include "util/BufferedAsioCerealOutputArchive.h" #include #include #include @@ -65,14 +66,15 @@ template class BucketIndexImpl : public BucketIndex template BucketIndexImpl(BucketManager& bm, std::filesystem::path const& filename, std::streamoff pageSize, Hash const& hash, - BucketEntryT const& typeTag); + asio::io_context& ctx, BucketEntryT const& typeTag); template BucketIndexImpl(BucketManager const& bm, Archive& ar, std::streamoff pageSize); // Saves index to disk, overwriting any preexisting file for this index - void saveToDisk(BucketManager& bm, Hash const& hash) const; + void saveToDisk(BucketManager& bm, Hash const& hash, + asio::io_context& ctx) const; // Returns [lowFileOffset, highFileOffset) that contain the key ranges // [lowerBound, upperBound]. If no file offsets exist, returns [0, 0] diff --git a/src/bucket/BucketOutputIterator.cpp b/src/bucket/BucketOutputIterator.cpp index 36d3521991..7dc1b98359 100644 --- a/src/bucket/BucketOutputIterator.cpp +++ b/src/bucket/BucketOutputIterator.cpp @@ -29,6 +29,7 @@ BucketOutputIterator::BucketOutputIterator(std::string const& tmpDir, bool doFsync) : mFilename(BucketBase::randomBucketName(tmpDir)) , mOut(ctx, doFsync) + , mCtx(ctx) , mBuf(nullptr) , mKeepTombstoneEntries(keepTombstoneEntries) , mMeta(meta) @@ -199,7 +200,8 @@ BucketOutputIterator::getBucket(BucketManager& bucketManager, !b || !b->isIndexed()) { index = BucketIndex::createIndex(bucketManager, mFilename, - hash); + hash, mCtx); + releaseAssertOrThrow(index); } } diff --git a/src/bucket/BucketOutputIterator.h b/src/bucket/BucketOutputIterator.h index 8131bef0cd..b5b9cd07a7 100644 --- a/src/bucket/BucketOutputIterator.h +++ b/src/bucket/BucketOutputIterator.h @@ -27,6 +27,7 @@ template class BucketOutputIterator protected: std::filesystem::path mFilename; XDROutputFileStream mOut; + asio::io_context& mCtx; BucketEntryIdCmp mCmp; std::unique_ptr mBuf; SHA256 mHasher; diff --git a/src/catchup/IndexBucketsWork.cpp b/src/catchup/IndexBucketsWork.cpp index 51c85d6e30..79913bb5e0 100644 --- a/src/catchup/IndexBucketsWork.cpp +++ b/src/catchup/IndexBucketsWork.cpp @@ -21,6 +21,11 @@ IndexBucketsWork::IndexWork::IndexWork(Application& app, BasicWork::State IndexBucketsWork::IndexWork::onRun() { + if (mFailed) + { + return State::WORK_FAILURE; + } + if (mDone) { return State::WORK_SUCCESS; @@ -40,11 +45,12 @@ void IndexBucketsWork::IndexWork::postWork() { Application& app = this->mApp; + asio::io_context& ctx = app.getWorkerIOContext(); std::weak_ptr weak( std::static_pointer_cast(shared_from_this())); app.postOnBackgroundThread( - [&app, weak]() { + [&app, &ctx, weak]() { auto self = weak.lock(); if (!self || self->isAborting()) { @@ -80,7 +86,8 @@ IndexBucketsWork::IndexWork::postWork() { // TODO: Fix this when archive BucketLists assume state self->mIndex = BucketIndex::createIndex( - bm, self->mBucket->getFilename(), self->mBucket->getHash()); + bm, self->mBucket->getFilename(), self->mBucket->getHash(), + ctx); } app.postOnMainThread( @@ -88,11 +95,18 @@ IndexBucketsWork::IndexWork::postWork() auto self = weak.lock(); if (self) { - self->mDone = true; - if (!self->isAborting()) + if (self->mIndex) + { + self->mDone = true; + if (!self->isAborting()) + { + self->mApp.getBucketManager().maybeSetIndex( + self->mBucket, std::move(self->mIndex)); + } + } + else { - self->mApp.getBucketManager().maybeSetIndex( - self->mBucket, std::move(self->mIndex)); + self->mFailed = true; } self->wakeUp(); } diff --git a/src/catchup/IndexBucketsWork.h b/src/catchup/IndexBucketsWork.h index ed44289c4e..284a36cf24 100644 --- a/src/catchup/IndexBucketsWork.h +++ b/src/catchup/IndexBucketsWork.h @@ -22,6 +22,7 @@ class IndexBucketsWork : public Work std::shared_ptr mBucket; std::unique_ptr mIndex; bool mDone{false}; + bool mFailed{false}; void postWork(); diff --git a/src/util/BufferedAsioCerealOutputArchive.h b/src/util/BufferedAsioCerealOutputArchive.h new file mode 100644 index 0000000000..f79913ddfc --- /dev/null +++ b/src/util/BufferedAsioCerealOutputArchive.h @@ -0,0 +1,81 @@ +#pragma once + +#include "util/XDRStream.h" +#include +#include + +namespace cereal +{ + +// Mirrors CEREAL_ARCHIVE_RESTRICT from cereal/details/traits.hpp for single +// types +#define CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(TYPE) \ + typename std::enable_if< \ + cereal::traits::is_same_archive::value, void>::type + +// This is a basic reimplementation of BinaryOutputArchive +// (cereal/archives/binary.hpp) that uses our own OutputFileStream instead of +// std::ofstream for writes in order to support fsync. For input we can just use +// cereal's BinaryInputArchive because we don't care about fsync for reads. +class BufferedAsioOutputArchive + : public OutputArchive +{ + public: + // Construct, outputting to the provided stream + // @param stream The stream to output to. Can be a stringstream, a file + // stream, or even cout! + BufferedAsioOutputArchive(stellar::OutputFileStream& stream) + : OutputArchive(this) + , itsStream(stream) + { + } + + ~BufferedAsioOutputArchive() CEREAL_NOEXCEPT = default; + + // Writes size bytes of data to the output stream + void + saveBinary(const void* data, std::streamsize size) + { + itsStream.writeBytes(static_cast(data), size); + } + + private: + stellar::OutputFileStream& itsStream; +}; + +// Saving for POD types to binary +template +inline typename std::enable_if::value, void>::type +CEREAL_SAVE_FUNCTION_NAME(BufferedAsioOutputArchive& ar, T const& t) +{ + ar.saveBinary(std::addressof(t), sizeof(t)); +} + +// Serializing NVP types to binary +template + +inline CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(BufferedAsioOutputArchive) + CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, NameValuePair& t) +{ + ar(t.value); +} + +// Serializing SizeTags to binary +template +inline CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(BufferedAsioOutputArchive) + CEREAL_SERIALIZE_FUNCTION_NAME(Archive& ar, SizeTag& t) +{ + ar(t.size); +} + +// Saving binary data +template +inline void +CEREAL_SAVE_FUNCTION_NAME(BufferedAsioOutputArchive& ar, + BinaryData const& bd) +{ + ar.saveBinary(bd.data, static_cast(bd.size)); +} +} + +CEREAL_REGISTER_ARCHIVE(cereal::BufferedAsioOutputArchive) diff --git a/src/util/XDRStream.h b/src/util/XDRStream.h index 8187d6d191..69ab747057 100644 --- a/src/util/XDRStream.h +++ b/src/util/XDRStream.h @@ -224,10 +224,11 @@ class XDRInputFileStream } }; -// XDROutputFileStream needs access to a file descriptor to do fsync, so we use +// OutputFileStream needs access to a file descriptor to do fsync, so we use // asio's synchronous stream types here rather than fstreams. -class XDROutputFileStream +class OutputFileStream { + protected: std::vector mBuf; const bool mFsyncOnClose; @@ -241,7 +242,7 @@ class XDROutputFileStream #endif public: - XDROutputFileStream(asio::io_context& ctx, bool fsyncOnClose) + OutputFileStream(asio::io_context& ctx, bool fsyncOnClose) : mFsyncOnClose(fsyncOnClose) #ifndef WIN32 , mBufferedWriteStream(ctx, stellar::fs::bufsz()) @@ -249,7 +250,7 @@ class XDROutputFileStream { } - ~XDROutputFileStream() + ~OutputFileStream() { if (isOpen()) { @@ -377,50 +378,21 @@ class XDROutputFileStream return isOpen(); } - template - void - durableWriteOne(T const& t, SHA256* hasher = nullptr, - size_t* bytesPut = nullptr) - { - writeOne(t, hasher, bytesPut); - flush(); - fs::flushFileChanges(getHandle()); - } - - template void - writeOne(T const& t, SHA256* hasher = nullptr, size_t* bytesPut = nullptr) + writeBytes(char const* buf, size_t const sizeBytes) { ZoneScoped; if (!isOpen()) { FileSystemException::failWith( - "XDROutputFileStream::writeOne() on non-open stream"); - } - - uint32_t sz = (uint32_t)xdr::xdr_size(t); - releaseAssertOrThrow(sz < 0x80000000); - - if (mBuf.size() < sz + 4) - { - mBuf.resize(sz + 4); + "OutputFileStream::writeBytes() on non-open stream"); } - // Write 4 bytes of size, big-endian, with XDR 'continuation' bit set on - // high bit of high byte. - mBuf[0] = static_cast((sz >> 24) & 0xFF) | '\x80'; - mBuf[1] = static_cast((sz >> 16) & 0xFF); - mBuf[2] = static_cast((sz >> 8) & 0xFF); - mBuf[3] = static_cast(sz & 0xFF); - xdr::xdr_put p(mBuf.data() + 4, mBuf.data() + 4 + sz); - xdr_argpack_archive(p, t); - - size_t const to_write = sz + 4; size_t written = 0; - while (written < to_write) + while (written < sizeBytes) { #ifdef WIN32 - auto w = fwrite(mBuf.data() + written, 1, to_write - written, mOut); + auto w = fwrite(buf + written, 1, sizeBytes - written, mOut); if (w == 0) { FileSystemException::failWith( @@ -429,8 +401,8 @@ class XDROutputFileStream written += w; #else asio::error_code ec; - auto buf = asio::buffer(mBuf.data() + written, to_write - written); - written += asio::write(mBufferedWriteStream, buf, ec); + auto asioBuf = asio::buffer(buf + written, sizeBytes - written); + written += asio::write(mBufferedWriteStream, asioBuf, ec); if (ec) { if (ec == asio::error::interrupted) @@ -447,9 +419,56 @@ class XDROutputFileStream } #endif } + } +}; + +class XDROutputFileStream : public OutputFileStream +{ + public: + XDROutputFileStream(asio::io_context& ctx, bool fsyncOnClose) + : OutputFileStream(ctx, fsyncOnClose) + { + } + + template + void + durableWriteOne(T const& t, SHA256* hasher = nullptr, + size_t* bytesPut = nullptr) + { + writeOne(t, hasher, bytesPut); + flush(); + fs::flushFileChanges(getHandle()); + } + + template + void + writeOne(T const& t, SHA256* hasher = nullptr, size_t* bytesPut = nullptr) + { + ZoneScoped; + uint32_t sz = (uint32_t)xdr::xdr_size(t); + releaseAssertOrThrow(sz < 0x80000000); + + if (mBuf.size() < sz + 4) + { + mBuf.resize(sz + 4); + } + + // Write 4 bytes of size, big-endian, with XDR 'continuation' bit set on + // high bit of high byte. + mBuf[0] = static_cast((sz >> 24) & 0xFF) | '\x80'; + mBuf[1] = static_cast((sz >> 16) & 0xFF); + mBuf[2] = static_cast((sz >> 8) & 0xFF); + mBuf[3] = static_cast(sz & 0xFF); + xdr::xdr_put p(mBuf.data() + 4, mBuf.data() + 4 + sz); + xdr_argpack_archive(p, t); + + // Buffer is 4 bytes of encoded size, followed by encoded object + size_t const toWrite = sz + 4; + writeBytes(mBuf.data(), toWrite); + if (hasher) { - hasher->add(ByteSlice(mBuf.data(), sz + 4)); + hasher->add(ByteSlice(mBuf.data(), toWrite)); } if (bytesPut) {