Skip to content

Commit

Permalink
merge bitcoin#16981: Improve runtime performance of --reindex
Browse files Browse the repository at this point in the history
  • Loading branch information
kwvg committed Aug 6, 2024
1 parent 984bb19 commit ee16fa1
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 37 deletions.
1 change: 1 addition & 0 deletions src/Makefile.bench.include
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ bench_bench_dash_SOURCES = \
bench/ccoins_caching.cpp \
bench/gcs_filter.cpp \
bench/hashpadding.cpp \
bench/load_external.cpp \
bench/merkle_root.cpp \
bench/mempool_eviction.cpp \
bench/mempool_stress.cpp \
Expand Down
63 changes: 63 additions & 0 deletions src/bench/load_external.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright (c) 2022 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or https://www.opensource.org/licenses/mit-license.php.

#include <bench/bench.h>
#include <bench/data.h>
#include <chainparams.h>
#include <test/util/setup_common.h>
#include <validation.h>

/**
 * The LoadExternalBlockFile() function is used during -reindex and -loadblock.
 *
 * Create a test file that's similar to a datadir/blocks/blk?????.dat file.
 * It contains around 134 copies of the same block (typical size of real block files).
 * For each block in the file, LoadExternalBlockFile() won't find its parent,
 * and so will skip the block. (In the real system, it will re-read the block
 * from disk later when it encounters its parent.)
 *
 * This benchmark measures the performance of deserializing the block (or just
 * its header, beginning with PR 16981).
 */
static void LoadExternalBlockFile(benchmark::Bench& bench)
{
    const auto testing_setup{MakeNoLogFileContext<const TestingSetup>(CBaseChainParams::MAIN)};

    // Create a single block as in the blocks files (magic bytes, block size,
    // block data) as a stream object.
    const fs::path blkfile{testing_setup->m_path_root / "blk.dat"};
    CDataStream ss(SER_DISK, 0);
    const auto& params{Params()};
    ss << params.MessageStart();
    ss << static_cast<uint32_t>(benchmark::data::block813851.size());
    // We can't use the streaming serialization (ss << benchmark::data::block813851)
    // because that first writes a compact size.
    ss.write(MakeByteSpan(benchmark::data::block813851));

    // Create the test file.
    {
        // "wb+" is "binary, O_RDWR | O_CREAT | O_TRUNC".
        FILE* file{fsbridge::fopen(blkfile, "wb+")};
        if (!file) {
            throw std::runtime_error("open of test file failed\n");
        }
        // Make the test block file about 128 MB in length.
        for (size_t i = 0; i < MAX_BLOCKFILE_SIZE / ss.size(); ++i) {
            if (fwrite(ss.data(), 1, ss.size(), file) != ss.size()) {
                // Close before throwing so the FILE* isn't leaked.
                fclose(file);
                throw std::runtime_error("write to test file failed\n");
            }
        }
        fclose(file);
    }

    CChainState& chainstate{testing_setup->m_node.chainman->ActiveChainstate()};
    std::multimap<uint256, FlatFilePos> blocks_with_unknown_parent;
    FlatFilePos pos;
    bench.run([&] {
        // "rb" is "binary, O_RDONLY", positioned to the start of the file.
        // The file will be closed by LoadExternalBlockFile().
        FILE* file{fsbridge::fopen(blkfile, "rb")};
        chainstate.LoadExternalBlockFile(file, &pos, &blocks_with_unknown_parent);
    });
    fs::remove(blkfile);
}

BENCHMARK(LoadExternalBlockFile);
48 changes: 33 additions & 15 deletions src/streams.h
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,6 @@ class CBufferedFile
uint64_t nRewind; //!< how many bytes we guarantee to rewind
std::vector<std::byte> vchBuf; //!< the buffer

protected:
//! read data from the source to fill the buffer
bool Fill() {
unsigned int pos = nSrcPos % vchBuf.size();
Expand All @@ -659,6 +658,28 @@ class CBufferedFile
return true;
}

//! Advance the stream's read pointer (m_read_pos) by up to 'length' bytes,
//! filling the buffer from the file so that at least one byte is available.
//! Return a pointer to the available buffer data and the number of bytes
//! (which may be less than the requested length) that may be accessed
//! beginning at that pointer.
std::pair<std::byte*, size_t> AdvanceStream(size_t length)
{
assert(m_read_pos <= nSrcPos);
if (m_read_pos + length > nReadLimit) {
throw std::ios_base::failure("Attempt to position past buffer limit");
}
// If there are no bytes available, read from the file.
if (m_read_pos == nSrcPos && length > 0) Fill();

size_t buffer_offset{static_cast<size_t>(m_read_pos % vchBuf.size())};
size_t buffer_available{static_cast<size_t>(vchBuf.size() - buffer_offset)};
size_t bytes_until_source_pos{static_cast<size_t>(nSrcPos - m_read_pos)};
size_t advance{std::min({length, buffer_available, bytes_until_source_pos})};
m_read_pos += advance;
return std::make_pair(&vchBuf[buffer_offset], advance);
}

