Skip to content

Commit

Permalink
fsync on BucketIndex writes
Browse files Browse the repository at this point in the history
  • Loading branch information
SirTyson committed Nov 20, 2024
1 parent 984a2b2 commit 62efd1f
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 63 deletions.
7 changes: 6 additions & 1 deletion src/bucket/BucketIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@

#include <cereal/archives/binary.hpp>

// Forward declaration of asio::io_context so that declarations in this header
// (e.g. BucketIndex::createIndex) can accept it by reference.
// NOTE(review): presumably this is forward-declared rather than included to
// keep the asio headers out of this header -- confirm.
namespace asio
{
class io_context;
}

namespace stellar
{

Expand Down Expand Up @@ -87,7 +92,7 @@ class BucketIndex : public NonMovableOrCopyable
template <class BucketT>
static std::unique_ptr<BucketIndex const>
createIndex(BucketManager& bm, std::filesystem::path const& filename,
Hash const& hash);
Hash const& hash, asio::io_context& ctx);

// Loads index from given file. If file does not exist or if saved
// index does not have same parameters as current config, return null
Expand Down
29 changes: 16 additions & 13 deletions src/bucket/BucketIndexImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "ledger/LedgerTypeUtils.h"
#include "main/Config.h"
#include "util/BinaryFuseFilter.h"
#include "util/BufferedAsioCerealOutputArchive.h"
#include "util/Fs.h"
#include "util/LogSlowExecution.h"
#include "util/Logging.h"
Expand Down Expand Up @@ -74,6 +75,7 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
std::filesystem::path const& filename,
std::streamoff pageSize,
Hash const& hash,
asio::io_context& ctx,
BucketEntryT const& typeTag)
: mBloomMissMeter(bm.getBloomMissMeter())
, mBloomLookupMeter(bm.getBloomLookupMeter())
Expand Down Expand Up @@ -104,7 +106,7 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
std::streamoff pageUpperBound = 0;
BucketEntryT be;
size_t iter = 0;
size_t count = 0;
[[maybe_unused]] size_t count = 0;

std::vector<uint64_t> keyHashes;
auto seed = shortHash::getShortHashInitKey();
Expand Down Expand Up @@ -233,7 +235,7 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,

if (bm.getConfig().isPersistingBucketListDBIndexes())
{
saveToDisk(bm, hash);
saveToDisk(bm, hash, ctx);
}
}

Expand All @@ -242,14 +244,14 @@ BucketIndexImpl<IndexT>::BucketIndexImpl(BucketManager& bm,
template <>
void
BucketIndexImpl<BucketIndex::IndividualIndex>::saveToDisk(
BucketManager& bm, Hash const& hash) const
BucketManager& bm, Hash const& hash, asio::io_context& ctx) const
{
}

template <>
void
BucketIndexImpl<BucketIndex::RangeIndex>::saveToDisk(BucketManager& bm,
Hash const& hash) const
BucketIndexImpl<BucketIndex::RangeIndex>::saveToDisk(
BucketManager& bm, Hash const& hash, asio::io_context& ctx) const
{
ZoneScoped;
releaseAssert(bm.getConfig().isPersistingBucketListDBIndexes());
Expand All @@ -263,10 +265,9 @@ BucketIndexImpl<BucketIndex::RangeIndex>::saveToDisk(BucketManager& bm,
tmpFilename);

{
std::ofstream out;
out.exceptions(std::ios::failbit | std::ios::badbit);
out.open(tmpFilename, std::ios_base::binary | std::ios_base::trunc);
cereal::BinaryOutputArchive ar(out);
OutputFileStream out(ctx, !bm.getConfig().DISABLE_XDR_FSYNC);
out.open(tmpFilename);
cereal::BufferedAsioOutputArchive ar(out);
ar(mData);
}

Expand Down Expand Up @@ -360,7 +361,7 @@ template <class BucketT>
std::unique_ptr<BucketIndex const>
BucketIndex::createIndex(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash)
Hash const& hash, asio::io_context& ctx)
{
BUCKET_TYPE_ASSERT(BucketT);

Expand All @@ -380,7 +381,7 @@ BucketIndex::createIndex(BucketManager& bm,
filename);
return std::unique_ptr<BucketIndexImpl<IndividualIndex> const>(
new BucketIndexImpl<IndividualIndex>(
bm, filename, 0, hash, typename BucketT::EntryT{}));
bm, filename, 0, hash, ctx, typename BucketT::EntryT{}));
}
else
{
Expand All @@ -391,6 +392,7 @@ BucketIndex::createIndex(BucketManager& bm,
pageSize, filename);
return std::unique_ptr<BucketIndexImpl<RangeIndex> const>(
new BucketIndexImpl<RangeIndex>(bm, filename, pageSize, hash,
ctx,
typename BucketT::EntryT{}));
}
}
Expand Down Expand Up @@ -642,8 +644,9 @@ BucketIndexImpl<IndexT>::getBucketEntryCounters() const
template std::unique_ptr<BucketIndex const>
BucketIndex::createIndex<LiveBucket>(BucketManager& bm,
std::filesystem::path const& filename,
Hash const& hash);
Hash const& hash, asio::io_context& ctx);
template std::unique_ptr<BucketIndex const>
BucketIndex::createIndex<HotArchiveBucket>(
BucketManager& bm, std::filesystem::path const& filename, Hash const& hash);
BucketManager& bm, std::filesystem::path const& filename, Hash const& hash,
asio::io_context& ctx);
}
6 changes: 4 additions & 2 deletions src/bucket/BucketIndexImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "util/BinaryFuseFilter.h"
#include "xdr/Stellar-types.h"

