Skip to content

Commit

Permalink
De-dupe inbound SCP traffic (#4544)
Browse files Browse the repository at this point in the history
The primary goal of this change is to reduce burden on the main thread
scheduler by hashing and de-duping inbound SCP traffic. In simulations,
we've observed duplicate SCP messages consume quite a bit of main
thread/scheduler capacity. With this change, the amount of duplicate SCP
traffic core processes is reduced by ~50%. Another small change in this
PR is to broadcast SCP messages immediately instead of scheduling them
asynchronously - SCP traffic is high priority, so there's no reason to
delay it like we do in master.
Attaching graphs of scheduler load and of the inbound duplicate traffic
rate, before and after this change, for comparison
<img width="2299" alt="Screenshot 2024-11-14 at 16 25 25"
src="https://github.com/user-attachments/assets/e4f72401-a47a-4052-a5a7-0082ce7736df">
<img width="2305" alt="Screenshot 2024-11-14 at 16 28 54"
src="https://github.com/user-attachments/assets/9bcbc8e5-e763-4bc7-b93b-1cd4cf734456">
  • Loading branch information
graydon authored Nov 21, 2024
2 parents 4b4cd36 + dc4ad77 commit 93d0921
Show file tree
Hide file tree
Showing 22 changed files with 319 additions and 192 deletions.
8 changes: 8 additions & 0 deletions src/main/AppConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "overlay/BanManager.h"
#include "overlay/OverlayManager.h"
#include "overlay/OverlayMetrics.h"
#include "overlay/Peer.h"
#include "util/Timer.h"

namespace stellar
Expand Down Expand Up @@ -129,4 +130,11 @@ AppConnector::getOverlayMetrics()
return mApp.getOverlayManager().getOverlayMetrics();
}

// Forwards the de-duplication query for an inbound message to the overlay
// manager owned by the wrapped Application.
//
// msgTracker: capacity-tracked wrapper of the message being checked.
// Returns whatever OverlayManager::checkScheduledAndCache reports for it.
bool
AppConnector::checkScheduledAndCache(
    std::shared_ptr<CapacityTrackedMessage> msgTracker)
{
    auto& overlay = mApp.getOverlayManager();
    return overlay.checkScheduledAndCache(msgTracker);
}

}
4 changes: 4 additions & 0 deletions src/main/AppConnector.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ struct OverlayMetrics;
class SorobanNetworkConfig;
class SorobanMetrics;
struct LedgerTxnDelta;
class CapacityTrackedMessage;

// Helper class to isolate access to Application; all function helpers must
// either be called from main or be thread-safe
Expand Down Expand Up @@ -51,5 +52,8 @@ class AppConnector
Config const& getConfig() const;
bool overlayShuttingDown() const;
OverlayMetrics& getOverlayMetrics();
// This method is always exclusively called from one thread
bool
checkScheduledAndCache(std::shared_ptr<CapacityTrackedMessage> msgTracker);
};
}
2 changes: 1 addition & 1 deletion src/main/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
LEDGER_PROTOCOL_MIN_VERSION_INTERNAL_ERROR_REPORT = 18;

OVERLAY_PROTOCOL_MIN_VERSION = 33;
OVERLAY_PROTOCOL_VERSION = 35;
OVERLAY_PROTOCOL_VERSION = 36;

VERSION_STR = STELLAR_CORE_VERSION;

Expand Down
42 changes: 25 additions & 17 deletions src/overlay/Floodgate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ Floodgate::clearBelow(uint32_t maxLedger)
}