public:
CBufferedFile(FILE* fileIn, uint64_t nBufSize, uint64_t nRewindIn, int nTypeIn, int nVersionIn)
: nType(nTypeIn), nVersion(nVersionIn), nSrcPos(0), m_read_pos(0), nReadLimit(std::numeric_limits<uint64_t>::max()), nRewind(nRewindIn), vchBuf(nBufSize, std::byte{0})
Expand Down Expand Up @@ -696,24 +717,21 @@ class CBufferedFile
//! read a number of bytes
void read(Span<std::byte> dst)
{
    // AdvanceStream() enforces the read limit and refills the buffer as
    // needed, so no separate bounds check or Fill() call is required here.
    // It may hand back fewer bytes than requested (buffer wrap-around or
    // partial fill), hence the loop.
    while (dst.size() > 0) {
        auto [buffer_pointer, length]{AdvanceStream(dst.size())};
        memcpy(dst.data(), buffer_pointer, length);
        dst = dst.subspan(length);
    }
}

//! Advance the read position in the stream up to the given position.
//! To move backwards in the stream, use SetPos() rather than SkipTo().
void SkipTo(const uint64_t file_pos)
{
    assert(file_pos >= m_read_pos);
    // AdvanceStream() moves m_read_pos forward and clamps each step to what
    // the buffer can supply, so keep requesting the remaining distance.
    for (uint64_t remaining{file_pos - m_read_pos}; remaining > 0; remaining = file_pos - m_read_pos) {
        AdvanceStream(remaining);
    }
}

//! return the current reading position
uint64_t GetPos() const {
return m_read_pos;
Expand Down
67 changes: 63 additions & 4 deletions src/test/streams_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ BOOST_AUTO_TEST_CASE(streams_buffered_file)
BOOST_CHECK(false);
} catch (const std::exception& e) {
BOOST_CHECK(strstr(e.what(),
"Read attempted past buffer limit") != nullptr);
"Attempt to position past buffer limit") != nullptr);
}
// The default argument removes the limit completely.
BOOST_CHECK(bf.SetLimit());
Expand Down Expand Up @@ -319,14 +319,63 @@ BOOST_AUTO_TEST_CASE(streams_buffered_file)
BOOST_CHECK(!bf.SetPos(0));
// But we should now be positioned at least as far back as allowed
// by the rewind window (relative to our farthest read position, 40).
BOOST_CHECK(bf.GetPos() <= 30);
BOOST_CHECK(bf.GetPos() <= 30U);

// We can explicitly close the file, or the destructor will do it.
bf.fclose();

fs::remove("streams_test_tmp");
}

BOOST_AUTO_TEST_CASE(streams_buffered_file_skip)
{
    fs::path test_path = m_args.GetDataDirBase() / "streams_test_tmp";
    FILE* fp = fsbridge::fopen(test_path, "w+b");
    // Each byte in the file holds its own offset (e.g. byte 1 has the value 0x01).
    for (uint8_t byte_val = 0; byte_val < 40; ++byte_val) {
        fwrite(&byte_val, 1, 1, fp);
    }
    rewind(fp);

    // 25-byte buffer, with a 10-byte rewind window.
    CBufferedFile stream(fp, 25, 10, 222, 333);

    uint8_t value;
    // Behaves like stream >> (7-byte-variable): data is pulled from the file
    // into memory, but never copied out to the caller.
    stream.SkipTo(7);
    BOOST_CHECK_EQUAL(stream.GetPos(), 7U);
    stream >> value;
    BOOST_CHECK_EQUAL(value, 7);

    // Bytes buffered up to offset 7 stay valid and remain readable.
    BOOST_CHECK(stream.SetPos(0));
    stream >> value;
    BOOST_CHECK_EQUAL(value, 0);
    stream >> value;
    BOOST_CHECK_EQUAL(value, 1);

    stream.SkipTo(11);
    stream >> value;
    BOOST_CHECK_EQUAL(value, 11);

    // SkipTo() respects the transfer limit: positioning beyond it must throw.
    stream.SetLimit(13);
    try {
        stream.SkipTo(14);
        BOOST_CHECK(false);
    } catch (const std::exception& e) {
        BOOST_CHECK(strstr(e.what(), "Attempt to position past buffer limit") != nullptr);
    }

    // Positioning exactly at the transfer limit is allowed.
    stream.SkipTo(13);
    BOOST_CHECK_EQUAL(stream.GetPos(), 13U);

    stream.fclose();
    fs::remove(test_path);
}

