Skip to content

Commit

Permalink
Improve lifetime management of ledger objects (SLEs) to prevent run…
Browse files Browse the repository at this point in the history
…away memory usage: (XRPLF#4822)

* Add logging for Application.cpp sweep()
* Improve lifetime management of ledger objects (`SLE`s)
* Only store SLE digest in CachedView; get SLEs from CachedSLEs
* Also force release of last ledger used for path finding if there are
  no path finding requests to process
* Count more ST objects (derive from `CountedObject`)
* Track CachedView stats in CountedObjects
* Rename the CachedView counters
* Fix the scope of the digest lookup lock

Before this patch, if you asked "is it caching?" It was always caching.
  • Loading branch information
ximinez authored Jan 12, 2024
1 parent 4308407 commit d9f90c8
Show file tree
Hide file tree
Showing 19 changed files with 283 additions and 38 deletions.
3 changes: 3 additions & 0 deletions src/ripple/app/ledger/InboundLedgers.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ class InboundLedgers

virtual void
stop() = 0;

virtual std::size_t
cacheSize() = 0;
};

std::unique_ptr<InboundLedgers>
Expand Down
21 changes: 21 additions & 0 deletions src/ripple/app/ledger/LedgerReplayer.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,27 @@ class LedgerReplayer final
void
stop();

std::size_t
tasksSize() const
{
std::lock_guard<std::mutex> lock(mtx_);
return tasks_.size();
}

std::size_t
deltasSize() const
{
std::lock_guard<std::mutex> lock(mtx_);
return deltas_.size();
}

std::size_t
skipListsSize() const
{
std::lock_guard<std::mutex> lock(mtx_);
return skipLists_.size();
}

private:
mutable std::mutex mtx_;
std::vector<std::shared_ptr<LedgerReplayTask>> tasks_;
Expand Down
7 changes: 7 additions & 0 deletions src/ripple/app/ledger/impl/InboundLedgers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,13 @@ class InboundLedgersImp : public InboundLedgers
mRecentFailures.clear();
}

std::size_t
cacheSize() override
{
ScopedLockType lock(mLock);
return mLedgers.size();
}

private:
clock_type& m_clock;

Expand Down
5 changes: 5 additions & 0 deletions src/ripple/app/ledger/impl/LedgerMaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1543,6 +1543,7 @@ LedgerMaster::updatePaths()
if (app_.getOPs().isNeedNetworkLedger())
{
--mPathFindThread;
mPathLedger.reset();
JLOG(m_journal.debug()) << "Need network ledger for updating paths";
return;
}
Expand All @@ -1568,6 +1569,7 @@ LedgerMaster::updatePaths()
else
{ // Nothing to do
--mPathFindThread;
mPathLedger.reset();
JLOG(m_journal.debug()) << "Nothing to do for updating paths";
return;
}
Expand All @@ -1584,6 +1586,7 @@ LedgerMaster::updatePaths()
<< "Published ledger too old for updating paths";
std::lock_guard ml(m_mutex);
--mPathFindThread;
mPathLedger.reset();
return;
}
}
Expand All @@ -1596,6 +1599,7 @@ LedgerMaster::updatePaths()
if (!pathRequests.requestsPending())
{
--mPathFindThread;
mPathLedger.reset();
JLOG(m_journal.debug())
<< "No path requests found. Nothing to do for updating "
"paths. "
Expand All @@ -1613,6 +1617,7 @@ LedgerMaster::updatePaths()
<< "No path requests left. No need for further updating "
"paths";
--mPathFindThread;
mPathLedger.reset();
return;
}
}
Expand Down
172 changes: 162 additions & 10 deletions src/ripple/app/main/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1065,20 +1065,172 @@ class ApplicationImp : public Application, public BasicApp
// VFALCO TODO fix the dependency inversion using an observer,
// have listeners register for "onSweep ()" notification.

