Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add new catchup mode to use transaction results to skip failed transaction and signature verification #4536

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 83 additions & 1 deletion src/catchup/ApplyCheckpointWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,42 @@ ApplyCheckpointWork::openInputFiles()
mTxIn.open(ti.localPath_nogz());
mTxHistoryEntry = TransactionHistoryEntry();
mHeaderHistoryEntry = LedgerHeaderHistoryEntry();
#ifdef BUILD_TESTS
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING)
{
mTxResultIn = std::make_optional<XDRInputFileStream>();
FileTransferInfo tri(mDownloadDir, FileType::HISTORY_FILE_TYPE_RESULTS,
mCheckpoint);
if (!tri.localPath_nogz().empty() &&
std::filesystem::exists(tri.localPath_nogz()))
{
CLOG_DEBUG(History, "Replaying transaction results from {}",
tri.localPath_nogz());

try
{
mTxResultIn->open(tri.localPath_nogz());
}
catch (std::exception const& e)
{
CLOG_DEBUG(History,
"Failed to open transaction results file: {}. All "
"transactions will be applied.",
e.what());
}
mTxHistoryResultEntry =
std::make_optional<TransactionHistoryResultEntry>();
}
else
{
CLOG_DEBUG(History,
"Results file {} not found for checkpoint {} . All "
"transactions will be applied for this checkpoint.",
tri.localPath_nogz(), mCheckpoint);
mTxHistoryResultEntry = std::nullopt;
}
}
#endif
mFilesOpen = true;
}

Expand Down Expand Up @@ -141,6 +177,44 @@ ApplyCheckpointWork::getCurrentTxSet()
return TxSetXDRFrame::makeEmpty(lm.getLastClosedLedgerHeader());
}

#ifdef BUILD_TESTS
std::optional<TransactionResultSet>
ApplyCheckpointWork::getCurrentTxResultSet()
{
ZoneScoped;
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;
// Check mTxResultSet prior to loading next result set.
// This order is important because it accounts for ledger "gaps"
// in the history archives (which are caused by ledgers with empty tx
// sets, as those are not uploaded).
while (mTxResultIn && mTxResultIn->readOne(*mTxHistoryResultEntry))
{
if (mTxHistoryResultEntry)

{
if (mTxHistoryResultEntry->ledgerSeq < seq)
{
CLOG_DEBUG(History, "Advancing past txresultset for ledger {}",
mTxHistoryResultEntry->ledgerSeq);
}
else if (mTxHistoryResultEntry->ledgerSeq > seq)
{
break;
}
else
{
releaseAssert(mTxHistoryResultEntry->ledgerSeq == seq);
CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
return std::make_optional(mTxHistoryResultEntry->txResultSet);
}
}
}
CLOG_DEBUG(History, "No txresultset for ledger {}", seq);
return std::nullopt;
}
#endif // BUILD_TESTS

std::shared_ptr<LedgerCloseData>
ApplyCheckpointWork::getNextLedgerCloseData()
{
Expand Down Expand Up @@ -219,6 +293,14 @@ ApplyCheckpointWork::getNextLedgerCloseData()
CLOG_DEBUG(History, "Ledger {} has {} transactions", header.ledgerSeq,
txset->sizeTxTotal());

std::optional<TransactionResultSet> txres = std::nullopt;
#ifdef BUILD_TESTS
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING)
{
txres = getCurrentTxResultSet();
}
#endif

// We've verified the ledgerHeader (in the "trusted part of history"
// sense) in CATCHUP_VERIFY phase; we now need to check that the
// txhash we're about to apply is the one denoted by that ledger
Expand Down Expand Up @@ -249,7 +331,7 @@ ApplyCheckpointWork::getNextLedgerCloseData()

return std::make_shared<LedgerCloseData>(
header.ledgerSeq, txset, header.scpValue,
std::make_optional<Hash>(mHeaderHistoryEntry.hash));
std::make_optional<Hash>(mHeaderHistoryEntry.hash), txres);
}