#include "util/BufferedAsioCerealOutputArchive.h"
#include <cereal/types/map.hpp>
#include <map>
#include <memory>
Expand Down Expand Up @@ -65,14 +66,15 @@ template <class IndexT> class BucketIndexImpl : public BucketIndex
template <class BucketEntryT>
BucketIndexImpl(BucketManager& bm, std::filesystem::path const& filename,
std::streamoff pageSize, Hash const& hash,
BucketEntryT const& typeTag);
asio::io_context& ctx, BucketEntryT const& typeTag);

template <class Archive>
BucketIndexImpl(BucketManager const& bm, Archive& ar,
std::streamoff pageSize);

// Saves index to disk, overwriting any preexisting file for this index
void saveToDisk(BucketManager& bm, Hash const& hash) const;
void saveToDisk(BucketManager& bm, Hash const& hash,
asio::io_context& ctx) const;

// Returns [lowFileOffset, highFileOffset) that contain the key ranges
// [lowerBound, upperBound]. If no file offsets exist, returns [0, 0]
Expand Down
4 changes: 3 additions & 1 deletion src/bucket/BucketOutputIterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ BucketOutputIterator<BucketT>::BucketOutputIterator(std::string const& tmpDir,
bool doFsync)
: mFilename(BucketBase::randomBucketName(tmpDir))
, mOut(ctx, doFsync)
, mCtx(ctx)
, mBuf(nullptr)
, mKeepTombstoneEntries(keepTombstoneEntries)
, mMeta(meta)
Expand Down Expand Up @@ -199,7 +200,8 @@ BucketOutputIterator<BucketT>::getBucket(BucketManager& bucketManager,
!b || !b->isIndexed())
{
index = BucketIndex::createIndex<BucketT>(bucketManager, mFilename,
hash);
hash, mCtx);
releaseAssertOrThrow(index);
}
}