bool
Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer, Hash& index)
Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer,
Hash const& index)
{
ZoneScoped;
index = xdrBlake2(msg);
if (mShuttingDown)
{
return false;
Expand Down Expand Up @@ -145,21 +145,29 @@ Floodgate::broadcast(std::shared_ptr<StellarMessage const> msg,
else
{
mSendFromBroadcast.Mark();
std::weak_ptr<Peer> weak(
std::static_pointer_cast<Peer>(peer.second));
// This is an async operation, and peer might get dropped by the
// time we actually try to send the message. This is fine, as
// sendMessage will just be a no-op in that case
mApp.postOnMainThread(
[msg, weak, log = !broadcasted]() {
auto strong = weak.lock();
if (strong)
{
strong->sendMessage(msg, log);
}
},
fmt::format(FMT_STRING("broadcast to {}"),
peer.second->toString()));

if (msg->type() == SCP_MESSAGE)
{
peer.second->sendMessage(msg, !broadcasted);
}
else
{
// This is an async operation, and peer might get dropped by
// the time we actually try to send the message. This is
// fine, as sendMessage will just be a no-op in that case
std::weak_ptr<Peer> weak(
std::static_pointer_cast<Peer>(peer.second));
mApp.postOnMainThread(
[msg, weak, log = !broadcasted]() {
auto strong = weak.lock();
if (strong)
{
strong->sendMessage(msg, log);
}
},
fmt::format(FMT_STRING("broadcast to {}"),
peer.second->toString()));
}
}
broadcasted = true;
}
Expand Down
2 changes: 1 addition & 1 deletion src/overlay/Floodgate.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class Floodgate
// returns true if this is a new record
// msgID is msg's hash, computed by the caller
bool addRecord(StellarMessage const& msg, Peer::pointer fromPeer,
Hash& msgID);
Hash const& msgID);

// returns true if msg was sent to at least one peer
// The hash required for transactions
Expand Down
9 changes: 3 additions & 6 deletions src/overlay/FlowControl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ FlowControl::FlowControl(AppConnector& connector, bool useBackgroundThread)
: mFlowControlCapacity(connector.getConfig(), mNodeID)
, mFlowControlBytesCapacity(
connector.getConfig(), mNodeID,
connector.getOverlayManager().getFlowControlBytesConfig().mTotal)
connector.getOverlayManager().getFlowControlBytesTotal())
, mOverlayMetrics(connector.getOverlayManager().getOverlayMetrics())
, mAppConnector(connector)
, mUseBackgroundThread(useBackgroundThread)
Expand Down Expand Up @@ -240,7 +240,6 @@ SendMoreCapacity
FlowControl::endMessageProcessing(StellarMessage const& msg)
{
ZoneScoped;
releaseAssert(threadIsMain());
std::lock_guard<std::mutex> guard(mFlowControlMutex);

mFloodDataProcessed += mFlowControlCapacity.releaseLocalCapacity(msg);
Expand All @@ -252,9 +251,8 @@ FlowControl::endMessageProcessing(StellarMessage const& msg)
bool shouldSendMore =
mFloodDataProcessed ==
mAppConnector.getConfig().FLOW_CONTROL_SEND_MORE_BATCH_SIZE;
auto const byteBatchSize = mAppConnector.getOverlayManager()
.getFlowControlBytesConfig()
.mBatchSize;
auto const byteBatchSize =
OverlayManager::getFlowControlBytesBatch(mAppConnector.getConfig());
shouldSendMore =
shouldSendMore || mFloodDataProcessedBytes >= byteBatchSize;

Expand Down Expand Up @@ -560,7 +558,6 @@ bool
FlowControl::stopThrottling()
{
std::lock_guard<std::mutex> guard(mFlowControlMutex);
releaseAssert(threadIsMain());
if (mLastThrottle)
{
CLOG_DEBUG(Overlay, "Stop throttling reading from peer {}",
Expand Down
28 changes: 15 additions & 13 deletions src/overlay/OverlayManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// under the Apache License, Version 2.0. See the COPYING file at the root
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "crypto/BLAKE2.h"
#include "overlay/Peer.h"

/**
Expand All @@ -22,7 +23,7 @@
* The `StellarMessage` union contains 3 logically distinct kinds of message:
*
* - Messages directed to or from a specific peer, with or without a response:
* HELLO, GET_PEERS, PEERS, DONT_HAVE, ERROR_MSG
* HELLO, PEERS, DONT_HAVE, ERROR_MSG
*
* - One-way broadcast messages informing other peers of an event:
* TRANSACTION and SCP_MESSAGE
Expand Down Expand Up @@ -54,19 +55,14 @@ struct StellarMessage;
class OverlayManager
{
public:
struct AdjustedFlowControlConfig
{
uint32_t mTotal;
uint32_t mBatchSize;
};

static int constexpr MIN_INBOUND_FACTOR = 3;

static std::unique_ptr<OverlayManager> create(Application& app);

// Drop all PeerRecords from the Database
static void dropAll(Database& db);
static bool isFloodMessage(StellarMessage const& msg);
static uint32_t getFlowControlBytesBatch(Config const& cfg);

// Flush all FloodGate and ItemFetcher state for ledgers older than
// `ledgerSeq`.
Expand All @@ -90,18 +86,17 @@ class OverlayManager
// Returns true if this is a new message
// msgID is msg's hash, computed by the caller
virtual bool recvFloodedMsgID(StellarMessage const& msg, Peer::pointer peer,
Hash& msgID) = 0;
Hash const& msgID) = 0;

bool
recvFloodedMsg(StellarMessage const& msg, Peer::pointer peer)
{
Hash msgID;
return recvFloodedMsgID(msg, peer, msgID);
return recvFloodedMsgID(msg, peer, xdrBlake2(msg));
}

// Process incoming transaction, pass it down to the transaction queue
virtual void recvTransaction(StellarMessage const& msg,
Peer::pointer peer) = 0;
virtual void recvTransaction(StellarMessage const& msg, Peer::pointer peer,
Hash const& index) = 0;

// removes msgID from the floodgate's internal state
// as it's not tracked anymore, calling "broadcast" with a (now forgotten)
Expand Down Expand Up @@ -207,9 +202,16 @@ class OverlayManager

virtual void recordMessageMetric(StellarMessage const& stellarMsg,
Peer::pointer peer) = 0;
virtual AdjustedFlowControlConfig getFlowControlBytesConfig() const = 0;
virtual uint32_t getFlowControlBytesTotal() const = 0;

virtual ~OverlayManager()
{
}

// Is the message already referenced by the scheduler?
// This method is always called from one thread, therefore no cache
// synchronization is needed
virtual bool
checkScheduledAndCache(std::shared_ptr<CapacityTrackedMessage> tracker) = 0;
};
}
63 changes: 49 additions & 14 deletions src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ OverlayManagerImpl::OverlayManagerImpl(Application& app)
mApp.getConfig().TARGET_PEER_CONNECTIONS, mSurveyManager)
, mResolvingPeersWithBackoff(true)
, mResolvingPeersRetryCount(0)
, mScheduledMessages(100000, true)
{
mPeerSources[PeerType::INBOUND] = std::make_unique<RandomPeerSource>(
mPeerManager, RandomPeerSource::nextAttemptCutoff(PeerType::INBOUND));
Expand Down Expand Up @@ -356,9 +357,10 @@ OverlayManagerImpl::start()
mTxDemandsManager.start();
}

OverlayManager::AdjustedFlowControlConfig
OverlayManagerImpl::getFlowControlBytesConfig() const
uint32_t
OverlayManagerImpl::getFlowControlBytesTotal() const
{
releaseAssert(threadIsMain());
auto const maxTxSize = mApp.getHerder().getMaxTxSize();
releaseAssert(maxTxSize > 0);
auto const& cfg = mApp.getConfig();
Expand All @@ -373,17 +375,26 @@ OverlayManagerImpl::getFlowControlBytesConfig() const
INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES >=
maxTxSize))
{
return {static_cast<uint32_t>(maxTxSize) +
INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES,
INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES};
return maxTxSize + INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES;
}
return {INITIAL_PEER_FLOOD_READING_CAPACITY_BYTES,
INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES};
return INITIAL_PEER_FLOOD_READING_CAPACITY_BYTES;
}

// If flow control parameters were provided, return them
return {cfg.PEER_FLOOD_READING_CAPACITY_BYTES,
cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES};
return cfg.PEER_FLOOD_READING_CAPACITY_BYTES;
}

// Selects the byte batch size used to decide when to send more flow control
// capacity.
//
// cfg: node configuration to read the flow control byte settings from.
// Returns cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES when either of the two
// byte-based flow control settings is non-zero; otherwise returns the
// built-in INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES.
uint32_t
OverlayManager::getFlowControlBytesBatch(Config const& cfg)
{
    // At least one flow control parameter was provided: use the configured
    // batch size as-is.
    if (cfg.PEER_FLOOD_READING_CAPACITY_BYTES != 0 ||
        cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES != 0)
    {
        return cfg.FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES;
    }

    // Both settings are zero: fall back to the built-in initial value.
    return INITIAL_FLOW_CONTROL_SEND_MORE_BATCH_SIZE_BYTES;
}

void
Expand Down Expand Up @@ -1140,15 +1151,40 @@ OverlayManagerImpl::shufflePeerList(std::vector<Peer::pointer>& peerList)

bool
OverlayManagerImpl::recvFloodedMsgID(StellarMessage const& msg,
Peer::pointer peer, Hash& msgID)
Peer::pointer peer, Hash const& msgID)
{
ZoneScoped;
return mFloodGate.addRecord(msg, peer, msgID);
}

// Reports whether a message with the same hash as `tracker` is already
// recorded in mScheduledMessages and still alive; if not, records `tracker`
// under that hash.
//
// tracker: capacity-tracked message to look up; it is stored only as a
//          weak_ptr, so the cache does not keep the message alive.
// Returns true when a live entry with the same hash exists, false when the
//         tracker carries no hash or when it was newly recorded.
bool
OverlayManagerImpl::checkScheduledAndCache(
    std::shared_ptr<CapacityTrackedMessage> tracker)
{
#ifndef BUILD_TESTS
    // Outside of test builds, this must not run on the main thread while
    // background overlay processing is enabled.
    releaseAssert(!threadIsMain() ||
                  !mApp.getConfig().BACKGROUND_OVERLAY_PROCESSING);
#endif
    // A tracker without a hash cannot be de-duplicated and is never cached.
    if (!tracker->maybeGetHash())
    {
        return false;
    }
    auto const msgHash = tracker->maybeGetHash().value();

    // An entry only counts while its weak_ptr can still be locked; an expired
    // entry is treated as absent and overwritten below.
    bool const alreadyScheduled =
        mScheduledMessages.exists(msgHash) &&
        mScheduledMessages.get(msgHash).lock() != nullptr;
    if (alreadyScheduled)
    {
        return true;
    }

    std::weak_ptr<CapacityTrackedMessage> weakTracker(tracker);
    mScheduledMessages.put(msgHash, weakTracker);
    return false;
}

void
OverlayManagerImpl::recvTransaction(StellarMessage const& msg,
Peer::pointer peer)
Peer::pointer peer, Hash const& index)
{
ZoneScoped;
auto transaction = TransactionFrameBase::makeTransactionFromWire(
Expand All @@ -1157,8 +1193,7 @@ OverlayManagerImpl::recvTransaction(StellarMessage const& msg,
{
// record that this peer sent us this transaction
// add it to the floodmap so that this peer gets credit for it
Hash msgID;
recvFloodedMsgID(msg, peer, msgID);
recvFloodedMsgID(msg, peer, index);

mTxDemandsManager.recordTxPullLatency(transaction->getFullHash(), peer);

Expand All @@ -1171,7 +1206,7 @@ OverlayManagerImpl::recvTransaction(StellarMessage const& msg,
addResult.code ==
TransactionQueue::AddResultCode::ADD_STATUS_DUPLICATE))
{
forgetFloodedMsg(msgID);
forgetFloodedMsg(index);
CLOG_DEBUG(Overlay,
"Peer::recvTransaction Discarded transaction {} from {}",
hexAbbrev(transaction->getFullHash()), peer->toString());
Expand Down
13 changes: 9 additions & 4 deletions src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,9 @@ class OverlayManagerImpl : public OverlayManager

void clearLedgersBelow(uint32_t ledgerSeq, uint32_t lclSeq) override;
bool recvFloodedMsgID(StellarMessage const& msg, Peer::pointer peer,
Hash& msgID) override;
void recvTransaction(StellarMessage const& msg,
Peer::pointer peer) override;
Hash const& msgID) override;
void recvTransaction(StellarMessage const& msg, Peer::pointer peer,
Hash const& index) override;
void forgetFloodedMsg(Hash const& msgID) override;
void recvTxDemand(FloodDemand const& dmd, Peer::pointer peer) override;
bool broadcastMessage(std::shared_ptr<StellarMessage const> msg,
Expand Down Expand Up @@ -181,6 +181,8 @@ class OverlayManagerImpl : public OverlayManager
std::future<ResolvedPeers> mResolvedPeers;
bool mResolvingPeersWithBackoff;
int mResolvingPeersRetryCount;
RandomEvictionCache<Hash, std::weak_ptr<CapacityTrackedMessage>>
mScheduledMessages;

void triggerPeerResolution();
std::pair<std::vector<PeerBareAddress>, bool>
Expand Down Expand Up @@ -209,10 +211,13 @@ class OverlayManagerImpl : public OverlayManager
void extractPeersFromMap(std::map<NodeID, Peer::pointer> const& peerMap,
std::vector<Peer::pointer>& result);
void shufflePeerList(std::vector<Peer::pointer>& peerList);
AdjustedFlowControlConfig getFlowControlBytesConfig() const override;
uint32_t getFlowControlBytesTotal() const override;

// Returns `true` iff the overlay can accept the outbound peer at `address`.
// Logs whenever a peer cannot be accepted.
bool canAcceptOutboundPeer(PeerBareAddress const& address) const;

bool checkScheduledAndCache(
std::shared_ptr<CapacityTrackedMessage> tracker) override;
};
}
Loading

0 comments on commit 93d0921

Please sign in to comment.