BasicWork::State
Expand Down
31 changes: 21 additions & 10 deletions src/catchup/ApplyCheckpointWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,24 @@ class TmpDir;
struct LedgerHeaderHistoryEntry;

/**
* This class is responsible for applying transactions stored in files on
* temporary directory (downloadDir) to local ledger. It requires two sets of
* files - ledgers and transactions - int .xdr format. Transaction files are
* used to read transactions that will be used and ledger files are used to
* This class is responsible for applying transactions stored in files in the
* temporary directory (downloadDir) to local the ledger. It requires two sets
* of files - ledgers and transactions - in .xdr format. Transaction files are
* used to read transactions that will be applied and ledger files are used to
* check if ledger hashes are matching.
*
* It may also require a third set of files - transaction results - to use in
* accelerated replay, where failed transactions are not applied and successful
* transactions are applied without verifying their signatures.
*
* In each run it skips or applies transactions from one ledger. Skipping occurs
* when ledger to be applied is older than LCL from local ledger. At LCL
* boundary checks are made to confirm that ledgers from files knit up with
* LCL. If everything is OK, an apply ledger operation is performed. Then
* another check is made - if new local ledger matches corresponding ledger from
* file.
* when the ledger to be applied is older than the LCL of the local ledger. At
* LCL, boundary checks are made to confirm that the ledgers from the files knit
* up with LCL. If everything is OK, an apply ledger operation is performed.
* Then another check is made - if the new local ledger matches corresponding
* the ledger from file.
*
* Constructor of this class takes some important parameters:
* The constructor of this class takes some important parameters:
* * downloadDir - directory containing ledger and transaction files
* * range - LedgerRange to apply, must be checkpoint-aligned,
* and cover at most one checkpoint.
Expand All @@ -49,6 +53,10 @@ class ApplyCheckpointWork : public BasicWork
XDRInputFileStream mHdrIn;
XDRInputFileStream mTxIn;
TransactionHistoryEntry mTxHistoryEntry;
#ifdef BUILD_TESTS
std::optional<XDRInputFileStream> mTxResultIn;
std::optional<TransactionHistoryResultEntry> mTxHistoryResultEntry;
#endif // BUILD_TESTS
LedgerHeaderHistoryEntry mHeaderHistoryEntry;
OnFailureCallback mOnFailure;

Expand All @@ -57,6 +65,9 @@ class ApplyCheckpointWork : public BasicWork
std::shared_ptr<ConditionalWork> mConditionalWork;

TxSetXDRFrameConstPtr getCurrentTxSet();
#ifdef BUILD_TESTS
std::optional<TransactionResultSet> getCurrentTxResultSet();
#endif // BUILD_TESTS
void openInputFiles();

std::shared_ptr<LedgerCloseData> getNextLedgerCloseData();
Expand Down
15 changes: 8 additions & 7 deletions src/catchup/CatchupConfiguration.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace stellar
{

// Each catchup can be configured by two parameters destination ledger
// Each catchup can be configured by two parameters: destination ledger
// (and its hash, if known) and count of ledgers to apply.
// Value of count can be adjusted in different ways during catchup. If applying
// count ledgers would mean going before the last closed ledger - it is
Expand All @@ -31,12 +31,13 @@ namespace stellar
// and catchup to that instead of destination ledger. This is useful when
// doing offline commandline catchups with stellar-core catchup command.
//
// Catchup can be done in two modes - ONLINE nad OFFLINE. In ONLINE mode node
// is connected to the network. If receives ledgers during catchup and applies
// them after history is applied. Also additional closing ledger is required
// to mark catchup as complete and node as synced. In OFFLINE mode node is not
// connected to network, so new ledgers are not being externalized. Only
// buckets and transactions from history archives are applied.
// Catchup can be done in two modes - ONLINE and OFFLINE. In ONLINE mode, the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did the formatting change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reworded it a bit, which changed the line lengths and formatting.

// node is connected to the network. It receives ledgers during catchup and
// applies them after history is applied. Also, an additional closing ledger is
// required to mark catchup as complete and the node as synced. In OFFLINE mode,
// the node is not connected to network, so new ledgers are not being
// externalized. Only buckets and transactions from history archives are
// applied.
class CatchupConfiguration
{
public:
Expand Down
14 changes: 7 additions & 7 deletions src/catchup/CatchupWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,22 @@ using WorkSeqPtr = std::shared_ptr<WorkSequence>;

// CatchupWork does all the necessary work to perform any type of catchup.
// It accepts CatchupConfiguration structure to know from which ledger to which
// one do the catchup and if it involves only applying ledgers or ledgers and
// one to do the catchup and if it involves only applying ledgers or ledgers and
// buckets.
//
// First thing it does is to get a history state which allows to calculate
// proper destination ledger (in case CatchupConfiguration::CURRENT) was used
// and to get list of buckets that should be in database on that ledger.
// First, it gets a history state, which allows it to calculate a
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for documentation fixes!

// proper destination ledger (in case CatchupConfiguration::CURRENT)
// and get a list of buckets that should be in the database on that ledger.
//
// Next step is downloading and verifying ledgers (if verifyMode is set to
// VERIFY_BUFFERED_LEDGERS it can also verify against ledgers currently
// Next, it downloads and verifies ledgers (if verifyMode is set to
// VERIFY_BUFFERED_LEDGERS, it can also verify against ledgers currently
// buffered in LedgerManager).
//
// Then, depending on configuration, it can download, verify and apply buckets
// (as in MINIMAL and RECENT catchups), and then download and apply
// transactions (as in COMPLETE and RECENT catchups).
//
// After that, catchup is done and node can replay buffered ledgers and take
// After that, catchup is done and the node can replay buffered ledgers and take
// part in consensus protocol.

class CatchupWork : public Work
Expand Down
90 changes: 71 additions & 19 deletions src/catchup/DownloadApplyTxsWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ DownloadApplyTxsWork::yieldMoreWork()
{
throw std::runtime_error("Work has no more children to iterate over!");
}

CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(FileType::HISTORY_FILE_TYPE_TRANSACTIONS),
Expand Down Expand Up @@ -80,6 +79,53 @@ DownloadApplyTxsWork::yieldMoreWork()
mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb);

std::vector<std::shared_ptr<BasicWork>> seq{getAndUnzip};
std::vector<FileTransferInfo> filesToTransfer{ft};
std::vector<std::shared_ptr<BasicWork>> optionalDownloads;
#ifdef BUILD_TESTS
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS_FOR_TESTING)
{
CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(FileType::HISTORY_FILE_TYPE_RESULTS),
mCheckpointToQueue);

FileTransferInfo resultsFile(mDownloadDir,
FileType::HISTORY_FILE_TYPE_RESULTS,
mCheckpointToQueue);
auto getResultsWork = std::make_shared<GetAndUnzipRemoteFileWork>(
mApp, resultsFile, mArchive, /*logErrorOnFailure=*/false);
std::weak_ptr<GetAndUnzipRemoteFileWork> getResultsWorkWeak =
getResultsWork;
seq.emplace_back(getResultsWork);
seq.emplace_back(std::make_shared<WorkWithCallback>(
mApp, "get-results-" + std::to_string(mCheckpointToQueue),
[apply, getResultsWorkWeak, checkpoint, &dir](Application& app) {
auto getResults = getResultsWorkWeak.lock();
if (getResults && getResults->getState() != State::WORK_SUCCESS)
{
auto archive = getResults->getArchive();
if (archive)
{
FileTransferInfo ti(dir,
FileType::HISTORY_FILE_TYPE_RESULTS,
checkpoint);
CLOG_WARNING(
History,
"Archive {} maybe contains corrupt results file "
"{}. "
"This is not fatal as long as the archive contains "
"valid transaction history. Catchup will proceed "
"but"
"the node will not be able to skip known results.",
archive->getName(), ti.remoteName());
}
}
return true;
}));