Expand Down
1 change: 1 addition & 0 deletions src/bucket/BucketOutputIterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ template <typename BucketT> class BucketOutputIterator
protected:
std::filesystem::path mFilename;
XDROutputFileStream mOut;
asio::io_context& mCtx;
BucketEntryIdCmp<BucketT> mCmp;
std::unique_ptr<typename BucketT::EntryT> mBuf;
SHA256 mHasher;
Expand Down
26 changes: 20 additions & 6 deletions src/catchup/IndexBucketsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ IndexBucketsWork::IndexWork::IndexWork(Application& app,
BasicWork::State
IndexBucketsWork::IndexWork::onRun()
{
if (mFailed)
{
return State::WORK_FAILURE;
}

if (mDone)
{
return State::WORK_SUCCESS;
Expand All @@ -40,11 +45,12 @@ void
IndexBucketsWork::IndexWork::postWork()
{
Application& app = this->mApp;
asio::io_context& ctx = app.getWorkerIOContext();

std::weak_ptr<IndexWork> weak(
std::static_pointer_cast<IndexWork>(shared_from_this()));
app.postOnBackgroundThread(
[&app, weak]() {
[&app, &ctx, weak]() {
auto self = weak.lock();
if (!self || self->isAborting())
{
Expand Down Expand Up @@ -80,19 +86,27 @@ IndexBucketsWork::IndexWork::postWork()
{
// TODO: Fix this when archive BucketLists assume state
self->mIndex = BucketIndex::createIndex<LiveBucket>(
bm, self->mBucket->getFilename(), self->mBucket->getHash());
bm, self->mBucket->getFilename(), self->mBucket->getHash(),
ctx);
}

app.postOnMainThread(
[weak]() {
auto self = weak.lock();
if (self)
{
self->mDone = true;
if (!self->isAborting())
if (self->mIndex)
{
self->mDone = true;
if (!self->isAborting())
{
self->mApp.getBucketManager().maybeSetIndex(
self->mBucket, std::move(self->mIndex));
}
}
else
{
self->mApp.getBucketManager().maybeSetIndex(
self->mBucket, std::move(self->mIndex));
self->mFailed = true;
}
self->wakeUp();
}
Expand Down
1 change: 1 addition & 0 deletions src/catchup/IndexBucketsWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class IndexBucketsWork : public Work
std::shared_ptr<LiveBucket> mBucket;
std::unique_ptr<BucketIndex const> mIndex;
bool mDone{false};
bool mFailed{false};

void postWork();

Expand Down
81 changes: 81 additions & 0 deletions src/util/BufferedAsioCerealOutputArchive.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#pragma once

#include "util/XDRStream.h"
#include <cereal/archives/binary.hpp>
#include <cereal/cereal.hpp>

namespace cereal
{

// Mirrors CEREAL_ARCHIVE_RESTRICT from cereal/details/traits.hpp for single
// types.
//
// Expands to an enable_if return type that is `void` when
// cereal::traits::is_same_archive<Archive, TYPE>::value is true and a
// substitution failure otherwise. The expansion refers to a template parameter
// literally named `Archive`, so it can only be used in templates that declare
// one with that name.
//
// NOTE(review): although written inside namespace cereal, a macro is not
// namespace-scoped, and no #undef is visible in this header, so the name is
// visible to every translation unit that includes it.
#define CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(TYPE) \
typename std::enable_if< \
cereal::traits::is_same_archive<Archive, TYPE>::value, void>::type

// This is a basic reimplementation of BinaryOutputArchive
// (cereal/archives/binary.hpp) that uses our own OutputFileStream instead of
// std::ofstream for writes in order to support fsync. For input we can just use
// cereal's BinaryInputArchive because we don't care about fsync for reads.
class BufferedAsioOutputArchive
    : public OutputArchive<BufferedAsioOutputArchive, AllowEmptyClassElision>
{
  public:
    // Constructs an archive that writes to the given stream.
    // @param stream The stellar::OutputFileStream that receives the serialized
    // bytes. Only a reference is stored, so the stream must outlive this
    // archive.
    //
    // Marked explicit so an OutputFileStream is never silently converted into
    // an archive.
    explicit BufferedAsioOutputArchive(stellar::OutputFileStream& stream)
        : OutputArchive<BufferedAsioOutputArchive, AllowEmptyClassElision>(this)
        , itsStream(stream)
    {
    }

    ~BufferedAsioOutputArchive() CEREAL_NOEXCEPT = default;

    // Writes `size` bytes starting at `data` to the output stream by
    // forwarding them to OutputFileStream::writeBytes.
    // NOTE(review): unlike cereal's BinaryOutputArchive::saveBinary, no check
    // of the number of bytes written happens here; this presumably relies on
    // writeBytes reporting failures itself -- confirm.
    void
    saveBinary(const void* data, std::streamsize size)
    {
        itsStream.writeBytes(static_cast<char const*>(data), size);
    }

  private:
    // Destination stream; not owned by the archive.
    stellar::OutputFileStream& itsStream;
};

// Saving for POD types to binary
template <class T>
inline typename std::enable_if<std::is_arithmetic<T>::value, void>::type
CEREAL_SAVE_FUNCTION_NAME(BufferedAsioOutputArchive& ar, T const& t)
{
ar.saveBinary(std::addressof(t), sizeof(t));
}

// Name-value pairs in binary output: only the wrapped value is passed on to
// the archive; the pair's name is not serialized by this overload.
template <class Archive, class T>
inline CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(BufferedAsioOutputArchive)
CEREAL_SERIALIZE_FUNCTION_NAME(Archive& archive, NameValuePair<T>& pair)
{
    auto& wrapped = pair.value;
    archive(wrapped);
}

// Size tags in binary output: the tag's size member is passed on to the
// archive as an ordinary value.
template <class Archive, class T>
inline CEREAL_ARCHIVE_RESTRICT_SINGLE_TYPE(BufferedAsioOutputArchive)
CEREAL_SERIALIZE_FUNCTION_NAME(Archive& archive, SizeTag<T>& tag)
{
    auto& elementCount = tag.size;
    archive(elementCount);
}

// Saving binary data
template <class T>
inline void
CEREAL_SAVE_FUNCTION_NAME(BufferedAsioOutputArchive& ar,
BinaryData<T> const& bd)
{
ar.saveBinary(bd.data, static_cast<std::streamsize>(bd.size));
}
}

CEREAL_REGISTER_ARCHIVE(cereal::BufferedAsioOutputArchive)
Loading

0 comments on commit 62efd1f

Please sign in to comment.