
Commit

refactoring
ThomasBrady committed Nov 16, 2024
1 parent 0f80cc8 commit 40701f8
Showing 21 changed files with 290 additions and 242 deletions.
43 changes: 24 additions & 19 deletions src/catchup/ApplyCheckpointWork.cpp
@@ -98,13 +98,14 @@ ApplyCheckpointWork::openInputFiles()
mHeaderHistoryEntry = LedgerHeaderHistoryEntry();
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
mTxResultIn.close();
mTxResultIn = std::make_optional<XDRInputFileStream>();
FileTransferInfo tri(mDownloadDir, FileType::HISTORY_FILE_TYPE_RESULTS,
mCheckpoint);
CLOG_DEBUG(History, "Replaying transaction results from {}",
tri.localPath_nogz());
mTxResultIn.open(tri.localPath_nogz());
mTxHistoryResultEntry = TransactionHistoryResultEntry{};
mTxResultIn->open(tri.localPath_nogz());
mTxHistoryResultEntry =
std::make_optional<TransactionHistoryResultEntry>();
}
mFilesOpen = true;
}
@@ -157,29 +158,33 @@ ApplyCheckpointWork::getCurrentTxResultSet()
ZoneScoped;
auto& lm = mApp.getLedgerManager();
auto seq = lm.getLastClosedLedgerNum() + 1;

// Check mTxResultSet prior to loading next result set.
// This order is important because it accounts for ledger "gaps"
// in the history archives (which are caused by ledgers with empty tx
// sets, as those are not uploaded).
do
while (mTxResultIn)
{
if (mTxHistoryResultEntry.ledgerSeq < seq)
{
CLOG_DEBUG(History, "Advancing past txresultset for ledger {}",
mTxHistoryResultEntry.ledgerSeq);
}
else if (mTxHistoryResultEntry.ledgerSeq > seq)
{
break;
}
else
if (mTxHistoryResultEntry)
{
releaseAssert(mTxHistoryResultEntry.ledgerSeq == seq);
CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
return std::make_optional(mTxHistoryResultEntry.txResultSet);
if (mTxHistoryResultEntry->ledgerSeq < seq)
{
CLOG_DEBUG(History, "Advancing past txresultset for ledger {}",
mTxHistoryResultEntry->ledgerSeq);
}
else if (mTxHistoryResultEntry->ledgerSeq > seq)
{
break;
}
else
{
releaseAssert(mTxHistoryResultEntry->ledgerSeq == seq);
CLOG_DEBUG(History, "Loaded txresultset for ledger {}", seq);
return std::make_optional(mTxHistoryResultEntry->txResultSet);
}
}
} while (mTxResultIn && mTxResultIn.readOne(mTxHistoryResultEntry));
mTxResultIn->readOne(*mTxHistoryResultEntry);
}
CLOG_DEBUG(History, "No txresultset for ledger {}", seq);
return std::nullopt;
}
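The rewritten getCurrentTxResultSet walks an optional results stream together with an optional "current entry": entries for earlier ledgers are skipped, an entry for a later ledger means the requested ledger had an empty tx set (a legitimate gap, since empty result sets are not uploaded), and an exact match returns the result set. Below is a minimal, self-contained sketch of that skip-ahead pattern; ResultEntry, ResultStream and getResultSetForLedger are hypothetical stand-ins for TransactionHistoryResultEntry, XDRInputFileStream and the member function, with readOne mirroring the stream's fill-or-return-false behaviour.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-ins for TransactionHistoryResultEntry and
// XDRInputFileStream.
struct ResultEntry
{
    uint32_t ledgerSeq;
    int txResultSet; // placeholder for the real TransactionResultSet payload
};

struct ResultStream
{
    std::vector<ResultEntry> entries;
    std::size_t next = 0;

    // Mirrors XDRInputFileStream::readOne: fill `out`, return false at EOF.
    bool
    readOne(ResultEntry& out)
    {
        if (next >= entries.size())
        {
            return false;
        }
        out = entries[next++];
        return true;
    }
};

// Return the result set recorded for `seq`, if any. Entries for earlier
// ledgers are skipped; an entry for a later ledger means `seq` had an empty
// tx set and was never uploaded, so std::nullopt is the expected answer.
std::optional<int>
getResultSetForLedger(std::optional<ResultStream>& in,
                      std::optional<ResultEntry>& current, uint32_t seq)
{
    while (in)
    {
        if (current)
        {
            if (current->ledgerSeq > seq)
            {
                break; // gap: nothing recorded for `seq`
            }
            if (current->ledgerSeq == seq)
            {
                return current->txResultSet;
            }
            // current->ledgerSeq < seq: stale entry, advance past it below
        }
        ResultEntry e;
        if (!in->readOne(e))
        {
            break; // stream exhausted
        }
        current = e;
    }
    return std::nullopt;
}

int
main()
{
    std::optional<ResultStream> in =
        ResultStream{{{8, 80}, {10, 100}, {11, 110}}};
    std::optional<ResultEntry> current;
    for (uint32_t seq : {9u, 10u, 11u})
    {
        auto r = getResultSetForLedger(in, current, seq);
        std::cout << "ledger " << seq << ": "
                  << (r ? std::to_string(*r) : std::string("no result set"))
                  << "\n";
    }
}
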
4 changes: 2 additions & 2 deletions src/catchup/ApplyCheckpointWork.h
@@ -52,9 +52,9 @@ class ApplyCheckpointWork : public BasicWork

XDRInputFileStream mHdrIn;
XDRInputFileStream mTxIn;
XDRInputFileStream mTxResultIn;
std::optional<XDRInputFileStream> mTxResultIn;
TransactionHistoryEntry mTxHistoryEntry;
TransactionHistoryResultEntry mTxHistoryResultEntry;
std::optional<TransactionHistoryResultEntry> mTxHistoryResultEntry;
LedgerHeaderHistoryEntry mHeaderHistoryEntry;
OnFailureCallback mOnFailure;

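With mTxResultIn and mTxHistoryResultEntry declared as std::optional, the results stream and its cursor exist only while CATCHUP_SKIP_KNOWN_RESULTS is enabled; the disabled case is "no value" rather than a default-constructed object that must never be touched. A short sketch of that construct-on-demand pattern, using hypothetical Stream/Entry/CheckpointReader types rather than the stellar-core classes:

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-ins for XDRInputFileStream and
// TransactionHistoryResultEntry.
struct Stream
{
    std::string path;
    void
    open(std::string p)
    {
        path = std::move(p);
    }
};

struct Entry
{
    uint32_t ledgerSeq = 0;
};

class CheckpointReader
{
    // Engaged only when result replay is enabled.
    std::optional<Stream> mResultIn;
    std::optional<Entry> mResultEntry;

  public:
    void
    openFiles(bool skipKnownResults, std::string const& path)
    {
        if (skipKnownResults)
        {
            mResultIn.emplace();    // construct in place (replaces close()+reuse)
            mResultIn->open(path);  // members are reached through ->
            mResultEntry.emplace(); // fresh cursor for this checkpoint
        }
        else
        {
            mResultIn.reset();
            mResultEntry.reset();
        }
    }

    bool
    resultsEnabled() const
    {
        return mResultIn.has_value();
    }
};

int
main()
{
    CheckpointReader r;
    r.openFiles(true, "results.xdr");
    std::cout << std::boolalpha << r.resultsEnabled() << "\n"; // true
    r.openFiles(false, "");
    std::cout << r.resultsEnabled() << "\n"; // false
}
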
2 changes: 1 addition & 1 deletion src/catchup/CatchupConfiguration.h
@@ -32,7 +32,7 @@ namespace stellar
// doing offline commandline catchups with stellar-core catchup command.
//
// Catchup can be done in two modes - ONLINE and OFFLINE. In ONLINE mode, the
// node is connected to the network. If receives ledgers during catchup and
// node is connected to the network. It receives ledgers during catchup and
// applies them after history is applied. Also, an additional closing ledger is
// required to mark catchup as complete and the node as synced. In OFFLINE mode,
// the node is not connected to network, so new ledgers are not being
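For illustration only, a toy sketch of the two modes the comment describes, with hypothetical CatchupMode/runCatchup names rather than the real CatchupConfiguration API: ONLINE buffers ledgers received from the network, applies them after history, and closes one more ledger to mark the node synced, while OFFLINE applies history only.

#include <cstdint>
#include <iostream>
#include <vector>

// Hypothetical sketch, not the real CatchupConfiguration interface.
enum class CatchupMode
{
    ONLINE, // connected: apply buffered live ledgers after history,
            // then close one more ledger so the node counts as synced
    OFFLINE // not connected: apply history only
};

void
runCatchup(CatchupMode mode, uint32_t historyTarget,
           std::vector<uint32_t> const& bufferedLiveLedgers)
{
    std::cout << "applied history up to ledger " << historyTarget << "\n";
    if (mode == CatchupMode::ONLINE)
    {
        for (auto seq : bufferedLiveLedgers)
        {
            std::cout << "applied buffered ledger " << seq << "\n";
        }
        std::cout << "closed one more ledger -> node is synced\n";
    }
}

int
main()
{
    runCatchup(CatchupMode::ONLINE, 100, {101, 102});
    runCatchup(CatchupMode::OFFLINE, 100, {});
}
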
101 changes: 62 additions & 39 deletions src/catchup/DownloadApplyTxsWork.cpp
@@ -43,40 +43,59 @@ DownloadApplyTxsWork::yieldMoreWork()
{
throw std::runtime_error("Work has no more children to iterate over!");
}
std::vector<FileType> fileTypesToDownload{
FileType::HISTORY_FILE_TYPE_TRANSACTIONS};
std::vector<std::shared_ptr<BasicWork>> downloadSeq;
std::vector<FileTransferInfo> filesToTransfer;
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
fileTypesToDownload.emplace_back(FileType::HISTORY_FILE_TYPE_RESULTS);
}
for (auto const& fileType : fileTypesToDownload)
{
CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(fileType), mCheckpointToQueue);
FileTransferInfo ft(mDownloadDir, fileType, mCheckpointToQueue);
filesToTransfer.emplace_back(ft);
downloadSeq.emplace_back(
std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft, mArchive));
}

OnFailureCallback cb = [archive = mArchive, filesToTransfer]() {
for (auto const& ft : filesToTransfer)
{
CLOG_ERROR(History, "Archive {} maybe contains corrupt file {}",
archive->getName(), ft.remoteName());
}
};
CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(FileType::HISTORY_FILE_TYPE_TRANSACTIONS),
mCheckpointToQueue);
FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS,
mCheckpointToQueue);
auto getAndUnzip =
std::make_shared<GetAndUnzipRemoteFileWork>(mApp, ft, mArchive);

auto const& hm = mApp.getHistoryManager();
auto low = hm.firstLedgerInCheckpointContaining(mCheckpointToQueue);
auto high = std::min(mCheckpointToQueue, mRange.last());

TmpDir const& dir = mDownloadDir;
uint32_t checkpoint = mCheckpointToQueue;
auto getFileWeak = std::weak_ptr<GetAndUnzipRemoteFileWork>(getAndUnzip);

OnFailureCallback cb = [getFileWeak, checkpoint, &dir]() {
auto getFile = getFileWeak.lock();
if (getFile)
{
auto archive = getFile->getArchive();
if (archive)
{
FileTransferInfo ti(
dir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS, checkpoint);
CLOG_ERROR(History, "Archive {} maybe contains corrupt file {}",
archive->getName(), ti.remoteName());
}
}
};

auto apply = std::make_shared<ApplyCheckpointWork>(
mApp, mDownloadDir, LedgerRange::inclusive(low, high), cb);

std::vector<std::shared_ptr<BasicWork>> seq{getAndUnzip};
std::vector<FileTransferInfo> filesToTransfer{ft};

if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS)
{
CLOG_INFO(History,
"Downloading, unzipping and applying {} for checkpoint {}",
typeString(FileType::HISTORY_FILE_TYPE_RESULTS),
mCheckpointToQueue);

FileTransferInfo resultsFile(mDownloadDir,
FileType::HISTORY_FILE_TYPE_RESULTS,
mCheckpointToQueue);
seq.emplace_back(std::make_shared<GetAndUnzipRemoteFileWork>(
mApp, resultsFile, mArchive));
filesToTransfer.push_back(resultsFile);
}

auto maybeWaitForMerges = [](Application& app) {
if (app.getConfig().CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING)
{
@@ -94,10 +113,8 @@
{
auto prev = mLastYieldedWork;
bool pqFellBehind = false;
auto applyName = apply->getName();
auto predicate = [prev, pqFellBehind, waitForPublish = mWaitForPublish,
maybeWaitForMerges,
applyName](Application& app) mutable {
maybeWaitForMerges](Application& app) mutable {
if (!prev)
{
throw std::runtime_error("Download and apply txs: related Work "
@@ -128,42 +145,48 @@
}
return res && maybeWaitForMerges(app);
};
downloadSeq.push_back(std::make_shared<ConditionalWork>(
seq.push_back(std::make_shared<ConditionalWork>(
mApp, "conditional-" + apply->getName(), predicate, apply));
}
else
{
downloadSeq.push_back(std::make_shared<ConditionalWork>(
seq.push_back(std::make_shared<ConditionalWork>(
mApp, "wait-merges" + apply->getName(), maybeWaitForMerges, apply));
}

downloadSeq.push_back(std::make_shared<WorkWithCallback>(
mApp, "delete-transactions-" + std::to_string(mCheckpointToQueue),
seq.push_back(std::make_shared<WorkWithCallback>(
mApp,
"delete-transactions-" +
(mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS
? std::string("and-results-")
: "") +
std::to_string(mCheckpointToQueue),
[filesToTransfer](Application& app) {
for (auto const& ft : filesToTransfer)
{
CLOG_DEBUG(History, "Deleting transactions {}",
CLOG_DEBUG(History, "Deleting {} {}", ft.getTypeString(),
ft.localPath_nogz());
try
{
std::filesystem::remove(
std::filesystem::path(ft.localPath_nogz()));
CLOG_DEBUG(History, "Deleted transactions {}",
CLOG_DEBUG(History, "Deleted {} {}", ft.getTypeString(),
ft.localPath_nogz());
}
catch (std::filesystem::filesystem_error const& e)
{
CLOG_ERROR(History, "Could not delete transactions {}: {}",
ft.localPath_nogz(), e.what());
CLOG_ERROR(History, "Could not delete {} {}: {}",
ft.getTypeString(), ft.localPath_nogz(),
e.what());
return false;
}
}
return true;
}));

auto nextWork = std::make_shared<WorkSequence>(
mApp, "download-apply-" + std::to_string(mCheckpointToQueue),
downloadSeq, BasicWork::RETRY_NEVER);
mApp, "download-apply-" + std::to_string(mCheckpointToQueue), seq,
BasicWork::RETRY_NEVER);
mCheckpointToQueue += mApp.getHistoryManager().getCheckpointFrequency();
mLastYieldedWork = nextWork;
return nextWork;
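The reworked OnFailureCallback captures the download work through a std::weak_ptr and only queries its archive if the work is still alive, so the callback neither extends the work's lifetime nor touches a dead object when a failure is reported late. A compact sketch of that capture pattern, with hypothetical Archive/DownloadWork/makeFailureCallback names standing in for the stellar-core types:

#include <functional>
#include <iostream>
#include <memory>
#include <string>

// Hypothetical stand-ins for HistoryArchive and GetAndUnzipRemoteFileWork.
struct Archive
{
    std::string name;
};

struct DownloadWork
{
    std::shared_ptr<Archive> archive;
    std::shared_ptr<Archive>
    getArchive() const
    {
        return archive;
    }
};

using OnFailureCallback = std::function<void()>;

OnFailureCallback
makeFailureCallback(std::shared_ptr<DownloadWork> const& work,
                    std::string remoteName)
{
    // Capture weakly: the callback must not keep the work alive and must
    // tolerate the work being destroyed before a failure is reported.
    std::weak_ptr<DownloadWork> weak = work;
    return [weak, remoteName = std::move(remoteName)]() {
        if (auto w = weak.lock())
        {
            if (auto archive = w->getArchive())
            {
                std::cout << "Archive " << archive->name
                          << " may contain corrupt file " << remoteName
                          << "\n";
            }
        }
        // If the work is already gone, there is nothing useful to report.
    };
}

int
main()
{
    auto work = std::make_shared<DownloadWork>(
        DownloadWork{std::make_shared<Archive>(Archive{"example-archive"})});
    auto cb = makeFailureCallback(work, "transactions-0000003f.xdr.gz");
    cb();         // work alive: reports the suspect file
    work.reset(); // destroy the work
    cb();         // weak_ptr expired: callback is a no-op
}
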
85 changes: 39 additions & 46 deletions src/ledger/LedgerManagerImpl.cpp
@@ -915,13 +915,14 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData)
// first, prefetch source accounts for txset, then charge fees
prefetchTxSourceIds(txs);
auto const mutableTxResults =
processFeesSeqNums(txs, ltx, *applicableTxSet, ledgerCloseMeta);
processFeesSeqNums(txs, ltx, *applicableTxSet, ledgerCloseMeta,
ledgerData.getExpectedResults());

TransactionResultSet txResultSet;
txResultSet.results.reserve(txs.size());
// Subtle: after this call, `header` is invalidated, and is not safe to use
applyTransactions(*applicableTxSet, txs, mutableTxResults, ltx, txResultSet,
ledgerData.getExpectedResults(), ledgerCloseMeta);
ledgerCloseMeta);
if (mApp.getConfig().MODE_STORES_HISTORY_MISC)
{
auto ledgerSeq = ltx.loadHeader().current().ledgerSeq;
@@ -1348,7 +1349,8 @@ std::vector<MutableTxResultPtr>
LedgerManagerImpl::processFeesSeqNums(
std::vector<TransactionFrameBasePtr> const& txs,
AbstractLedgerTxn& ltxOuter, ApplicableTxSetFrame const& txSet,
std::unique_ptr<LedgerCloseMetaFrame> const& ledgerCloseMeta)
std::unique_ptr<LedgerCloseMetaFrame> const& ledgerCloseMeta,
std::optional<TransactionResultSet> const& expectedResults)
{
ZoneScoped;
std::vector<MutableTxResultPtr> txResults;
@@ -1361,13 +1363,47 @@ LedgerManagerImpl::processFeesSeqNums(
auto header = ltx.loadHeader().current();
std::map<AccountID, SequenceNumber> accToMaxSeq;

// If we have expected results, we assign them to the mutable tx results
// here.
std::optional<std::vector<TransactionResultPair>::const_iterator>
expectedResultsIter = std::nullopt;
if (expectedResults)
{
expectedResultsIter =
std::make_optional(expectedResults->results.begin());
}

bool mergeSeen = false;
for (auto tx : txs)
{
LedgerTxn ltxTx(ltx);

txResults.push_back(
tx->processFeeSeqNum(ltxTx, txSet.getTxBaseFee(tx, header)));
if (expectedResultsIter)
{
releaseAssert(*expectedResultsIter !=
expectedResults->results.end());
releaseAssert((*expectedResultsIter)->transactionHash ==
tx->getContentsHash());
if ((*expectedResultsIter)->result.result.code() ==
TransactionResultCode::txSUCCESS)
{
CLOG_DEBUG(
Tx, "Skipping replay of successful transaction: tx {}",
binToHex(tx->getContentsHash()));
txResults.back()->setReplaySuccessfulTransactionResult(
(*expectedResultsIter)->result);
}
else
{
CLOG_DEBUG(Tx, "Replaying failing transaction: tx {}",
binToHex(tx->getContentsHash()));
txResults.back()->setReplayFailingTransactionResult(
(*expectedResultsIter)->result);
}
++(*expectedResultsIter);
}

if (protocolVersionStartsFrom(
ltxTx.loadHeader().current().ledgerVersion,
@@ -1499,7 +1535,6 @@ LedgerManagerImpl::applyTransactions(
std::vector<TransactionFrameBasePtr> const& txs,
std::vector<MutableTxResultPtr> const& mutableTxResults,
AbstractLedgerTxn& ltx, TransactionResultSet& txResultSet,
std::optional<TransactionResultSet> const& expectedResults,
std::unique_ptr<LedgerCloseMetaFrame> const& ledgerCloseMeta)
{
ZoneNamedN(txsZone, "applyTransactions", true);
@@ -1522,17 +1557,6 @@

prefetchTransactionData(txs);

std::optional<std::vector<TransactionResultPair>::const_iterator>
expectedResultsIter = std::nullopt;
if (mApp.getConfig().CATCHUP_SKIP_KNOWN_RESULTS && expectedResults)
{
auto const& resVec = expectedResults->results;
CLOG_DEBUG(Tx, "Will skip replaying known results: {} txs, {} results",
txs.size(), resVec.size());
releaseAssertOrThrow(txs.size() == resVec.size());
expectedResultsIter = std::make_optional(resVec.begin());
}

Hash sorobanBasePrngSeed = txSet.getContentsHash();
uint64_t txNum{0};
uint64_t txSucceeded{0};
@@ -1566,37 +1590,6 @@
TransactionResultPair results;
results.transactionHash = tx->getContentsHash();

if (expectedResultsIter)
{
releaseAssert(*expectedResultsIter !=
expectedResults->results.end());
while ((*expectedResultsIter)->transactionHash !=
results.transactionHash)
{
(*expectedResultsIter)++;
releaseAssert(*expectedResultsIter !=
expectedResults->results.end());
}
releaseAssert((*expectedResultsIter)->transactionHash ==
results.transactionHash);
if ((*expectedResultsIter)->result.result.code() ==
TransactionResultCode::txSUCCESS)
{
CLOG_DEBUG(Tx,
"Skipping signature verification for known "
"successful tx#{}",
index);
tx->setReplaySuccessfulTransactionResult(
(*expectedResultsIter)->result);
}
else
{
CLOG_DEBUG(Tx, "Skipping replay of known failed tx#{}", index);
tx->setReplayFailingTransactionResult(
(*expectedResultsIter)->result);
}
}

tx->apply(mApp.getAppConnector(), ltx, tm, mutableTxResult, subSeed);
tx->processPostApply(mApp.getAppConnector(), ltx, tm, mutableTxResult);

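The replay bookkeeping now happens once, during fee processing: expected results are walked in lockstep with the transactions, each recorded hash is asserted to match the transaction being charged, and every mutable result is tagged up front as either "skip, known successful" or "replay, known failing"; that is why the forward-scanning loop in applyTransactions could be deleted. A self-contained sketch of that lockstep tagging, using hypothetical Tx/ExpectedResult/MutableResult types in place of the stellar-core classes:

#include <cassert>
#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Hypothetical stand-ins for the stellar-core transaction and result types.
struct ExpectedResult
{
    std::string transactionHash;
    bool success;
};

struct Tx
{
    std::string contentsHash;
};

struct MutableResult
{
    enum class Replay
    {
        NONE,
        SKIP_KNOWN_SUCCESS,
        REPLAY_KNOWN_FAILURE
    } replay = Replay::NONE;
};

// Walk txs and expected results in lockstep, as processFeesSeqNums now does:
// each tx must line up with the recorded result for the same hash.
std::vector<MutableResult>
tagReplayResults(std::vector<Tx> const& txs,
                 std::optional<std::vector<ExpectedResult>> const& expected)
{
    std::vector<MutableResult> out(txs.size());
    if (!expected)
    {
        return out; // no recorded results: apply everything normally
    }
    // The real code asserts it never runs past the end of the recorded
    // results; a size check expresses the same invariant here.
    assert(expected->size() == txs.size());
    auto it = expected->begin();
    for (std::size_t i = 0; i < txs.size(); ++i, ++it)
    {
        assert(it->transactionHash == txs[i].contentsHash);
        out[i].replay = it->success
                            ? MutableResult::Replay::SKIP_KNOWN_SUCCESS
                            : MutableResult::Replay::REPLAY_KNOWN_FAILURE;
    }
    return out;
}

int
main()
{
    std::vector<Tx> txs{{"aa"}, {"bb"}};
    std::optional<std::vector<ExpectedResult>> expected =
        std::vector<ExpectedResult>{{"aa", true}, {"bb", false}};
    auto tagged = tagReplayResults(txs, expected);
    for (std::size_t i = 0; i < tagged.size(); ++i)
    {
        std::cout << "tx " << i << ": "
                  << (tagged[i].replay ==
                              MutableResult::Replay::SKIP_KNOWN_SUCCESS
                          ? "skip (known success)"
                          : "replay (known failure)")
                  << "\n";
    }
}
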