nodeFamily_.sweep();
{
std::shared_ptr<FullBelowCache const> const fullBelowCache =
nodeFamily_.getFullBelowCache(0);

std::shared_ptr<TreeNodeCache const> const treeNodeCache =
nodeFamily_.getTreeNodeCache(0);

std::size_t const oldFullBelowSize = fullBelowCache->size();
std::size_t const oldTreeNodeSize = treeNodeCache->size();

nodeFamily_.sweep();

JLOG(m_journal.debug())
<< "NodeFamily::FullBelowCache sweep. Size before: "
<< oldFullBelowSize
<< "; size after: " << fullBelowCache->size();

JLOG(m_journal.debug())
<< "NodeFamily::TreeNodeCache sweep. Size before: "
<< oldTreeNodeSize << "; size after: " << treeNodeCache->size();
}
if (shardFamily_)
{
std::size_t const oldFullBelowSize =
shardFamily_->getFullBelowCacheSize();
std::size_t const oldTreeNodeSize =
shardFamily_->getTreeNodeCacheSize().second;

shardFamily_->sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();

JLOG(m_journal.debug())
<< "ShardFamily::FullBelowCache sweep. Size before: "
<< oldFullBelowSize
<< "; size after: " << shardFamily_->getFullBelowCacheSize();

JLOG(m_journal.debug())
<< "ShardFamily::TreeNodeCache sweep. Size before: "
<< oldTreeNodeSize << "; size after: "
<< shardFamily_->getTreeNodeCacheSize().second;
}
{
TaggedCache<uint256, Transaction> const& masterTxCache =
getMasterTransaction().getCache();

std::size_t const oldMasterTxSize = masterTxCache.size();

getMasterTransaction().sweep();

JLOG(m_journal.debug())
<< "MasterTransaction sweep. Size before: " << oldMasterTxSize
<< "; size after: " << masterTxCache.size();
}
{
// Does not appear to have an associated cache.
getNodeStore().sweep();
}
if (shardStore_)
{
// Does not appear to have an associated cache.
shardStore_->sweep();
getLedgerMaster().sweep();
getTempNodeCache().sweep();
getValidations().expire(m_journal);
getInboundLedgers().sweep();
getLedgerReplayer().sweep();
m_acceptedLedgerCache.sweep();
cachedSLEs_.sweep();
}
{
std::size_t const oldLedgerMasterCacheSize =
getLedgerMaster().getFetchPackCacheSize();

getLedgerMaster().sweep();

JLOG(m_journal.debug())
<< "LedgerMaster sweep. Size before: "
<< oldLedgerMasterCacheSize << "; size after: "
<< getLedgerMaster().getFetchPackCacheSize();
}
{
// NodeCache == TaggedCache<SHAMapHash, Blob>
std::size_t const oldTempNodeCacheSize = getTempNodeCache().size();

getTempNodeCache().sweep();

JLOG(m_journal.debug())
<< "TempNodeCache sweep. Size before: " << oldTempNodeCacheSize
<< "; size after: " << getTempNodeCache().size();
}
{
std::size_t const oldCurrentCacheSize =
getValidations().sizeOfCurrentCache();
std::size_t const oldSizeSeqEnforcesSize =
getValidations().sizeOfSeqEnforcersCache();
std::size_t const oldByLedgerSize =
getValidations().sizeOfByLedgerCache();
std::size_t const oldBySequenceSize =
getValidations().sizeOfBySequenceCache();

getValidations().expire(m_journal);

JLOG(m_journal.debug())
<< "Validations Current expire. Size before: "
<< oldCurrentCacheSize
<< "; size after: " << getValidations().sizeOfCurrentCache();

JLOG(m_journal.debug())
<< "Validations SeqEnforcer expire. Size before: "
<< oldSizeSeqEnforcesSize << "; size after: "
<< getValidations().sizeOfSeqEnforcersCache();

JLOG(m_journal.debug())
<< "Validations ByLedger expire. Size before: "
<< oldByLedgerSize
<< "; size after: " << getValidations().sizeOfByLedgerCache();

JLOG(m_journal.debug())
<< "Validations BySequence expire. Size before: "
<< oldBySequenceSize
<< "; size after: " << getValidations().sizeOfBySequenceCache();
}
{
std::size_t const oldInboundLedgersSize =
getInboundLedgers().cacheSize();

getInboundLedgers().sweep();

JLOG(m_journal.debug())
<< "InboundLedgers sweep. Size before: "
<< oldInboundLedgersSize
<< "; size after: " << getInboundLedgers().cacheSize();
}
{
size_t const oldTasksSize = getLedgerReplayer().tasksSize();
size_t const oldDeltasSize = getLedgerReplayer().deltasSize();
size_t const oldSkipListsSize = getLedgerReplayer().skipListsSize();

getLedgerReplayer().sweep();

JLOG(m_journal.debug())
<< "LedgerReplayer tasks sweep. Size before: " << oldTasksSize
<< "; size after: " << getLedgerReplayer().tasksSize();

JLOG(m_journal.debug())
<< "LedgerReplayer deltas sweep. Size before: "
<< oldDeltasSize
<< "; size after: " << getLedgerReplayer().deltasSize();

JLOG(m_journal.debug())
<< "LedgerReplayer skipLists sweep. Size before: "
<< oldSkipListsSize
<< "; size after: " << getLedgerReplayer().skipListsSize();
}
{
std::size_t const oldAcceptedLedgerSize =
m_acceptedLedgerCache.size();

m_acceptedLedgerCache.sweep();

JLOG(m_journal.debug())
<< "AcceptedLedgerCache sweep. Size before: "
<< oldAcceptedLedgerSize
<< "; size after: " << m_acceptedLedgerCache.size();
}
{
std::size_t const oldCachedSLEsSize = cachedSLEs_.size();

cachedSLEs_.sweep();

JLOG(m_journal.debug())
<< "CachedSLEs sweep. Size before: " << oldCachedSLEsSize
<< "; size after: " << cachedSLEs_.size();
}