filesToTransfer.push_back(resultsFile);
}
#endif // BUILD_TESTS

auto maybeWaitForMerges = [](Application& app) {
if (app.getConfig().CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING)
Expand Down Expand Up @@ -139,28 +185,34 @@ DownloadApplyTxsWork::yieldMoreWork()
mApp, "wait-merges" + apply->getName(), maybeWaitForMerges, apply));
}

seq.push_back(std::make_shared<WorkWithCallback>(
mApp, "delete-transactions-" + std::to_string(mCheckpointToQueue),
[ft](Application& app) {
try
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted transactions {}",
for (auto const& ft : filesToTransfer)
{
auto deleteWorkName = "delete-" + ft.getTypeString() + "-" +
std::to_string(mCheckpointToQueue);
seq.push_back(std::make_shared<WorkWithCallback>(
mApp, deleteWorkName, [ft](Application& app) {
CLOG_DEBUG(History, "Deleting {} {}", ft.getTypeString(),
ft.localPath_nogz());
try
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted {} {}", ft.getTypeString(),
ft.localPath_nogz());
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete {} {}: {}",
ft.getTypeString(), ft.localPath_nogz(),
e.what());
return false;
}
return true;
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete transactions {}: {}",
ft.localPath_nogz(), e.what());
return false;
}
}));

}));
}
auto nextWork = std::make_shared<WorkSequence>(
mApp, "download-apply-" + std::to_string(mCheckpointToQueue), seq,
BasicWork::RETRY_NEVER);
BasicWork::RETRY_NEVER, true /*stop at first failure*/);
mCheckpointToQueue += mApp.getHistoryManager().getCheckpointFrequency();
mLastYieldedWork = nextWork;
return nextWork;
Expand Down
9 changes: 5 additions & 4 deletions src/herder/LedgerCloseData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ using namespace std;
namespace stellar
{

LedgerCloseData::LedgerCloseData(uint32_t ledgerSeq,
TxSetXDRFrameConstPtr txSet,
StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash)
LedgerCloseData::LedgerCloseData(
uint32_t ledgerSeq, TxSetXDRFrameConstPtr txSet, StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash,
std::optional<TransactionResultSet> const& expectedResults)
: mLedgerSeq(ledgerSeq)
, mTxSet(txSet)
, mValue(v)
, mExpectedLedgerHash(expectedLedgerHash)
, mExpectedResults(expectedResults)
{
releaseAssert(txSet->getContentsHash() == mValue.txSetHash);
}
Expand Down
12 changes: 11 additions & 1 deletion src/herder/LedgerCloseData.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ class LedgerCloseData
public:
LedgerCloseData(
uint32_t ledgerSeq, TxSetXDRFrameConstPtr txSet, StellarValue const& v,
std::optional<Hash> const& expectedLedgerHash = std::nullopt);
std::optional<Hash> const& expectedLedgerHash = std::nullopt,
std::optional<TransactionResultSet> const& expectedResults =
std::nullopt);

uint32_t
getLedgerSeq() const
Expand All @@ -48,6 +50,13 @@ class LedgerCloseData
{
return mExpectedLedgerHash;
}
#ifdef BUILD_TESTS
std::optional<TransactionResultSet> const&
getExpectedResults() const
{
return mExpectedResults;
}
#endif // BUILD_TESTS

StoredDebugTransactionSet
toXDR() const
Expand Down Expand Up @@ -82,6 +91,7 @@ class LedgerCloseData
TxSetXDRFrameConstPtr mTxSet;
StellarValue mValue;
std::optional<Hash> mExpectedLedgerHash;
std::optional<TransactionResultSet> mExpectedResults;
};

std::string stellarValueToString(Config const& c, StellarValue const& sv);
Expand Down
Loading