Skip to content

Commit

Permalink
Drop duplicate SCP messages as early as possible
Browse files Browse the repository at this point in the history
  • Loading branch information
marta-lokhova committed Nov 15, 2024
1 parent 9fec186 commit 77dea72
Show file tree
Hide file tree
Showing 13 changed files with 117 additions and 56 deletions.
8 changes: 7 additions & 1 deletion src/main/AppConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "overlay/BanManager.h"
#include "overlay/OverlayManager.h"
#include "overlay/OverlayMetrics.h"
#include "overlay/Peer.h"
#include "util/Timer.h"

namespace stellar
Expand Down Expand Up @@ -34,7 +35,6 @@ AppConnector::getLedgerManager()
OverlayManager&
AppConnector::getOverlayManager()
{
releaseAssert(threadIsMain());
return mApp.getOverlayManager();
}

Expand Down Expand Up @@ -129,4 +129,10 @@ AppConnector::getOverlayMetrics()
return mApp.getOverlayManager().getOverlayMetrics();
}

bool
AppConnector::isScheduled(std::shared_ptr<MsgCapacityTracker> msgTracker)
{
return mApp.getOverlayManager().isScheduled(msgTracker);
}

}
2 changes: 2 additions & 0 deletions src/main/AppConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ struct OverlayMetrics;
class SorobanNetworkConfig;
class SorobanMetrics;
struct LedgerTxnDelta;
class MsgCapacityTracker;

// Helper class to isolate access to Application; all function helpers must
// either be called from main or be thread-sade
Expand Down Expand Up @@ -51,5 +52,6 @@ class AppConnector
Config const& getConfig() const;
bool overlayShuttingDown() const;
OverlayMetrics& getOverlayMetrics();
bool isScheduled(std::shared_ptr<MsgCapacityTracker> msgTracker);
};
}
4 changes: 2 additions & 2 deletions src/overlay/Floodgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ Floodgate::clearBelow(uint32_t maxLedger)
}

bool
Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer, Hash& index)
Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer,
Hash const& index)
{
ZoneScoped;
index = xdrBlake2(msg);
if (mShuttingDown)
{
return false;
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/Floodgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class Floodgate
// returns true if this is a new record
// fills msgID with msg's hash
bool addRecord(StellarMessage const& msg, Peer::pointer fromPeer,
Hash& msgID);
Hash const& msgID);

// returns true if msg was sent to at least one peer
// The hash required for transactions
Expand Down
2 changes: 0 additions & 2 deletions src/overlay/FlowControl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ SendMoreCapacity
FlowControl::endMessageProcessing(StellarMessage const& msg)
{
ZoneScoped;
releaseAssert(threadIsMain());
std::lock_guard<std::mutex> guard(mFlowControlMutex);

mFloodDataProcessed += mFlowControlCapacity.releaseLocalCapacity(msg);
Expand Down Expand Up @@ -560,7 +559,6 @@ bool
FlowControl::stopThrottling()
{
std::lock_guard<std::mutex> guard(mFlowControlMutex);
releaseAssert(threadIsMain());
if (mLastThrottle)
{
CLOG_DEBUG(Overlay, "Stop throttling reading from peer {}",
Expand Down
12 changes: 9 additions & 3 deletions src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class OverlayManager
// Returns true if this is a new message
// fills msgID with msg's hash
virtual bool recvFloodedMsgID(StellarMessage const& msg, Peer::pointer peer,
Hash& msgID) = 0;
Hash const& msgID) = 0;

bool
recvFloodedMsg(StellarMessage const& msg, Peer::pointer peer)
Expand All @@ -100,8 +100,8 @@ class OverlayManager
}

// Process incoming transaction, pass it down to the transaction queue
virtual void recvTransaction(StellarMessage const& msg,
Peer::pointer peer) = 0;
virtual void recvTransaction(StellarMessage const& msg, Peer::pointer peer,
Hash const& index) = 0;

// removes msgID from the floodgate's internal state
// as it's not tracked anymore, calling "broadcast" with a (now forgotten)
Expand Down Expand Up @@ -211,5 +211,11 @@ class OverlayManager
virtual ~OverlayManager()
{
}

// Is message already referenced by the scheduler
// This method is always called from one thread, therefore no cache
// synchorization is needed
virtual bool
isScheduled(std::shared_ptr<MsgCapacityTracker> tracker) const = 0;
};
}
32 changes: 27 additions & 5 deletions src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ OverlayManagerImpl::OverlayManagerImpl(Application& app)
mApp.getConfig().TARGET_PEER_CONNECTIONS, mSurveyManager)
, mResolvingPeersWithBackoff(true)
, mResolvingPeersRetryCount(0)
, mScheduledMessages(100000, true)
{
mPeerSources[PeerType::INBOUND] = std::make_unique<RandomPeerSource>(
mPeerManager, RandomPeerSource::nextAttemptCutoff(PeerType::INBOUND));
Expand Down Expand Up @@ -1140,15 +1141,37 @@ OverlayManagerImpl::shufflePeerList(std::vector<Peer::pointer>& peerList)

bool
OverlayManagerImpl::recvFloodedMsgID(StellarMessage const& msg,
Peer::pointer peer, Hash& msgID)
Peer::pointer peer, Hash const& msgID)
{
ZoneScoped;
return mFloodGate.addRecord(msg, peer, msgID);
}