#ifdef RIPPLED_REPORTING
if (auto pg = dynamic_cast<PostgresDatabase*>(&*mRelationalDatabase))
Expand Down
4 changes: 4 additions & 0 deletions src/ripple/app/paths/PathRequests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,17 @@ PathRequests::updateAll(std::shared_ptr<ReadView const> const& inLedger)
break;
}

// Hold on to the line cache until after the lock is released, so it can
// be destroyed outside of the lock
std::shared_ptr<RippleLineCache> lastCache;
{
// Get the latest requests, cache, and ledger for next pass
std::lock_guard sl(mLock);

if (requests_.empty())
break;
requests = requests_;
lastCache = cache;
cache = getLineCache(cache->getLedger(), false);
}
} while (!app_.getJobQueue().isStopping());
Expand Down
28 changes: 28 additions & 0 deletions src/ripple/consensus/Validations.h
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,34 @@ class Validations

return laggards;
}

std::size_t
sizeOfCurrentCache() const
{
std::lock_guard lock{mutex_};
return current_.size();
}

std::size_t
sizeOfSeqEnforcersCache() const
{
std::lock_guard lock{mutex_};
return seqEnforcers_.size();
}

std::size_t
sizeOfByLedgerCache() const
{
std::lock_guard lock{mutex_};
return byLedger_.size();
}

std::size_t
sizeOfBySequenceCache() const
{
std::lock_guard lock{mutex_};
return bySequence_.size();
}
};

} // namespace ripple
Expand Down
5 changes: 1 addition & 4 deletions src/ripple/ledger/CachedView.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ class CachedViewImpl : public DigestAwareReadView
DigestAwareReadView const& base_;
CachedSLEs& cache_;
std::mutex mutable mutex_;
std::unordered_map<
key_type,
std::shared_ptr<SLE const>,
hardened_hash<>> mutable map_;
std::unordered_map<key_type, uint256, hardened_hash<>> mutable map_;

public:
CachedViewImpl() = delete;
Expand Down
Loading

0 comments on commit d9f90c8

Please sign in to comment.