BOOST_AUTO_TEST_CASE(streams_buffered_file_rand)
{
// Make this test deterministic.
Expand Down Expand Up @@ -357,7 +406,7 @@ BOOST_AUTO_TEST_CASE(streams_buffered_file_rand)
// sizes; the boundaries of the objects can interact arbitrarily
// with the CBufferFile's internal buffer. These first three
// cases simulate objects of various sizes (1, 2, 5 bytes).
switch (InsecureRandRange(5)) {
switch (InsecureRandRange(6)) {
case 0: {
uint8_t a[1];
if (currentPos + 1 > fileSize)
Expand Down Expand Up @@ -395,6 +444,16 @@ BOOST_AUTO_TEST_CASE(streams_buffered_file_rand)
break;
}
case 3: {
// SkipTo is similar to the "read" cases above, except
// we don't receive the data.
size_t skip_length{static_cast<size_t>(InsecureRandRange(5))};
if (currentPos + skip_length > fileSize) continue;
bf.SetLimit(currentPos + skip_length);
bf.SkipTo(currentPos + skip_length);
currentPos += skip_length;
break;
}
case 4: {
// Find a byte value (that is at or ahead of the current position).
size_t find = currentPos + InsecureRandRange(8);
if (find >= fileSize)
Expand All @@ -411,7 +470,7 @@ BOOST_AUTO_TEST_CASE(streams_buffered_file_rand)
currentPos++;
break;
}
case 4: {
case 5: {
size_t requestPos = InsecureRandRange(maxPos + 4);
bool okay = bf.SetPos(requestPos);
// The new position may differ from the requested position
Expand Down
46 changes: 28 additions & 18 deletions src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4537,6 +4537,8 @@ void CChainState::LoadExternalBlockFile(
unsigned int nMaxBlockSize = MaxBlockSize();
// This takes over fileIn and calls fclose() on it in the CBufferedFile destructor
CBufferedFile blkdat(fileIn, 2*nMaxBlockSize, nMaxBlockSize+8, SER_DISK, CLIENT_VERSION);
// nRewind indicates where to resume scanning in case something goes wrong,
// such as when a block fails to deserialize.
uint64_t nRewind = blkdat.GetPos();
while (!blkdat.eof()) {
if (ShutdownRequested()) return;
Expand All @@ -4560,42 +4562,50 @@ void CChainState::LoadExternalBlockFile(
continue;
} catch (const std::exception&) {
// no valid block header found; don't complain
// (this happens at the end of every blk.dat file)
break;
}
try {
// read block
uint64_t nBlockPos = blkdat.GetPos();
// read block header
const uint64_t nBlockPos{blkdat.GetPos()};
if (dbp)
dbp->nPos = nBlockPos;
blkdat.SetLimit(nBlockPos + nSize);
std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
CBlock& block = *pblock;
blkdat >> block;
nRewind = blkdat.GetPos();

uint256 hash = block.GetHash();
CBlockHeader header;
blkdat >> header;
const uint256 hash{header.GetHash()};
// Skip the rest of this block (this may read from disk into memory); position to the marker before the
// next block, but it's still possible to rewind to the start of the current block (without a disk read).
nRewind = nBlockPos + nSize;
blkdat.SkipTo(nRewind);
{
LOCK(cs_main);
// detect out of order blocks, and store them for later
if (hash != m_params.GetConsensus().hashGenesisBlock && !m_blockman.LookupBlockIndex(block.hashPrevBlock)) {
if (hash != m_params.GetConsensus().hashGenesisBlock && !m_blockman.LookupBlockIndex(header.hashPrevBlock)) {
LogPrint(BCLog::REINDEX, "%s: Out of order block %s, parent %s not known\n", __func__, hash.ToString(),
block.hashPrevBlock.ToString());
header.hashPrevBlock.ToString());
if (dbp && blocks_with_unknown_parent) {
blocks_with_unknown_parent->emplace(block.hashPrevBlock, *dbp);
blocks_with_unknown_parent->emplace(header.hashPrevBlock, *dbp);
}
continue;
}

// process in case the block isn't known yet
const CBlockIndex* pindex = m_blockman.LookupBlockIndex(hash);
if (!pindex || (pindex->nStatus & BLOCK_HAVE_DATA) == 0) {
BlockValidationState state;
if (AcceptBlock(pblock, state, nullptr, true, dbp, nullptr)) {
nLoaded++;
}
if (state.IsError()) {
break;
}
// This block can be processed immediately; rewind to its start, read and deserialize it.
blkdat.SetPos(nBlockPos);
std::shared_ptr<CBlock> pblock{std::make_shared<CBlock>()};
blkdat >> *pblock;
nRewind = blkdat.GetPos();

BlockValidationState state;
if (AcceptBlock(pblock, state, nullptr, true, dbp, nullptr)) {
nLoaded++;
}
if (state.IsError()) {
break;
}
} else if (hash != m_params.GetConsensus().hashGenesisBlock && pindex->nHeight % 1000 == 0) {
LogPrint(BCLog::REINDEX, "Block Import: already had block %s at height %d\n", hash.ToString(), pindex->nHeight);
}
Expand Down
Loading

0 comments on commit ee16fa1

Please sign in to comment.