bool
OverlayManagerImpl::isScheduled(
std::shared_ptr<MsgCapacityTracker> tracker) const
{
releaseAssert(!threadIsMain() ||
!mApp.getConfig().EXPERIMENTAL_BACKGROUND_OVERLAY_PROCESSING);
if (!tracker->maybeGetHash())
{
return false;
}
auto index = tracker->maybeGetHash().value();
if (mScheduledMessages.exists(index))
{
if (mScheduledMessages.get(index).lock())
{
return true;
}
}
mScheduledMessages.put(index, std::weak_ptr<MsgCapacityTracker>(tracker));
return false;
}

void
OverlayManagerImpl::recvTransaction(StellarMessage const& msg,
Peer::pointer peer)
Peer::pointer peer, Hash const& index)
{
ZoneScoped;
auto transaction = TransactionFrameBase::makeTransactionFromWire(
Expand All @@ -1157,8 +1180,7 @@ OverlayManagerImpl::recvTransaction(StellarMessage const& msg,
{
// record that this peer sent us this transaction
// add it to the floodmap so that this peer gets credit for it
Hash msgID;
recvFloodedMsgID(msg, peer, msgID);
recvFloodedMsgID(msg, peer, index);

mTxDemandsManager.recordTxPullLatency(transaction->getFullHash(), peer);

Expand All @@ -1171,7 +1193,7 @@ OverlayManagerImpl::recvTransaction(StellarMessage const& msg,
addResult.code ==
TransactionQueue::AddResultCode::ADD_STATUS_DUPLICATE))
{
forgetFloodedMsg(msgID);
forgetFloodedMsg(index);
CLOG_DEBUG(Overlay,
"Peer::recvTransaction Discarded transaction {} from {}",
hexAbbrev(transaction->getFullHash()), peer->toString());
Expand Down
11 changes: 8 additions & 3 deletions src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ class OverlayManagerImpl : public OverlayManager

void clearLedgersBelow(uint32_t ledgerSeq, uint32_t lclSeq) override;
bool recvFloodedMsgID(StellarMessage const& msg, Peer::pointer peer,
Hash& msgID) override;
void recvTransaction(StellarMessage const& msg,
Peer::pointer peer) override;
Hash const& msgID) override;
void recvTransaction(StellarMessage const& msg, Peer::pointer peer,
Hash const& index) override;
void forgetFloodedMsg(Hash const& msgID) override;
void recvTxDemand(FloodDemand const& dmd, Peer::pointer peer) override;
bool broadcastMessage(std::shared_ptr<StellarMessage const> msg,
Expand Down Expand Up @@ -181,6 +181,8 @@ class OverlayManagerImpl : public OverlayManager
std::future<ResolvedPeers> mResolvedPeers;
bool mResolvingPeersWithBackoff;
int mResolvingPeersRetryCount;
mutable RandomEvictionCache<Hash, std::weak_ptr<MsgCapacityTracker>>
mScheduledMessages;

void triggerPeerResolution();
std::pair<std::vector<PeerBareAddress>, bool>
Expand Down Expand Up @@ -214,5 +216,8 @@ class OverlayManagerImpl : public OverlayManager
// Returns `true` iff the overlay can accept the outbound peer at `address`.
// Logs whenever a peer cannot be accepted.
bool canAcceptOutboundPeer(PeerBareAddress const& address) const;

bool
isScheduled(std::shared_ptr<MsgCapacityTracker> tracker) const override;
};
}
Loading

0 comments on commit 77dea72

Please sign in to comment.