diff --git a/Builds/VisualStudio/stellar-core.vcxproj b/Builds/VisualStudio/stellar-core.vcxproj
index 3ad051f4d9..2c53d25e83 100644
--- a/Builds/VisualStudio/stellar-core.vcxproj
+++ b/Builds/VisualStudio/stellar-core.vcxproj
@@ -613,6 +613,7 @@ exit /b 0
+
@@ -1031,6 +1032,7 @@ exit /b 0
+
diff --git a/Builds/VisualStudio/stellar-core.vcxproj.filters b/Builds/VisualStudio/stellar-core.vcxproj.filters
index 77f369bc8e..aee05a0099 100644
--- a/Builds/VisualStudio/stellar-core.vcxproj.filters
+++ b/Builds/VisualStudio/stellar-core.vcxproj.filters
@@ -1065,6 +1065,9 @@
overlay
+
+ overlay
+
overlay
@@ -2156,6 +2159,9 @@
overlay
+
+ overlay
+
overlay
diff --git a/docs/metrics.md b/docs/metrics.md
index 9a5bc5f073..56e8de21d2 100644
--- a/docs/metrics.md
+++ b/docs/metrics.md
@@ -133,8 +133,12 @@ overlay.outbound.establish | meter | outbound connection esta
overlay.recv. | timer | received message
overlay.send. | meter | sent message
overlay.timeout.idle | meter | idle peer timeout
+overlay.recv.start-survey-collecting | timer | time spent in processing request to start survey collecting phase
+overlay.recv.stop-survey-collecting | timer | time spent in processing request to stop survey collecting phase
overlay.recv.survey-request | timer | time spent in processing survey request
overlay.recv.survey-response | timer | time spent in processing survey response
+overlay.send.start-survey-collecting | meter | sent request to start survey collecting phase
+overlay.send.stop-survey-collecting | meter | sent request to stop survey collecting phase
overlay.send.survey-request | meter | sent survey request
overlay.send.survey-response | meter | sent survey response
process.action.queue | counter | number of items waiting in internal action-queue
diff --git a/docs/software/admin.md b/docs/software/admin.md
index 0d119d9f56..34a4c128d3 100644
--- a/docs/software/admin.md
+++ b/docs/software/admin.md
@@ -764,12 +764,20 @@ There is a survey mechanism in the overlay that allows a validator to request co
By default, a node will relay or respond to a survey message if the message originated from a node in the receiving nodes transitive quorum. This behavior can be overridden by setting `SURVEYOR_KEYS` in the config file to a more restrictive set of nodes to relay or respond to.
+The survey works in two phases: the collecting phase, and the reporting phase. During the collecting phase, nodes record information about themselves and their peers, such as the number of messages sent to a given peer. During the reporting phase, the surveyor requests the results of the collecting phase from nodes on the network.
+
+The surveyor begins the collecting phase by broadcasting a `TimeSlicedSurveyStartCollectingMessage`. The surveyor ends the collecting phase and initiates the reporting phase by broadcasting a `TimeSlicedSurveyStopCollectingMessage`. These start/stop collecting messages ensure that the collecting phase is roughly equal for all nodes present for the duration of the collecting phase.
+
+During the reporting phase, the surveyor sends `TimeSlicedSurveyRequestMessage`s to individual nodes to gather the information the node recorded during the collecting phase.
+
##### Example survey command
In this example, we have three nodes `GBBN`, `GDEX`, and `GBUI` (we'll refer to them by the first four letters of their public keys). We will execute the commands below from `GBUI`, and note that `GBBN` has `SURVEYOR_KEYS=["$self"]` in it's config file, so `GBBN` will not relay or respond to any survey messages.
- 1. `$ stellar-core http-command 'surveytopology?duration=1000&node=GBBNXPPGDFDUQYH6RT5VGPDSOWLZEXXFD3ACUPG5YXRHLTATTUKY42CL'`
- 2. `$ stellar-core http-command 'surveytopology?duration=1000&node=GDEXJV6XKKLDUWKTSXOOYVOYWZGVNIKKQ7GVNR5FOV7VV5K4MGJT5US4'`
+ 1. `$ stellar-core http-command 'startsurveycollecting?nonce=1234'`
+ 1. `$ stellar-core http-command 'stopsurveycollecting'`
+ 1. `$ stellar-core http-command 'surveytopologytimesliced?node=GBBNXPPGDFDUQYH6RT5VGPDSOWLZEXXFD3ACUPG5YXRHLTATTUKY42CL&inboundpeerindex=0&outboundpeerindex=0'`
+ 2. `$ stellar-core http-command 'surveytopologytimesliced?node=GDEXJV6XKKLDUWKTSXOOYVOYWZGVNIKKQ7GVNR5FOV7VV5K4MGJT5US4&inboundpeerindex=0&outboundpeerindex=0'`
3. `$ stellar-core http-command 'getsurveyresult'`
Once the responses are received, the `getsurveyresult` command will return a result like this:
@@ -821,6 +829,12 @@ Once the responses are received, the `getsurveyresult` command will return a res
"numTotalOutboundPeers" : 0,
"maxInboundPeerCount" : 64,
"maxOutboundPeerCount" : 8,
+ "addedAuthenticatedPeers" : 0,
+ "droppedAuthenticatedPeers" : 0,
+ "p75SCPFirstToSelfLatencyNs" : 121042,
+ "p75SCPSelfToOtherLatencyNs" : 112452,
+ "lostSyncCount" : 0,
+ "isValidator" : false,
"outboundPeers" : null
}
}
@@ -835,6 +849,7 @@ Notable field definitions
* `badResponseNodes` : List of nodes that sent a malformed response
* `topology` : Map of nodes to connection information
* `inboundPeers`/`outboundPeers` : List of connection information by nodes
+ * `averageLatencyMs` : Average latency with this peer in milliseconds.
* `bytesRead`: The total number of bytes read from this peer.
* `bytesWritten`: The total number of bytes written to this peer.
* `duplicateFetchBytesRecv`: The number of bytes received that were duplicate transaction sets and quorum sets.
@@ -853,6 +868,12 @@ Notable field definitions
* `numTotalInboundPeers`/`numTotalOutboundPeers` : The number of total inbound and outbound peers this node is connected to. The response will have a random subset of 25 connected peers per direction (inbound/outbound). These fields tell you if you're missing nodes so you can send another request out to get another random subset of nodes.
* `maxInboundPeerCount`/`maxOutboundPeerCount` : The number of total inbound and outbound peers that this node can accept. These fields correspond to stellar-core configurations `MAX_ADDITIONAL_PEER_CONNECTIONS` and `TARGET_PEER_CONNECTIONS`, respectively.
+ * `addedAuthenticatedPeers` : The number of authenticated peers added.
+ * `droppedAuthenticatedPeers` : The number of authenticated peers dropped.
+ * `p75SCPFirstToSelfLatencyNs` : 75th percentile latency to hear about new SCP messages in nanoseconds.
+ * `p75SCPSelfToOtherLatencyNs` : 75th percentile latency for other nodes to hear this node's SCP messages in nanoseconds.
+ * `lostSyncCount` : The number of times this node lost sync.
+ * `isValidator` : Whether this node is a validator.
### Quorum Health
diff --git a/docs/software/commands.md b/docs/software/commands.md
index a99491299a..64c2e67e91 100644
--- a/docs/software/commands.md
+++ b/docs/software/commands.md
@@ -357,25 +357,66 @@ format.
* **surveytopology**
`surveytopology?duration=DURATION&node=NODE_ID`
+ **This command is deprecated and will be removed in a future release. Use the
+ new time sliced survey interface instead (`startsurveycollecting`,
+ `stopsurveycollecting`, `surveytopologytimesliced`, and `getsurveyresult`).**
Starts a survey that will request peer connectivity information from nodes
in the backlog. `DURATION` is the number of seconds this survey will run
for, and `NODE_ID` is the public key you will add to the backlog to survey.
Running this command while the survey is running will add the node to the
- backlog and reset the timer to run for `DURATION` seconds. By default, this
- node will respond to/relay a survey message if the message originated
- from a node in it's transitive quorum. This behaviour can be overridden by adding
- keys to `SURVEYOR_KEYS` in the config file, which will be the set of keys to check
- instead of the transitive quorum. If you would like to opt-out of this survey mechanism,
- just set `SURVEYOR_KEYS` to `$self` or a bogus key
+ backlog and reset the timer to run for `DURATION` seconds. See [Changing
+ default survey behavior](#changing-default-survey-behavior) for details about
+ the default survey behavior, as well as how to change that behavior or opt-out
+ entirely.
* **stopsurvey**
`stopsurvey`
+ **This command is deprecated and will be removed in a future release. It is no
+ longer necessary to explicitly stop a survey in the new time sliced survey
+ interface as these surveys expire automatically.**
Will stop the survey if one is running. Noop if no survey is running
+* **startsurveycollecting**
+ `startsurveycollecting?nonce=NONCE`
+ Start a survey in the collecting phase with a given nonce. Does nothing if a
+ survey is already running on the network as only one survey may run at a time.
+ See [Changing default survey behavior](#changing-default-survey-behavior) for
+ details about the default survey behavior, as well as how to change that
+ behavior or opt-out entirely.
+
+* **stopsurveycollecting**
+ `stopsurveycollecting`
+ Stop the collecting phase of the survey started in the previous
+ `startsurveycollecting` command. Moves the survey into the reporting phase.
+ Does nothing if no survey is running, or if a different node is running the
+ active survey.
+
+* **surveytopologytimesliced**
+ `surveytopologytimesliced?node=NODE_ID&inboundpeerindex=INBOUND_INDEX&outboundpeerindex=OUTBOUND_INDEX`
+ During the reporting phase of a survey, invoke this command to request
+ information recorded during the collecting phase from `NODE_ID`. This command
+ adds the survey request to a backlog; it does not immediately send the
+ request. Use `getsurveyresult` to see the response. A response will include
+ information about up to 25 inbound and outbound peers respectively. If a node
+ has more than 25 inbound and/or outbound peers, you will need to survey the
+ node multiple times to get the complete peer list. You can request peers
+ starting from a specific index in each peer list by setting `INBOUND_INDEX`
+ and `OUTBOUND_INDEX` appropriately. See [Changing default survey
+ behavior](#changing-default-survey-behavior) for details about the default
+ survey behavior, as well as how to change that behavior or opt-out entirely.
+
* **getsurveyresult**
`getsurveyresult`
Returns the current survey results. The results will be reset every time a new survey
- is started
+ is started. Use this command for both the time sliced survey interface and
+ the old deprecated survey interface.
+
+#### Changing default survey behavior
+By default, this node will respond to/relay a survey message if the message
+originated from a node in its transitive quorum. This behavior can be overridden
+by adding keys to `SURVEYOR_KEYS` in the config file, which will be the set of
+keys to check instead of the transitive quorum. If you would like to opt-out of
+this survey mechanism, just set `SURVEYOR_KEYS` to `$self` or a bogus key.
### The following HTTP commands are exposed on test instances
* **generateload** `generateload[?mode=
diff --git a/docs/stellar-core_example.cfg b/docs/stellar-core_example.cfg
index 45b3a85c3d..ccb7980fcc 100644
--- a/docs/stellar-core_example.cfg
+++ b/docs/stellar-core_example.cfg
@@ -495,6 +495,13 @@ ALLOW_LOCALHOST_FOR_TESTING=false
# before applying transactions.
CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING=false
+# ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING (in minutes), defaults to
+# no override. Overrides the maximum survey phase duration for both the
+# collecting and reporting phase to the specified value. Performs no override if
+# set to 0. Do not use in production. This option is ignored in builds without
+# tests enabled.
+ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING=0
+
# PEER_READING_CAPACITY defaults to 200
# Controls how many messages from a particular peer
# core can process simultaneously, and throttles reading from a peer when at
diff --git a/hash-xdrs.sh b/hash-xdrs.sh
index 2062fa5ceb..42c4d7cfa7 100755
--- a/hash-xdrs.sh
+++ b/hash-xdrs.sh
@@ -25,7 +25,11 @@ namespace stellar {
extern const std::vector> XDR_FILES_SHA256 = {
EOF
-sha256sum -b $1/xdr/*.x | grep -v Stellar-internal | perl -pe 's/([a-f0-9]+)[ \*]+(.*)/{"$2", "$1"},/'
+# Hashes to ignore
+IGNORE="Stellar-internal\|Stellar-overlay\|Stellar-contract-spec\|Stellar-contract-meta\|Stellar-contract-env-meta"
-echo '{"", ""}};'
+sha256sum -b $1/xdr/*.x | grep -v "${IGNORE}" | perl -pe 's/([a-f0-9]+)[ \*]+(.*)/{"$2", "$1"},/'
+
+# Add empty entries for the 5 skipped files
+echo '{"", ""}, {"", ""}, {"", ""}, {"", ""}, {"", ""}};'
echo '}'
diff --git a/src/herder/HerderSCPDriver.cpp b/src/herder/HerderSCPDriver.cpp
index 5e5e65f251..e5b9394e65 100644
--- a/src/herder/HerderSCPDriver.cpp
+++ b/src/herder/HerderSCPDriver.cpp
@@ -13,6 +13,8 @@
#include "ledger/LedgerManager.h"
#include "main/Application.h"
#include "main/ErrorMessages.h"
+#include "overlay/OverlayManager.h"
+#include "overlay/SurveyManager.h"
#include "scp/SCP.h"
#include "scp/Slot.h"
#include "util/Logging.h"
@@ -1034,6 +1036,13 @@ HerderSCPDriver::recordSCPExternalizeEvent(uint64_t slotIndex, NodeID const& id,
mSCPMetrics.mFirstToSelfExternalizeLag,
"first to self externalize lag",
std::chrono::nanoseconds::zero(), slotIndex);
+ mApp.getOverlayManager().getSurveyManager().modifyNodeData(
+ [&](CollectingNodeData& nd) {
+ nd.mSCPFirstToSelfLatencyNsHistogram.Update(
+ std::chrono::duration_cast(
+ now - *timing.mFirstExternalize)
+ .count());
+ });
}
if (!timing.mSelfExternalize || forceUpdateSelf)
{
@@ -1052,6 +1061,13 @@ HerderSCPDriver::recordSCPExternalizeEvent(uint64_t slotIndex, NodeID const& id,
fmt::format(FMT_STRING("self to {} externalize lag"),
toShortString(id)),
std::chrono::nanoseconds::zero(), slotIndex);
+ mApp.getOverlayManager().getSurveyManager().modifyNodeData(
+ [&](CollectingNodeData& nd) { // feed the survey's self-to-other SCP latency histogram
+ nd.mSCPSelfToOtherLatencyNsHistogram.Update(
+ std::chrono::duration_cast(
+ now - *timing.mSelfExternalize) // lag is measured from our own externalize, not the first one seen
+ .count());
+ });
}
// Record lag for other nodes
diff --git a/src/main/CommandHandler.cpp b/src/main/CommandHandler.cpp
index 1c0d960c97..46fc9e5356 100644
--- a/src/main/CommandHandler.cpp
+++ b/src/main/CommandHandler.cpp
@@ -102,6 +102,11 @@ CommandHandler::CommandHandler(Application& app) : mApp(app)
#ifndef BUILD_TESTS
addRoute("getsurveyresult", &CommandHandler::getSurveyResult);
addRoute("surveytopology", &CommandHandler::surveyTopology);
+ addRoute("startsurveycollecting",
+ &CommandHandler::startSurveyCollecting);
+ addRoute("stopsurveycollecting", &CommandHandler::stopSurveyCollecting);
+ addRoute("surveytopologytimesliced",
+ &CommandHandler::surveyTopologyTimeSliced);
#endif
addRoute("unban", &CommandHandler::unban);
}
@@ -125,6 +130,10 @@ CommandHandler::CommandHandler(Application& app) : mApp(app)
addRoute("testtx", &CommandHandler::testTx);
addRoute("getsurveyresult", &CommandHandler::getSurveyResult);
addRoute("surveytopology", &CommandHandler::surveyTopology);
+ addRoute("startsurveycollecting", &CommandHandler::startSurveyCollecting);
+ addRoute("stopsurveycollecting", &CommandHandler::stopSurveyCollecting);
+ addRoute("surveytopologytimesliced",
+ &CommandHandler::surveyTopologyTimeSliced);
#endif
}
@@ -1125,16 +1134,27 @@ CommandHandler::clearMetrics(std::string const& params, std::string& retStr)
}
void
-CommandHandler::surveyTopology(std::string const& params, std::string& retStr)
+CommandHandler::checkBooted() const
{
- ZoneScoped;
-
if (mApp.getState() == Application::APP_CREATED_STATE ||
mApp.getHerder().getState() == Herder::HERDER_BOOTING_STATE)
{
throw std::runtime_error(
"Application is not fully booted, try again later");
}
+}
+
+void
+CommandHandler::surveyTopology(std::string const& params, std::string& retStr)
+{
+ ZoneScoped;
+
+ CLOG_WARNING(
+ Overlay,
+ "`surveytopology` is deprecated and will be removed in a future "
+ "release. Please use the new time sliced survey interface.");
+
+ checkBooted();
std::map map;
http::server::server::parseParams(params, map);
@@ -1146,11 +1166,12 @@ CommandHandler::surveyTopology(std::string const& params, std::string& retStr)
auto& surveyManager = mApp.getOverlayManager().getSurveyManager();
- bool success = surveyManager.startSurvey(
+ bool success = surveyManager.startSurveyReporting(
SurveyMessageCommandType::SURVEY_TOPOLOGY, duration);
surveyManager.addNodeToRunningSurveyBacklog(
- SurveyMessageCommandType::SURVEY_TOPOLOGY, duration, id);
+ SurveyMessageCommandType::SURVEY_TOPOLOGY, duration, id, std::nullopt,
+ std::nullopt);
retStr = "Adding node.";
retStr += success ? "Survey started " : "Survey already running!";
@@ -1160,8 +1181,11 @@ void
CommandHandler::stopSurvey(std::string const&, std::string& retStr)
{
ZoneScoped;
+ CLOG_WARNING(Overlay,
+ "`stopsurvey` is deprecated and will be removed in a future "
+ "release. Please use the new time sliced survey interface.");
auto& surveyManager = mApp.getOverlayManager().getSurveyManager();
- surveyManager.stopSurvey();
+ surveyManager.stopSurveyReporting();
retStr = "survey stopped";
}
@@ -1173,6 +1197,79 @@ CommandHandler::getSurveyResult(std::string const&, std::string& retStr)
retStr = surveyManager.getJsonResults().toStyledString();
}
+void
+CommandHandler::startSurveyCollecting(std::string const& params,
+ std::string& retStr)
+{ // HTTP handler for the `startsurveycollecting` route; the outcome text is written to retStr
+ ZoneScoped;
+ checkBooted(); // throws std::runtime_error while the application is not fully booted
+
+ std::map map;
+ http::server::server::parseParams(params, map);
+
+ uint32_t const nonce = parseRequiredParam(map, "nonce"); // survey nonce taken from the "nonce" query parameter
+
+ auto& surveyManager = mApp.getOverlayManager().getSurveyManager();
+ if (surveyManager.broadcastStartSurveyCollecting(nonce)) // a false return is reported to the caller as "another survey is active"
+ {
+ retStr = "Requested network to start survey collecting.";
+ }
+ else
+ {
+ retStr = "Failed to start survey collecting. Another survey is active "
+ "on the network.";
+ }
+}
+
+void
+CommandHandler::stopSurveyCollecting(std::string const&, std::string& retStr)
+{ // HTTP handler for the `stopsurveycollecting` route; query parameters are ignored (the parameter is unnamed)
+ ZoneScoped;
+ checkBooted(); // throws std::runtime_error while the application is not fully booted
+
+ auto& surveyManager = mApp.getOverlayManager().getSurveyManager();
+ if (surveyManager.broadcastStopSurveyCollecting()) // a false return is reported to the caller as "no survey is active"
+ {
+ retStr = "Requested network to stop survey collecting.";
+ }
+ else
+ {
+ retStr = "Failed to stop survey collecting. No survey is active on the "
+ "network.";
+ }
+}
+
+void
+CommandHandler::surveyTopologyTimeSliced(std::string const& params,
+ std::string& retStr)
+{ // HTTP handler for the `surveytopologytimesliced` route: queues a time sliced survey request for one node
+ ZoneScoped;
+ checkBooted(); // throws std::runtime_error while the application is not fully booted
+
+ std::map map;
+ http::server::server::parseParams(params, map);
+
+ auto idString = parseRequiredParam(map, "node"); // target node's public key in string form
+ NodeID id = KeyUtils::fromStrKey(idString);
+ auto inboundPeerIndex = parseRequiredParam(map, "inboundpeerindex");
+ auto outboundPeerIndex =
+ parseRequiredParam(map, "outboundpeerindex");
+
+ auto& surveyManager = mApp.getOverlayManager().getSurveyManager();
+
+ bool success = surveyManager.startSurveyReporting( // result only selects the status text appended below
+ SurveyMessageCommandType::TIME_SLICED_SURVEY_TOPOLOGY,
+ /*surveyDuration*/ std::nullopt);
+
+ surveyManager.addNodeToRunningSurveyBacklog( // called unconditionally, whatever startSurveyReporting returned
+ SurveyMessageCommandType::TIME_SLICED_SURVEY_TOPOLOGY,
+ /*surveyDuration*/ std::nullopt, id, inboundPeerIndex,
+ outboundPeerIndex);
+ retStr = "Adding node.";
+
+ retStr += success ? "Survey started " : "Survey already running!"; // NOTE(review): no separator follows "Adding node.", so the two sentences run together
+}
+
#ifdef BUILD_TESTS
void
CommandHandler::generateLoad(std::string const& params, std::string& retStr)
diff --git a/src/main/CommandHandler.h b/src/main/CommandHandler.h
index 73eb64b780..4ce87d79b9 100644
--- a/src/main/CommandHandler.h
+++ b/src/main/CommandHandler.h
@@ -68,6 +68,13 @@ class CommandHandler
void stopSurvey(std::string const&, std::string& retStr);
void getSurveyResult(std::string const&, std::string& retStr);
void sorobanInfo(std::string const&, std::string& retStr);
+ void startSurveyCollecting(std::string const& params, std::string& retStr);
+ void stopSurveyCollecting(std::string const& params, std::string& retStr);
+ void surveyTopologyTimeSliced(std::string const& params,
+ std::string& retStr);
+
+ // Checks if stellar-core is booted and throws an exception if not.
+ void checkBooted() const;
#ifdef BUILD_TESTS
void generateLoad(std::string const& params, std::string& retStr);
diff --git a/src/main/Config.cpp b/src/main/Config.cpp
index 02fd9e4b7a..84e65212cb 100644
--- a/src/main/Config.cpp
+++ b/src/main/Config.cpp
@@ -64,6 +64,7 @@ static const std::unordered_set TESTING_ONLY_OPTIONS = {
"LOADGEN_INSTRUCTIONS_FOR_TESTING",
"LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING"
"CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING",
+ "ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING",
"ARTIFICIALLY_DELAY_BUCKET_APPLICATION_FOR_TESTING",
"ARTIFICIALLY_SLEEP_MAIN_THREAD_FOR_TESTING",
"ARTIFICIALLY_SKIP_CONNECTION_ADJUSTMENT_FOR_TESTING"};
@@ -141,6 +142,8 @@ Config::Config() : NODE_SEED(SecretKey::random())
LOADGEN_INSTRUCTIONS_FOR_TESTING = {};
LOADGEN_INSTRUCTIONS_DISTRIBUTION_FOR_TESTING = {};
CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING = false;
+ ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING =
+ std::chrono::minutes::zero();
ARTIFICIALLY_SLEEP_MAIN_THREAD_FOR_TESTING =
std::chrono::microseconds::zero();
@@ -149,7 +152,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
LEDGER_PROTOCOL_MIN_VERSION_INTERNAL_ERROR_REPORT = 18;
OVERLAY_PROTOCOL_MIN_VERSION = 32;
- OVERLAY_PROTOCOL_VERSION = 33;
+ OVERLAY_PROTOCOL_VERSION = 34;
VERSION_STR = STELLAR_CORE_VERSION;
@@ -1548,6 +1551,12 @@ Config::processConfig(std::shared_ptr t)
{
CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING = readBool(item);
}
+ else if (item.first ==
+ "ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING")
+ {
+ ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING =
+ std::chrono::minutes(readInt(item));
+ }
else if (item.first == "HISTOGRAM_WINDOW_SIZE")
{
auto const s = readInt(item);
diff --git a/src/main/Config.h b/src/main/Config.h
index f114ef8208..94330c4cd4 100644
--- a/src/main/Config.h
+++ b/src/main/Config.h
@@ -268,6 +268,12 @@ class Config : public std::enable_shared_from_this
// Waits for merges to complete before applying transactions during catchup
bool CATCHUP_WAIT_MERGES_TX_APPLY_FOR_TESTING;
+ // Overrides the maximum survey phase duration for both the collecting and
+ // reporting phase to the specified value. Performs no override if set to 0.
+ // Do not use in production. This option is ignored in builds without tests
+ // enabled.
+ std::chrono::minutes ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING;
+
// A config parameter that controls how many messages from a particular peer
// core can process simultaneously. If core is at capacity, it temporarily
// stops reading from a peer until it completes processing of at least one
diff --git a/src/overlay/Floodgate.cpp b/src/overlay/Floodgate.cpp
index cab199d17a..ecdeb5c045 100644
--- a/src/overlay/Floodgate.cpp
+++ b/src/overlay/Floodgate.cpp
@@ -84,7 +84,8 @@ Floodgate::addRecord(StellarMessage const& msg, Peer::pointer peer, Hash& index)
// send message to anyone you haven't gotten it from
bool
Floodgate::broadcast(std::shared_ptr msg,
- std::optional const& hash)
+ std::optional const& hash,
+ uint32_t minOverlayVersion)
{
ZoneScoped;
if (mShuttingDown)
@@ -121,6 +122,14 @@ Floodgate::broadcast(std::shared_ptr msg,
for (auto peer : peers)
{
releaseAssert(peer.second->isAuthenticated());
+
+ if (peer.second->getRemoteOverlayVersion().value() < minOverlayVersion)
+ {
+ // Skip peers running overlay versions that are older than
+ // `minOverlayVersion`.
+ continue;
+ }
+
bool pullMode = msg->type() == TRANSACTION;
if (peersTold.insert(peer.second->toString()).second)
diff --git a/src/overlay/Floodgate.h b/src/overlay/Floodgate.h
index 5eb5cbda77..17763b3cb2 100644
--- a/src/overlay/Floodgate.h
+++ b/src/overlay/Floodgate.h
@@ -61,8 +61,11 @@ class Floodgate
// returns true if msg was sent to at least one peer
// The hash required for transactions
+ // `minOverlayVersion` is the minimum overlay version a peer must have in
+ // order to be sent the message.
bool broadcast(std::shared_ptr msg,
- std::optional const& hash = std::nullopt);
+ std::optional const& hash = std::nullopt,
+ uint32_t minOverlayVersion = 0);
// returns the list of peers that sent us the item with hash `msgID`
// NB: `msgID` is the hash of a `StellarMessage`
diff --git a/src/overlay/OverlayManager.h b/src/overlay/OverlayManager.h
index a03f09aea0..8a19af7fc1 100644
--- a/src/overlay/OverlayManager.h
+++ b/src/overlay/OverlayManager.h
@@ -77,9 +77,11 @@ class OverlayManager
// returns true if message was sent to at least one peer
// When passing a transaction message,
// the hash of TransactionEnvelope must be passed also for pull mode.
- virtual bool
- broadcastMessage(std::shared_ptr msg,
- std::optional const hash = std::nullopt) = 0;
+ // `minOverlayVersion` is the minimum overlay version a peer must have in
+ // order to be sent the message.
+ virtual bool broadcastMessage(std::shared_ptr msg,
+ std::optional const hash = std::nullopt,
+ uint32_t minOverlayVersion = 0) = 0;
// Make a note in the FloodGate that a given peer has provided us with a
// given broadcast message, so that it is inhibited from being resent to
diff --git a/src/overlay/OverlayManagerImpl.cpp b/src/overlay/OverlayManagerImpl.cpp
index 9d0d47d524..824aeeb69e 100644
--- a/src/overlay/OverlayManagerImpl.cpp
+++ b/src/overlay/OverlayManagerImpl.cpp
@@ -18,6 +18,7 @@
#include "overlay/PeerBareAddress.h"
#include "overlay/PeerManager.h"
#include "overlay/RandomPeerSource.h"
+#include "overlay/SurveyDataManager.h"
#include "overlay/TCPPeer.h"
#include "overlay/TxDemandsManager.h"
#include "util/GlobalChecks.h"
@@ -74,7 +75,7 @@ OverlayManagerImpl::PeersList::PeersList(
OverlayManagerImpl& overlayManager,
medida::MetricsRegistry& metricsRegistry,
std::string const& directionString, std::string const& cancelledName,
- int maxAuthenticatedCount)
+ int maxAuthenticatedCount, std::shared_ptr sm)
: mConnectionsAttempted(metricsRegistry.NewMeter(
{"overlay", directionString, "attempt"}, "connection"))
, mConnectionsEstablished(metricsRegistry.NewMeter(
@@ -86,6 +87,7 @@ OverlayManagerImpl::PeersList::PeersList(
, mOverlayManager(overlayManager)
, mDirectionString(directionString)
, mMaxAuthenticatedCount(maxAuthenticatedCount)
+ , mSurveyManager(sm)
{
}
@@ -141,6 +143,7 @@ OverlayManagerImpl::PeersList::removePeer(Peer* peer)
mDirectionString, peer->toString());
mAuthenticated.erase(authentiatedIt);
mConnectionsDropped.Mark();
+ mSurveyManager->recordDroppedPeer(*peer);
return;
}
@@ -184,6 +187,10 @@ OverlayManagerImpl::PeersList::moveToAuthenticated(Peer::pointer peer)
CLOG_INFO(Overlay, "Connected to {}", peer->toString());
+ mSurveyManager->modifyNodeData([&](CollectingNodeData& nodeData) {
+ ++nodeData.mAddedAuthenticatedPeers;
+ });
+
return true;
}
@@ -283,10 +290,6 @@ OverlayManager::create(Application& app)
OverlayManagerImpl::OverlayManagerImpl(Application& app)
: mApp(app)
- , mInboundPeers(*this, mApp.getMetrics(), "inbound", "reject",
- mApp.getConfig().MAX_ADDITIONAL_PEER_CONNECTIONS)
- , mOutboundPeers(*this, mApp.getMetrics(), "outbound", "cancel",
- mApp.getConfig().TARGET_PEER_CONNECTIONS)
, mLiveInboundPeersCounter(make_shared(0))
, mPeerManager(app)
, mDoor(mApp)
@@ -299,6 +302,11 @@ OverlayManagerImpl::OverlayManagerImpl(Application& app)
, mFloodGate(app)
, mTxDemandsManager(app)
, mSurveyManager(make_shared(app))
+ , mInboundPeers(*this, mApp.getMetrics(), "inbound", "reject",
+ mApp.getConfig().MAX_ADDITIONAL_PEER_CONNECTIONS,
+ mSurveyManager)
+ , mOutboundPeers(*this, mApp.getMetrics(), "outbound", "cancel",
+ mApp.getConfig().TARGET_PEER_CONNECTIONS, mSurveyManager)
, mResolvingPeersWithBackoff(true)
, mResolvingPeersRetryCount(0)
{
@@ -703,6 +711,11 @@ OverlayManagerImpl::tick()
VirtualTimer::onFailureNoop);
}
+ // Check and update the overlay survey state
+ mSurveyManager->updateSurveyPhase(getInboundAuthenticatedPeers(),
+ getOutboundAuthenticatedPeers(),
+ mApp.getConfig());
+
auto availablePendingSlots = availableOutboundPendingSlots();
if (availablePendingSlots == 0)
{
@@ -1202,10 +1215,11 @@ OverlayManagerImpl::recvTxDemand(FloodDemand const& dmd, Peer::pointer peer)
bool
OverlayManagerImpl::broadcastMessage(std::shared_ptr msg,
- std::optional const hash)
+ std::optional const hash,
+ uint32_t minOverlayVersion)
{
ZoneScoped;
- auto res = mFloodGate.broadcast(msg, hash);
+ auto res = mFloodGate.broadcast(msg, hash, minOverlayVersion);
if (res)
{
mOverlayMetrics.mMessagesBroadcast.Mark();
@@ -1292,7 +1306,11 @@ OverlayManagerImpl::recordMessageMetric(StellarMessage const& stellarMsg,
bool flood = false;
if (isFloodMessage(stellarMsg) || stellarMsg.type() == SURVEY_REQUEST ||
- stellarMsg.type() == SURVEY_RESPONSE)
+ stellarMsg.type() == SURVEY_RESPONSE ||
+ stellarMsg.type() == TIME_SLICED_SURVEY_START_COLLECTING ||
+ stellarMsg.type() == TIME_SLICED_SURVEY_STOP_COLLECTING ||
+ stellarMsg.type() == TIME_SLICED_SURVEY_REQUEST ||
+ stellarMsg.type() == TIME_SLICED_SURVEY_RESPONSE)
{
flood = true;
}
diff --git a/src/overlay/OverlayManagerImpl.h b/src/overlay/OverlayManagerImpl.h
index fcddad728a..dab695a12a 100644
--- a/src/overlay/OverlayManagerImpl.h
+++ b/src/overlay/OverlayManagerImpl.h
@@ -48,7 +48,8 @@ class OverlayManagerImpl : public OverlayManager
medida::MetricsRegistry& metricsRegistry,
std::string const& directionString,
std::string const& cancelledName,
- int maxAuthenticatedCount);
+ int maxAuthenticatedCount,
+ std::shared_ptr sm);
medida::Meter& mConnectionsAttempted;
medida::Meter& mConnectionsEstablished;
@@ -58,6 +59,7 @@ class OverlayManagerImpl : public OverlayManager
OverlayManagerImpl& mOverlayManager;
std::string mDirectionString;
size_t mMaxAuthenticatedCount;
+ std::shared_ptr mSurveyManager;
std::vector mPending;
std::map mAuthenticated;
@@ -69,9 +71,6 @@ class OverlayManagerImpl : public OverlayManager
void shutdown();
};
- PeersList mInboundPeers;
- PeersList mOutboundPeers;
-
std::shared_ptr mLiveInboundPeersCounter;
PeersList& getPeersList(Peer* peer);
@@ -99,6 +98,8 @@ class OverlayManagerImpl : public OverlayManager
std::shared_ptr mSurveyManager;
+ PeersList mInboundPeers;
+ PeersList mOutboundPeers;
int availableOutboundPendingSlots() const;
public:
@@ -112,9 +113,9 @@ class OverlayManagerImpl : public OverlayManager
Peer::pointer peer) override;
void forgetFloodedMsg(Hash const& msgID) override;
void recvTxDemand(FloodDemand const& dmd, Peer::pointer peer) override;
- bool
- broadcastMessage(std::shared_ptr msg,
- std::optional const hash = std::nullopt) override;
+ bool broadcastMessage(std::shared_ptr msg,
+ std::optional const hash = std::nullopt,
+ uint32_t minOverlayVersion = 0) override;
void connectTo(PeerBareAddress const& address) override;
void maybeAddInboundConnection(Peer::pointer peer) override;
diff --git a/src/overlay/OverlayMetrics.cpp b/src/overlay/OverlayMetrics.cpp
index 2f9a2e9b1b..6125acae76 100644
--- a/src/overlay/OverlayMetrics.cpp
+++ b/src/overlay/OverlayMetrics.cpp
@@ -75,6 +75,10 @@ OverlayMetrics::OverlayMetrics(Application& app)
app.getMetrics().NewTimer({"overlay", "recv", "survey-request"}))
, mRecvSurveyResponseTimer(
app.getMetrics().NewTimer({"overlay", "recv", "survey-response"}))
+ , mRecvStartSurveyCollectingTimer(app.getMetrics().NewTimer(
+ {"overlay", "recv", "start-survey-collecting"}))
+ , mRecvStopSurveyCollectingTimer(app.getMetrics().NewTimer(
+ {"overlay", "recv", "stop-survey-collecting"}))
, mRecvFloodAdvertTimer(
app.getMetrics().NewTimer({"overlay", "recv", "flood-advert"}))
@@ -134,6 +138,10 @@ OverlayMetrics::OverlayMetrics(Application& app)
{"overlay", "send", "survey-request"}, "message"))
, mSendSurveyResponseMeter(app.getMetrics().NewMeter(
{"overlay", "send", "survey-response"}, "message"))
+ , mSendStartSurveyCollectingMeter(app.getMetrics().NewMeter(
+ {"overlay", "send", "start-survey-collecting"}, "message"))
+ , mSendStopSurveyCollectingMeter(app.getMetrics().NewMeter(
+ {"overlay", "send", "stop-survey-collecting"}, "message"))
, mSendFloodAdvertMeter(app.getMetrics().NewMeter(
{"overlay", "send", "flood-advert"}, "message"))
, mSendFloodDemandMeter(app.getMetrics().NewMeter(
diff --git a/src/overlay/OverlayMetrics.h b/src/overlay/OverlayMetrics.h
index 591044bf9e..22bcf2e8f4 100644
--- a/src/overlay/OverlayMetrics.h
+++ b/src/overlay/OverlayMetrics.h
@@ -61,6 +61,8 @@ struct OverlayMetrics
medida::Timer& mRecvSurveyRequestTimer;
medida::Timer& mRecvSurveyResponseTimer;
+ medida::Timer& mRecvStartSurveyCollectingTimer;
+ medida::Timer& mRecvStopSurveyCollectingTimer;
medida::Timer& mRecvFloodAdvertTimer;
medida::Timer& mRecvFloodDemandTimer;
@@ -94,6 +96,8 @@ struct OverlayMetrics
medida::Meter& mSendSurveyRequestMeter;
medida::Meter& mSendSurveyResponseMeter;
+ medida::Meter& mSendStartSurveyCollectingMeter;
+ medida::Meter& mSendStopSurveyCollectingMeter;
medida::Meter& mSendFloodAdvertMeter;
medida::Meter& mSendFloodDemandMeter;
diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp
index d388fc2d2c..210e9181ba 100644
--- a/src/overlay/Peer.cpp
+++ b/src/overlay/Peer.cpp
@@ -21,6 +21,7 @@
#include "overlay/OverlayMetrics.h"
#include "overlay/PeerAuth.h"
#include "overlay/PeerManager.h"
+#include "overlay/SurveyDataManager.h"
#include "overlay/SurveyManager.h"
#include "overlay/TxAdverts.h"
#include "util/GlobalChecks.h"
@@ -608,6 +609,10 @@ Peer::msgSummary(StellarMessage const& msg)
case SURVEY_REQUEST:
case SURVEY_RESPONSE:
+ case TIME_SLICED_SURVEY_REQUEST:
+ case TIME_SLICED_SURVEY_RESPONSE:
+ case TIME_SLICED_SURVEY_START_COLLECTING:
+ case TIME_SLICED_SURVEY_STOP_COLLECTING:
return SurveyManager::getMsgSummary(msg);
case SEND_MORE:
return "SENDMORE";
@@ -671,11 +676,19 @@ Peer::sendMessage(std::shared_ptr msg, bool log)
mOverlayMetrics.mSendGetSCPStateMeter.Mark();
break;
case SURVEY_REQUEST:
+ case TIME_SLICED_SURVEY_REQUEST:
mOverlayMetrics.mSendSurveyRequestMeter.Mark();
break;
case SURVEY_RESPONSE:
+ case TIME_SLICED_SURVEY_RESPONSE:
mOverlayMetrics.mSendSurveyResponseMeter.Mark();
break;
+ case TIME_SLICED_SURVEY_START_COLLECTING:
+ mOverlayMetrics.mSendStartSurveyCollectingMeter.Mark();
+ break;
+ case TIME_SLICED_SURVEY_STOP_COLLECTING:
+ mOverlayMetrics.mSendStopSurveyCollectingMeter.Mark();
+ break;
case SEND_MORE:
case SEND_MORE_EXTENDED:
mOverlayMetrics.mSendSendMoreMeter.Mark();
@@ -956,6 +969,7 @@ Peer::recvRawMessage(StellarMessage const& stellarMsg)
break;
case SURVEY_REQUEST:
+ case TIME_SLICED_SURVEY_REQUEST:
{
auto t = mOverlayMetrics.mRecvSurveyRequestTimer.TimeScope();
recvSurveyRequestMessage(stellarMsg);
@@ -963,12 +977,27 @@ Peer::recvRawMessage(StellarMessage const& stellarMsg)
break;
case SURVEY_RESPONSE:
+ case TIME_SLICED_SURVEY_RESPONSE:
{
auto t = mOverlayMetrics.mRecvSurveyResponseTimer.TimeScope();
recvSurveyResponseMessage(stellarMsg);
}
break;
+ case TIME_SLICED_SURVEY_START_COLLECTING:
+ {
+ auto t = mOverlayMetrics.mRecvStartSurveyCollectingTimer.TimeScope();
+ recvSurveyStartCollectingMessage(stellarMsg);
+ }
+ break;
+
+ case TIME_SLICED_SURVEY_STOP_COLLECTING:
+ {
+ auto t = mOverlayMetrics.mRecvStopSurveyCollectingTimer.TimeScope();
+ recvSurveyStopCollectingMessage(stellarMsg);
+ }
+ break;
+
case GET_TX_SET:
{
auto t = mOverlayMetrics.mRecvGetTxSetTimer.TimeScope();
@@ -1162,6 +1191,10 @@ Peer::maybeProcessPingResponse(Hash const& id)
CLOG_DEBUG(Overlay, "Latency {}: {} ms", toString(),
mLastPing.count());
mOverlayMetrics.mConnectionLatencyTimer.Update(mLastPing);
+ mAppConnector.getOverlayManager().getSurveyManager().modifyPeerData(
+ *this, [&](CollectingPeerData& peerData) {
+ peerData.mLatencyMsHistogram.Update(mLastPing.count());
+ });
}
}
}
@@ -1660,6 +1693,24 @@ Peer::recvSurveyResponseMessage(StellarMessage const& msg)
msg, shared_from_this());
}
+// Forward a received start-collecting message to the SurveyManager, passing
+// this peer along with it.
+void
+Peer::recvSurveyStartCollectingMessage(StellarMessage const& msg)
+{
+    ZoneScoped;
+    auto&& overlayManager = mAppConnector.getOverlayManager();
+    overlayManager.getSurveyManager().relayStartSurveyCollecting(
+        msg, shared_from_this());
+}
+
+// Forward a received stop-collecting message to the SurveyManager, passing
+// this peer along with it.
+void
+Peer::recvSurveyStopCollectingMessage(StellarMessage const& msg)
+{
+    ZoneScoped;
+    auto&& overlayManager = mAppConnector.getOverlayManager();
+    overlayManager.getSurveyManager().relayStopSurveyCollecting(
+        msg, shared_from_this());
+}
+
void
Peer::recvFloodAdvert(StellarMessage const& msg)
{
diff --git a/src/overlay/Peer.h b/src/overlay/Peer.h
index 1857b002e8..9d6defe9e9 100644
--- a/src/overlay/Peer.h
+++ b/src/overlay/Peer.h
@@ -247,6 +247,8 @@ class Peer : public std::enable_shared_from_this,
void recvPeers(StellarMessage const& msg);
void recvSurveyRequestMessage(StellarMessage const& msg);
void recvSurveyResponseMessage(StellarMessage const& msg);
+ void recvSurveyStartCollectingMessage(StellarMessage const& msg);
+ void recvSurveyStopCollectingMessage(StellarMessage const& msg);
void recvSendMore(StellarMessage const& msg);
void recvGetTxSet(StellarMessage const& msg);
@@ -369,7 +371,7 @@ class Peer : public std::enable_shared_from_this,
}
NodeID
- getPeerID()
+ getPeerID() const
{
return mPeerID;
}
@@ -380,6 +382,12 @@ class Peer : public std::enable_shared_from_this,
return mPeerMetrics;
}
+ PeerMetrics const&
+ getPeerMetrics() const
+ {
+ return mPeerMetrics;
+ }
+
std::string const& toString();
virtual void drop(std::string const& reason, DropDirection dropDirection,
diff --git a/src/overlay/SurveyDataManager.cpp b/src/overlay/SurveyDataManager.cpp
new file mode 100644
index 0000000000..e0ef62d2a8
--- /dev/null
+++ b/src/overlay/SurveyDataManager.cpp
@@ -0,0 +1,534 @@
+// Copyright 2024 Stellar Development Foundation and contributors. Licensed
+// under the Apache License, Version 2.0. See the COPYING file at the root
+// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
+
+#include "overlay/SurveyDataManager.h"
+
+#include "crypto/SecretKey.h"
+#include "overlay/Peer.h"
+#include "util/Logging.h"
+
+#include
+#include
+
+using namespace std::chrono_literals;
+
+namespace stellar
+{
+namespace
+{
+// Collecting phase is limited to 30 minutes. If 30 minutes pass without
+// receiving a StopSurveyCollecting message the `SurveyDataManager` will
+// automatically transition to the reporting phase.
+constexpr std::chrono::minutes COLLECTING_PHASE_MAX_DURATION{30};
+
+// Reporting phase is limited to 3 hours, after which the
+// `SurveyDataManager` will reset all data and transition to the `INACTIVE`
+// phase.
+constexpr std::chrono::hours REPORTING_PHASE_MAX_DURATION{3};
+
+// Build one page of peer data: copy entries of `peerData` beginning at index
+// `idx`, stopping at the end of `peerData` or once the list's `max_size()` is
+// reached. An out-of-range `idx` is logged and yields an empty list.
+TimeSlicedPeerDataList
+fillTimeSlicedPeerDataList(std::vector const& peerData,
+                           size_t idx)
+{
+    TimeSlicedPeerDataList page;
+    if (idx < peerData.size())
+    {
+        size_t stop = std::min(peerData.size(), idx + page.max_size());
+        page.insert(page.end(), peerData.begin() + idx,
+                    peerData.begin() + stop);
+        return page;
+    }
+    CLOG_DEBUG(Overlay,
+               "fillTimeSlicedPeerDataList: Received request for peer data "
+               "starting from index {}, but the peers list contains only "
+               "{} peers.",
+               idx, peerData.size());
+    return page;
+}
+
+// Seed `peerData` (which must start out empty) with one entry per peer in
+// `peers`, each constructed from that peer's current metrics.
+void
+initializeCollectingPeerData(
+    std::map const& peers,
+    std::unordered_map& peerData)
+{
+    releaseAssert(peerData.empty());
+    for (auto const& entry : peers)
+    {
+        // Snapshot the metrics as they stand at the start of collection
+        peerData.try_emplace(entry.first, entry.second->getPeerMetrics());
+    }
+}
+
+} // namespace
+
+// Capture the node-level baseline for a collecting phase: the lost-sync
+// count and application state passed in by the caller. Both SCP latency
+// histograms are constructed with the `kSliding` sample type.
+CollectingNodeData::CollectingNodeData(uint64_t initialLostSyncCount,
+ Application::State initialState)
+ : mSCPFirstToSelfLatencyNsHistogram(
+ medida::SamplingInterface::SampleType::kSliding)
+ , mSCPSelfToOtherLatencyNsHistogram(
+ medida::SamplingInterface::SampleType::kSliding)
+ , mInitialLostSyncCount(initialLostSyncCount)
+ , mInitialState(initialState)
+{
+}
+
+// Snapshot the counters in `peerMetrics` at the start of collection; the
+// finalization step later subtracts these from the then-current counters.
+// The latency histogram is constructed with the `kSliding` sample type.
+CollectingPeerData::CollectingPeerData(Peer::PeerMetrics const& peerMetrics)
+ : mInitialMessageRead(peerMetrics.mMessageRead)
+ , mInitialMessageWrite(peerMetrics.mMessageWrite)
+ , mInitialByteRead(peerMetrics.mByteRead)
+ , mInitialByteWrite(peerMetrics.mByteWrite)
+ , mInitialUniqueFloodBytesRecv(peerMetrics.mUniqueFloodBytesRecv)
+ , mInitialDuplicateFloodBytesRecv(peerMetrics.mDuplicateFloodBytesRecv)
+ , mInitialUniqueFetchBytesRecv(peerMetrics.mUniqueFetchBytesRecv)
+ , mInitialDuplicateFetchBytesRecv(peerMetrics.mDuplicateFetchBytesRecv)
+ , mInitialUniqueFloodMessageRecv(peerMetrics.mUniqueFloodMessageRecv)
+ , mInitialDuplicateFloodMessageRecv(peerMetrics.mDuplicateFloodMessageRecv)
+ , mInitialUniqueFetchMessageRecv(peerMetrics.mUniqueFetchMessageRecv)
+ , mInitialDuplicateFetchMessageRecv(peerMetrics.mDuplicateFetchMessageRecv)
+ , mLatencyMsHistogram(medida::SamplingInterface::SampleType::kSliding)
+{
+}
+
+// Store the time accessor and the lost-sync meter. `cfg` is only consulted in
+// BUILD_TESTS builds, where a non-zero
+// ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING overrides the maximum
+// duration of both phases.
+SurveyDataManager::SurveyDataManager(
+ std::function const& getNow,
+ medida::Meter const& lostSyncMeter, Config const& cfg)
+ : mGetNow(getNow), mLostSyncMeter(lostSyncMeter)
+{
+#ifdef BUILD_TESTS
+ // Override phase durations if set in the config and this build has tests
+ // enabled
+ std::chrono::minutes maxPhaseDuration =
+ cfg.ARTIFICIALLY_SET_SURVEY_PHASE_DURATION_FOR_TESTING;
+ if (maxPhaseDuration != 0min)
+ {
+ setPhaseMaxDurationsForTesting(maxPhaseDuration);
+ }
+#endif
+}
+
+// Begin the collecting phase for the survey described by `msg`. Succeeds only
+// from the `INACTIVE` phase; otherwise the request is logged and ignored.
+// Returns `true` once node data and per-peer starting metrics are recorded.
+bool
+SurveyDataManager::startSurveyCollecting(
+    TimeSlicedSurveyStartCollectingMessage const& msg,
+    std::map const& inboundPeers,
+    std::map const& outboundPeers,
+    Application::State const initialState)
+{
+    ZoneScoped;
+
+    if (mPhase != SurveyPhase::INACTIVE)
+    {
+        CLOG_TRACE(Overlay,
+                   "Ignoring request to start survey collecting with nonce {} "
+                   "because there is already an active survey",
+                   msg.nonce);
+        return false;
+    }
+
+    CLOG_TRACE(Overlay, "Starting survey collecting with nonce {}", msg.nonce);
+    mPhase = SurveyPhase::COLLECTING;
+    mCollectStartTime = mGetNow();
+    mNonce = msg.nonce;
+    mSurveyor = msg.surveyorID;
+    mCollectingNodeData.emplace(mLostSyncMeter.count(), initialState);
+
+    // Non-empty collecting peer data at this point is an inconsistency
+    bool const peerDataIsClean = mCollectingInboundPeerData.empty() &&
+                                 mCollectingOutboundPeerData.empty();
+    if (!peerDataIsClean)
+    {
+        emitInconsistencyError("startSurveyCollecting");
+        return false;
+    }
+    initializeCollectingPeerData(inboundPeers, mCollectingInboundPeerData);
+    initializeCollectingPeerData(outboundPeers, mCollectingOutboundPeerData);
+    return true;
+}
+
+// Move from the collecting phase to the reporting phase: stamp the collect end
+// time, convert collecting data into final data, then drop the collecting
+// peer maps. Returns `false` (after reporting an inconsistency) if not
+// currently collecting or if final peer data already exists.
+// NOTE(review): `finalizeNodeData` signals failure only by reporting an
+// inconsistency; this function still returns `true` in that case.
+bool
+SurveyDataManager::startReportingPhase(
+ std::map const& inboundPeers,
+ std::map const& outboundPeers, Config const& config)
+{
+ if (mPhase != SurveyPhase::COLLECTING || !mFinalInboundPeerData.empty() ||
+ !mFinalOutboundPeerData.empty())
+ {
+ emitInconsistencyError("startReportingPhase");
+ return false;
+ }
+
+ mPhase = SurveyPhase::REPORTING;
+ mCollectEndTime = mGetNow();
+
+ // Finalize peer and node data
+ finalizePeerData(inboundPeers, mCollectingInboundPeerData,
+ mFinalInboundPeerData);
+ finalizePeerData(outboundPeers, mCollectingOutboundPeerData,
+ mFinalOutboundPeerData);
+ // Node data goes last: its peer counts read the finalized peer vectors
+ finalizeNodeData(config);
+
+ // Clear collecting data
+ mCollectingInboundPeerData.clear();
+ mCollectingOutboundPeerData.clear();
+
+ return true;
+}
+
+// End the collecting phase and enter the reporting phase. The request is
+// honoured only while collecting and only when both the nonce and the
+// surveyor in `msg` match the stored ones; otherwise it is logged and ignored.
+// Returns the result of the transition, or `false` if the request is ignored.
+bool
+SurveyDataManager::stopSurveyCollecting(
+    TimeSlicedSurveyStopCollectingMessage const& msg,
+    std::map const& inboundPeers,
+    std::map const& outboundPeers, Config const& config)
+{
+    ZoneScoped;
+
+    if (!(mPhase == SurveyPhase::COLLECTING && mNonce == msg.nonce &&
+          mSurveyor == msg.surveyorID))
+    {
+        CLOG_TRACE(Overlay,
+                   "Ignoring request to stop survey collecting with nonce {} "
+                   "because there is no active survey or the nonce does not "
+                   "match the active survey's nonce",
+                   msg.nonce);
+        return false;
+    }
+    CLOG_TRACE(Overlay, "Stopping survey collecting with nonce {}", msg.nonce);
+    return startReportingPhase(inboundPeers, outboundPeers, config);
+}
+
+// Apply `f` to this node's collecting data; a no-op outside the collecting
+// phase. Missing node data while collecting is reported as an inconsistency.
+void
+SurveyDataManager::modifyNodeData(std::function f)
+{
+    ZoneScoped;
+    if (mPhase != SurveyPhase::COLLECTING)
+    {
+        return;
+    }
+    if (!mCollectingNodeData.has_value())
+    {
+        emitInconsistencyError("modifyNodeData");
+        return;
+    }
+    f(*mCollectingNodeData);
+}
+
+// Apply `f` to the collecting data recorded for `peer`. Does nothing outside
+// the collecting phase or when `peer` has an entry in neither peer data map.
+void
+SurveyDataManager::modifyPeerData(Peer const& peer,
+                                  std::function f)
+{
+    ZoneScoped;
+    if (mPhase != SurveyPhase::COLLECTING)
+    {
+        return;
+    }
+    // Inbound data is consulted first; `f` runs on at most one entry.
+    auto inbound = mCollectingInboundPeerData.find(peer.getPeerID());
+    if (inbound != mCollectingInboundPeerData.end())
+    {
+        f(inbound->second);
+        return;
+    }
+    auto outbound = mCollectingOutboundPeerData.find(peer.getPeerID());
+    if (outbound != mCollectingOutboundPeerData.end())
+    {
+        f(outbound->second);
+    }
+}
+
+// Note that `peer` was dropped: discard its collecting data and bump the
+// dropped-peer counter. Does nothing outside the collecting phase.
+void
+SurveyDataManager::recordDroppedPeer(Peer const& peer)
+{
+    ZoneScoped;
+    if (mPhase != SurveyPhase::COLLECTING)
+    {
+        return;
+    }
+    // Forget the peer's collecting data: inbound first, then outbound
+    bool const wasInbound =
+        mCollectingInboundPeerData.erase(peer.getPeerID()) != 0;
+    if (!wasInbound)
+    {
+        mCollectingOutboundPeerData.erase(peer.getPeerID());
+    }
+    if (!mCollectingNodeData.has_value())
+    {
+        emitInconsistencyError("recordDroppedPeer");
+        return;
+    }
+    ++mCollectingNodeData->mDroppedAuthenticatedPeers;
+}
+
+// Return the stored nonce, which is empty whenever the survey state has been
+// reset.
+std::optional
+SurveyDataManager::getNonce() const
+{
+ return mNonce;
+}
+
+// True only while in the reporting phase and `nonce` equals the stored nonce.
+bool
+SurveyDataManager::nonceIsReporting(uint32_t nonce) const
+{
+ return mPhase == SurveyPhase::REPORTING && mNonce == nonce;
+}
+
+// Populate `response` for `request` from the finalized survey data. Succeeds
+// only in the reporting phase and only when the request's nonce and surveyor
+// match the stored ones. Each peer list is filled starting at the index
+// carried by `request`. Returns `true` if `response` was filled.
+bool
+SurveyDataManager::fillSurveyData(TimeSlicedSurveyRequestMessage const& request,
+                                  TopologyResponseBodyV2& response)
+{
+    ZoneScoped;
+
+    if (mPhase == SurveyPhase::REPORTING && mNonce == request.nonce &&
+        mSurveyor == request.request.surveyorPeerID)
+    {
+        if (!mFinalNodeData.has_value())
+        {
+            // Report under this function's own name so that the log line (or
+            // test exception) points at the right place.
+            emitInconsistencyError("fillSurveyData");
+            return false;
+        }
+
+        response.nodeData = mFinalNodeData.value();
+        response.inboundPeers = fillTimeSlicedPeerDataList(
+            mFinalInboundPeerData,
+            static_cast(request.inboundPeersIndex));
+        response.outboundPeers = fillTimeSlicedPeerDataList(
+            mFinalOutboundPeerData,
+            static_cast(request.outboundPeersIndex));
+        return true;
+    }
+    return false;
+}
+
+// A survey counts as active in both the collecting and the reporting phase.
+bool
+SurveyDataManager::surveyIsActive() const
+{
+ return mPhase != SurveyPhase::INACTIVE;
+}
+
+#ifdef BUILD_TESTS
+// Test-only hook: use `maxPhaseDuration` as the limit for both the collecting
+// and the reporting phase.
+void
+SurveyDataManager::setPhaseMaxDurationsForTesting(
+ std::chrono::minutes maxPhaseDuration)
+{
+ mMaxPhaseDurationForTesting = maxPhaseDuration;
+}
+#endif
+
+// Advance the survey phase based on elapsed time: COLLECTING moves to
+// REPORTING once the collecting limit has passed, and REPORTING resets to
+// INACTIVE once the reporting limit has passed. Each case first checks that
+// the stored start/end timestamps are consistent with the current phase and
+// reports an inconsistency otherwise.
+void
+SurveyDataManager::updateSurveyPhase(
+ std::map const& inboundPeers,
+ std::map const& outboundPeers, Config const& config)
+{
+ switch (mPhase)
+ {
+ case SurveyPhase::COLLECTING:
+ // While collecting, a start time must exist and an end time must not
+ if (!mCollectStartTime.has_value() || mCollectEndTime.has_value())
+ {
+ emitInconsistencyError("updateSurveyPhase");
+ return;
+ }
+ if (mGetNow() >
+ mCollectStartTime.value() + getCollectingPhaseMaxDuration())
+ {
+ CLOG_TRACE(Overlay, "Survey collecting phase has expired. "
+ "Advancing to reporting phase.");
+ // The transition's return value is not inspected here
+ startReportingPhase(inboundPeers, outboundPeers, config);
+ }
+ break;
+ case SurveyPhase::REPORTING:
+ // While reporting, both timestamps must exist
+ if (!mCollectStartTime.has_value() || !mCollectEndTime.has_value())
+ {
+ emitInconsistencyError("updateSurveyPhase");
+ return;
+ }
+ if (mGetNow() >
+ mCollectEndTime.value() + getReportingPhaseMaxDuration())
+ {
+ CLOG_TRACE(
+ Overlay,
+ "Survey reporting phase has expired. Resetting survey data.");
+ reset();
+ }
+ break;
+ case SurveyPhase::INACTIVE:
+ // While inactive, neither timestamp may exist
+ if (mCollectStartTime.has_value() || mCollectEndTime.has_value())
+ {
+ emitInconsistencyError("updateSurveyPhase");
+ return;
+ }
+ // Nothing to do
+ break;
+ }
+}
+
+// Return to the `INACTIVE` phase and discard both timestamps, the nonce, the
+// surveyor, and all collecting and final data.
+void
+SurveyDataManager::reset()
+{
+ mPhase = SurveyPhase::INACTIVE;
+ mCollectStartTime.reset();
+ mCollectEndTime.reset();
+ mNonce.reset();
+ mSurveyor.reset();
+ mCollectingNodeData.reset();
+ mCollectingInboundPeerData.clear();
+ mCollectingOutboundPeerData.clear();
+ mFinalNodeData.reset();
+ mFinalInboundPeerData.clear();
+ mFinalOutboundPeerData.clear();
+}
+
+// Report an internal inconsistency detected while executing `where`. In
+// BUILD_TESTS builds this throws `std::runtime_error` and never reaches the
+// code after the `#endif`, so no reset happens there. In other builds it logs
+// an error and resets the survey state.
+void
+SurveyDataManager::emitInconsistencyError(std::string const& where)
+{
+#ifdef BUILD_TESTS
+ // Throw an exception when testing to make the error more visible
+ throw std::runtime_error("Encountered inconsistent survey data while "
+ "executing `" +
+ where + "`.");
+#endif
+ CLOG_ERROR(Overlay,
+ "Encountered inconsistent survey data while executing "
+ "`{}`. Resetting survey state.",
+ where);
+ reset();
+}
+
+// Build `mFinalNodeData` from the collecting node data, the sizes of the
+// finalized peer vectors and `config`, then discard the collecting node data.
+// Reports an inconsistency (and returns early) if final node data already
+// exists or the collecting node data is missing.
+void
+SurveyDataManager::finalizeNodeData(Config const& config)
+{
+ if (mFinalNodeData.has_value() || !mCollectingNodeData.has_value())
+ {
+ emitInconsistencyError("finalizeNodeData");
+ return;
+ }
+
+ // Fill in node data
+ mFinalNodeData.emplace();
+ mFinalNodeData->addedAuthenticatedPeers =
+ mCollectingNodeData->mAddedAuthenticatedPeers;
+ mFinalNodeData->droppedAuthenticatedPeers =
+ mCollectingNodeData->mDroppedAuthenticatedPeers;
+ // Peer counts come from the already-finalized peer vectors
+ mFinalNodeData->totalInboundPeerCount =
+ static_cast(mFinalInboundPeerData.size());
+ mFinalNodeData->totalOutboundPeerCount =
+ static_cast(mFinalOutboundPeerData.size());
+ mFinalNodeData->p75SCPFirstToSelfLatencyNs =
+ mCollectingNodeData->mSCPFirstToSelfLatencyNsHistogram.GetSnapshot()
+ .get75thPercentile();
+ mFinalNodeData->p75SCPSelfToOtherLatencyNs =
+ mCollectingNodeData->mSCPSelfToOtherLatencyNsHistogram.GetSnapshot()
+ .get75thPercentile();
+ // Lost-sync events are the meter's growth since the collecting phase began
+ mFinalNodeData->lostSyncCount = static_cast(
+ mLostSyncMeter.count() - mCollectingNodeData->mInitialLostSyncCount);
+ // An initial out-of-sync state is counted as one additional lost sync
+ switch (mCollectingNodeData->mInitialState)
+ {
+ case Application::APP_ACQUIRING_CONSENSUS_STATE:
+ case Application::APP_CATCHING_UP_STATE:
+ // Node was out-of-sync at the start of the survey
+ ++mFinalNodeData->lostSyncCount;
+ break;
+ default:
+ break;
+ }
+ mFinalNodeData->isValidator = config.NODE_IS_VALIDATOR;
+ mFinalNodeData->maxInboundPeerCount =
+ config.MAX_ADDITIONAL_PEER_CONNECTIONS;
+ mFinalNodeData->maxOutboundPeerCount = config.TARGET_PEER_CONNECTIONS;
+
+ // Clear collecting data
+ mCollectingNodeData.reset();
+}
+
+// Append one finalized `TimeSlicedPeerData` to `finalPeerData` for every peer
+// in `peers` that also has an entry in `collectingPeerData`. Counter fields
+// are reported as (current metric - value captured at collection start).
+// Peers without collecting data are skipped.
+// NOTE(review): `peers` is taken by value, so the whole map is copied on each
+// call; a const reference (here and in the declaration) would avoid that.
+void
+SurveyDataManager::finalizePeerData(
+ std::map const peers,
+ std::unordered_map const& collectingPeerData,
+ std::vector& finalPeerData)
+{
+ for (auto const& [id, peer] : peers)
+ {
+ auto const it = collectingPeerData.find(id);
+ if (it != collectingPeerData.end())
+ {
+ CollectingPeerData const& collectingData = it->second;
+ Peer::PeerMetrics const& peerMetrics = peer->getPeerMetrics();
+
+ TimeSlicedPeerData& finalData = finalPeerData.emplace_back();
+ PeerStats& finalStats = finalData.peerStats;
+
+ finalStats.id = id;
+ finalStats.versionStr = peer->getRemoteVersion();
+ finalStats.messagesRead =
+ peerMetrics.mMessageRead - collectingData.mInitialMessageRead;
+ finalStats.messagesWritten =
+ peerMetrics.mMessageWrite - collectingData.mInitialMessageWrite;
+ finalStats.bytesRead =
+ peerMetrics.mByteRead - collectingData.mInitialByteRead;
+ finalStats.bytesWritten =
+ peerMetrics.mByteWrite - collectingData.mInitialByteWrite;
+ // Unlike the counters, this is measured from the peer's connected
+ // time rather than from the start of the collecting phase
+ finalStats.secondsConnected = static_cast(
+ std::chrono::duration_cast(
+ mGetNow() - peerMetrics.mConnectedTime)
+ .count());
+ finalStats.uniqueFloodBytesRecv =
+ peerMetrics.mUniqueFloodBytesRecv -
+ collectingData.mInitialUniqueFloodBytesRecv;
+ finalStats.duplicateFloodBytesRecv =
+ peerMetrics.mDuplicateFloodBytesRecv -
+ collectingData.mInitialDuplicateFloodBytesRecv;
+ finalStats.uniqueFetchBytesRecv =
+ peerMetrics.mUniqueFetchBytesRecv -
+ collectingData.mInitialUniqueFetchBytesRecv;
+ finalStats.duplicateFetchBytesRecv =
+ peerMetrics.mDuplicateFetchBytesRecv -
+ collectingData.mInitialDuplicateFetchBytesRecv;
+ finalStats.uniqueFloodMessageRecv =
+ peerMetrics.mUniqueFloodMessageRecv -
+ collectingData.mInitialUniqueFloodMessageRecv;
+ finalStats.duplicateFloodMessageRecv =
+ peerMetrics.mDuplicateFloodMessageRecv -
+ collectingData.mInitialDuplicateFloodMessageRecv;
+ finalStats.uniqueFetchMessageRecv =
+ peerMetrics.mUniqueFetchMessageRecv -
+ collectingData.mInitialUniqueFetchMessageRecv;
+ finalStats.duplicateFetchMessageRecv =
+ peerMetrics.mDuplicateFetchMessageRecv -
+ collectingData.mInitialDuplicateFetchMessageRecv;
+ // The reported "average" latency is the histogram's median
+ finalData.averageLatencyMs = static_cast(
+ collectingData.mLatencyMsHistogram.GetSnapshot().getMedian());
+ }
+ }
+}
+
+// Maximum length of the collecting phase: the testing override when one is
+// set (BUILD_TESTS builds only), otherwise COLLECTING_PHASE_MAX_DURATION.
+std::chrono::minutes
+SurveyDataManager::getCollectingPhaseMaxDuration() const
+{
+#ifdef BUILD_TESTS
+ if (mMaxPhaseDurationForTesting.has_value())
+ {
+ return mMaxPhaseDurationForTesting.value();
+ }
+#endif
+ return std::chrono::duration_cast(
+ COLLECTING_PHASE_MAX_DURATION);
+}
+
+// Maximum length of the reporting phase: the testing override when one is
+// set (BUILD_TESTS builds only), otherwise REPORTING_PHASE_MAX_DURATION
+// converted to minutes.
+std::chrono::minutes
+SurveyDataManager::getReportingPhaseMaxDuration() const
+{
+#ifdef BUILD_TESTS
+ if (mMaxPhaseDurationForTesting.has_value())
+ {
+ return mMaxPhaseDurationForTesting.value();
+ }
+#endif
+ return std::chrono::duration_cast(
+ REPORTING_PHASE_MAX_DURATION);
+}
+
+} // namespace stellar
\ No newline at end of file
diff --git a/src/overlay/SurveyDataManager.h b/src/overlay/SurveyDataManager.h
new file mode 100644
index 0000000000..8f152c452e
--- /dev/null
+++ b/src/overlay/SurveyDataManager.h
@@ -0,0 +1,223 @@
+#pragma once
+
+// Copyright 2024 Stellar Development Foundation and contributors. Licensed
+// under the Apache License, Version 2.0. See the COPYING file at the root
+// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
+
+#include "main/Application.h"
+#include "main/Config.h"
+#include "overlay/Peer.h"
+#include "util/NonCopyable.h"
+#include "util/Timer.h"
+
+#include "medida/histogram.h"
+#include "medida/meter.h"
+#include "xdr/Stellar-overlay.h"
+#include "xdr/Stellar-types.h"
+
+#include
+#include
+#include
+#include
+
+namespace stellar
+{
+
+enum class SurveyPhase
+{
+ // Survey is currently collecting data
+ COLLECTING,
+ // Collecting complete. Survey data is available for reporting.
+ REPORTING,
+ // No active survey in progress. No data is being collected or reported.
+ INACTIVE
+};
+
+struct CollectingNodeData
+{
+ CollectingNodeData(uint64_t initialLostSyncCount,
+ Application::State initialState);
+
+ // Peer change data
+ uint32_t mAddedAuthenticatedPeers = 0;
+ uint32_t mDroppedAuthenticatedPeers = 0;
+
+ // SCP stats (in nanoseconds)
+ medida::Histogram mSCPFirstToSelfLatencyNsHistogram;
+ medida::Histogram mSCPSelfToOtherLatencyNsHistogram;
+
+ // To compute how many times the node lost sync in the time slice
+ uint64_t const mInitialLostSyncCount;
+
+ // State of the node at the start of the survey
+ Application::State const mInitialState;
+};
+
+// Data about a peer
+struct CollectingPeerData
+{
+ CollectingPeerData(Peer::PeerMetrics const& peerMetrics);
+
+ // Metrics at the start of the survey
+ uint64_t const mInitialMessageRead;
+ uint64_t const mInitialMessageWrite;
+ uint64_t const mInitialByteRead;
+ uint64_t const mInitialByteWrite;
+ uint64_t const mInitialUniqueFloodBytesRecv;
+ uint64_t const mInitialDuplicateFloodBytesRecv;
+ uint64_t const mInitialUniqueFetchBytesRecv;
+ uint64_t const mInitialDuplicateFetchBytesRecv;
+ uint64_t const mInitialUniqueFloodMessageRecv;
+ uint64_t const mInitialDuplicateFloodMessageRecv;
+ uint64_t const mInitialUniqueFetchMessageRecv;
+ uint64_t const mInitialDuplicateFetchMessageRecv;
+
+ // For computing average latency (in milliseconds)
+ medida::Histogram mLatencyMsHistogram;
+};
+
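+/*
+ * Manage data collection during time sliced overlay surveys. This class is
+ * described as thread-safe. NOTE(review): it declares no mutex or atomic
+ * member -- confirm how concurrent access is synchronized.
+ */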
+class SurveyDataManager : public NonMovableOrCopyable
+{
+ public:
+ // Create a survey data manager. `getNow` must read the Application's clock.
+ // `lostSyncMeter` is a meter to track how many times the node lost sync.
+ SurveyDataManager(std::function const& getNow,
+ medida::Meter const& lostSyncMeter, Config const& cfg);
+
+ // Start the collecting phase of a survey. Ignores requests if a survey is
+ // already active. `inboundPeers` and `outboundPeers` should collectively
+ // contain the `NodeID`s of all connected peers. Returns `true` if this
+ // successfully starts a survey.
+ bool
+ startSurveyCollecting(TimeSlicedSurveyStartCollectingMessage const& msg,
+ std::map const& inboundPeers,
+ std::map const& outboundPeers,
+ Application::State initialState);
+
+ // Stop the collecting phase of a survey and enter the reporting phase.
+ // Ignores request if no survey is active or if nonce does not match the
+ // active survey. Returns `true` if this successfully stops a survey.
+ bool
+ stopSurveyCollecting(TimeSlicedSurveyStopCollectingMessage const& msg,
+ std::map const& inboundPeers,
+ std::map const& outboundPeers,
+ Config const& config);
+
+ // Apply `f` to the data for this node. Does nothing if the survey is not in
+ // the collecting phase.
+ void modifyNodeData(std::function f);
+
+ // Apply `f` to the data for `peer`. Does nothing if the survey is not in
+ // the collecting phase or if `peer` is not in the current time slice data.
+ void modifyPeerData(Peer const& peer,
+ std::function f);
+
+ // Record that `peer` was dropped from the overlay. Does nothing if the
+ // survey is not in the collecting phase.
+ void recordDroppedPeer(Peer const& peer);
+
+ // Get nonce of current survey, if one is active.
+ std::optional getNonce() const;
+
+ // Returns `true` if the `nonce` matches the survey in the reporting phase.
+ bool nonceIsReporting(uint32_t nonce) const;
+
+ // Fills `response` with the results of the survey, provided that the survey
+ // corresponding to the request's nonce is in the reporting phase. Returns
+ // `true` on success.
+ bool fillSurveyData(TimeSlicedSurveyRequestMessage const& request,
+ TopologyResponseBodyV2& response);
+
+ // Returns `true` iff there is currently an active survey
+ bool surveyIsActive() const;
+
+ // Checks and updates the phase of the survey if necessary. Resets the
+ // survey state upon transition to `SurveyPhase::INACTIVE`. Takes peer and
+ // config info in case the collecting phase times out and the survey
+ // automatically transitions to the reporting phase.
+ void updateSurveyPhase(std::map const& inboundPeers,
+ std::map const& outboundPeers,
+ Config const& config);
+
+#ifdef BUILD_TESTS
+ // Call to use the provided duration as max for both collecting and
+ // reporting phase instead of the normal max phase durations
+ void setPhaseMaxDurationsForTesting(std::chrono::minutes maxPhaseDuration);
+#endif // BUILD_TESTS
+
+ private:
+ // Get the current time
+ std::function const mGetNow;
+
+ // Metric tracking sync status
+ medida::Meter const& mLostSyncMeter;
+
+ // Start and stop times for the collecting phase
+ std::optional mCollectStartTime = std::nullopt;
+ std::optional mCollectEndTime = std::nullopt;
+
+ // Nonce of the active survey (if any)
+ std::optional mNonce = std::nullopt;
+
+ // Surveyor running active survey (if any)
+ std::optional mSurveyor = std::nullopt;
+
+ // Data about this node captured during the collecting phase
+ std::optional mCollectingNodeData = std::nullopt;
+
+ // Finalized reporting phase data about this node
+ std::optional mFinalNodeData = std::nullopt;
+
+ // Data about peers during collecting phase
+ std::unordered_map mCollectingInboundPeerData;
+ std::unordered_map mCollectingOutboundPeerData;
+
+ // Finalized reporting phase data about peers
+ std::vector mFinalInboundPeerData;
+ std::vector mFinalOutboundPeerData;
+
+ // The current survey phase
+ SurveyPhase mPhase = SurveyPhase::INACTIVE;
+
+#ifdef BUILD_TESTS
+ // Override maximum phase durations for testing
+ std::optional mMaxPhaseDurationForTesting =
+ std::nullopt;
+#endif // BUILD_TESTS
+
+ // Reset survey data. Intended to be called when survey data expires.
+ void reset();
+
+ // Transition to the reporting phase. Should only be called from the
+ // collecting phase. Returns `false` if transition fails.
+ bool
+ startReportingPhase(std::map const& inboundPeers,
+ std::map const& outboundPeers,
+ Config const& config);
+
+ // Function to call when the impossible occurs. Logs an error and resets
+ // the survey. Use instead of `releaseAssert` as an overlay survey
+ // failure is not important enough to crash the program.
+ void emitInconsistencyError(std::string const& where);
+
+ // Finalize node data into `mFinalNodeData`. Should only be called after
+ // finalizing peer data.
+ void finalizeNodeData(Config const& config);
+
+ // Finalize peer data into `finalPeerData`
+ void finalizePeerData(std::map const peers,
+ std::unordered_map const&
+ collectingPeerData,
+ std::vector& finalPeerData);
+
+ // Get the max phase durations for the collecting and reporting phases
+ // respectively
+ std::chrono::minutes getCollectingPhaseMaxDuration() const;
+ std::chrono::minutes getReportingPhaseMaxDuration() const;
+};
+
+} // namespace stellar
\ No newline at end of file
diff --git a/src/overlay/SurveyManager.cpp b/src/overlay/SurveyManager.cpp
index 6aed400a68..88294e0997 100644
--- a/src/overlay/SurveyManager.cpp
+++ b/src/overlay/SurveyManager.cpp
@@ -7,7 +7,9 @@
#include "herder/Herder.h"
#include "main/Application.h"
#include "main/ErrorMessages.h"
+#include "medida/metrics_registry.h"
#include "overlay/OverlayManager.h"
+#include "overlay/SurveyDataManager.h"
#include "util/GlobalChecks.h"
#include "util/Logging.h"
#include "xdrpp/marshal.h"
@@ -17,6 +19,74 @@ namespace stellar
uint32_t const SurveyManager::SURVEY_THROTTLE_TIMEOUT_MULT(3);
+uint32_t constexpr TIME_SLICED_SURVEY_MIN_OVERLAY_PROTOCOL_VERSION = 33;
+
+namespace
+{
+// Generate JSON for a single peer
+Json::Value
+peerStatsToJson(PeerStats const& peer)
+{
+ Json::Value peerInfo;
+ peerInfo["nodeId"] = KeyUtils::toStrKey(peer.id);
+ peerInfo["version"] = peer.versionStr;
+ peerInfo["messagesRead"] = static_cast(peer.messagesRead);
+ peerInfo["messagesWritten"] =
+ static_cast(peer.messagesWritten);
+ peerInfo["bytesRead"] = static_cast(peer.bytesRead);
+ peerInfo["bytesWritten"] = static_cast(peer.bytesWritten);
+ peerInfo["secondsConnected"] =
+ static_cast(peer.secondsConnected);
+
+ peerInfo["uniqueFloodBytesRecv"] =
+ static_cast(peer.uniqueFloodBytesRecv);
+ peerInfo["duplicateFloodBytesRecv"] =
+ static_cast(peer.duplicateFloodBytesRecv);
+ peerInfo["uniqueFetchBytesRecv"] =
+ static_cast(peer.uniqueFetchBytesRecv);
+ peerInfo["duplicateFetchBytesRecv"] =
+ static_cast(peer.duplicateFetchBytesRecv);
+
+ peerInfo["uniqueFloodMessageRecv"] =
+ static_cast(peer.uniqueFloodMessageRecv);
+ peerInfo["duplicateFloodMessageRecv"] =
+ static_cast(peer.duplicateFloodMessageRecv);
+ peerInfo["uniqueFetchMessageRecv"] =
+ static_cast(peer.uniqueFetchMessageRecv);
+ peerInfo["duplicateFetchMessageRecv"] =
+ static_cast(peer.duplicateFetchMessageRecv);
+ return peerInfo;
+}
+
+// Generate JSON for each peer in `peerList` and append to `jsonResultList`
+void
+recordTimeSlicedLinkResults(Json::Value& jsonResultList,
+ TimeSlicedPeerDataList const& peerList)
+{
+ for (auto const& peer : peerList)
+ {
+ Json::Value peerInfo = peerStatsToJson(peer.peerStats);
+ peerInfo["averageLatencyMs"] = peer.averageLatencyMs;
+ jsonResultList.append(peerInfo);
+ }
+}
+
+// Extract a reference to the SurveyRequestMessage within a StellarMessage
+SurveyRequestMessage const&
+extractSurveyRequestMessage(StellarMessage const& msg)
+{
+ switch (msg.type())
+ {
+ case SURVEY_REQUEST:
+ return msg.signedSurveyRequestMessage().request;
+ case TIME_SLICED_SURVEY_REQUEST:
+ return msg.signedTimeSlicedSurveyRequestMessage().request.request;
+ default:
+ releaseAssert(false);
+ }
+}
+} // namespace
+
SurveyManager::SurveyManager(Application& app)
: mApp(app)
, mSurveyThrottleTimer(std::make_unique(mApp))
@@ -27,14 +97,19 @@ SurveyManager::SurveyManager(Application& app)
, SURVEY_THROTTLE_TIMEOUT_SEC(
mApp.getConfig().getExpectedLedgerCloseTime() *
SURVEY_THROTTLE_TIMEOUT_MULT)
+ , mSurveyDataManager(
+ [this]() { return mApp.getClock().now(); },
+ mApp.getMetrics().NewMeter({"scp", "sync", "lost"}, "sync"),
+ mApp.getConfig())
{
}
bool
-SurveyManager::startSurvey(SurveyMessageCommandType type,
- std::chrono::seconds surveyDuration)
+SurveyManager::startSurveyReporting(
+ SurveyMessageCommandType type,
+ std::optional surveyDuration)
{
- if (mRunningSurveyType)
+ if (mRunningSurveyReportingPhaseType)
{
return false;
}
@@ -49,12 +124,35 @@ SurveyManager::startSurvey(SurveyMessageCommandType type,
mPeersToSurvey.clear();
mPeersToSurveyQueue = std::queue();
- mRunningSurveyType = std::make_optional(type);
+ mRunningSurveyReportingPhaseType =
+ std::make_optional(type);
mCurve25519SecretKey = curve25519RandomSecret();
mCurve25519PublicKey = curve25519DerivePublic(mCurve25519SecretKey);
- updateSurveyExpiration(surveyDuration);
+ // Check surveyDuration (should only be set for old style surveys; time
+ // sliced surveys use a builtin timeout)
+ switch (type)
+ {
+ case SURVEY_TOPOLOGY:
+ if (!surveyDuration.has_value())
+ {
+ throw std::runtime_error(
+ "startSurveyReporting failed: missing survey duration");
+ }
+ updateOldStyleSurveyExpiration(surveyDuration.value());
+ break;
+ case TIME_SLICED_SURVEY_TOPOLOGY:
+ // Time sliced surveys have a built-in timeout, so one should not be
+ // passed in.
+ if (surveyDuration.has_value())
+ {
+ throw std::runtime_error(
+ "startSurveyReporting failed: unexpected survey duration");
+ }
+ break;
+ }
+
// starts timer
topOffRequests(type);
@@ -62,15 +160,15 @@ SurveyManager::startSurvey(SurveyMessageCommandType type,
}
void
-SurveyManager::stopSurvey()
+SurveyManager::stopSurveyReporting()
{
- // do nothing if survey isn't running
- if (!mRunningSurveyType)
+ // do nothing if survey isn't running in reporting phase
+ if (!mRunningSurveyReportingPhaseType)
{
return;
}
- mRunningSurveyType.reset();
+ mRunningSurveyReportingPhaseType.reset();
mSurveyThrottleTimer->cancel();
clearCurve25519Keys(mCurve25519PublicKey, mCurve25519SecretKey);
@@ -78,22 +176,210 @@ SurveyManager::stopSurvey()
CLOG_INFO(Overlay, "SurveyResults {}", getJsonResults().toStyledString());
}
+// Originate a TIME_SLICED_SURVEY_START_COLLECTING message for the survey
+// identified by `nonce`. The message names this node as surveyor, is signed
+// with this node's seed, and is handed to `relayStartSurveyCollecting` with no
+// originating peer.
+//
+// Returns `false` (and sends nothing) if the survey data manager reports that
+// a survey is already active; returns `true` otherwise.
+bool
+SurveyManager::broadcastStartSurveyCollecting(uint32_t nonce)
+{
+    if (mSurveyDataManager.surveyIsActive())
+    {
+        CLOG_ERROR(
+            Overlay,
+            "Cannot start survey with nonce {} because another survey is "
+            "already active",
+            nonce);
+        return false;
+    }
+
+    auto const& nodeSeed = mApp.getConfig().NODE_SEED;
+
+    StellarMessage outMsg;
+    outMsg.type(TIME_SLICED_SURVEY_START_COLLECTING);
+    auto& envelope = outMsg.signedTimeSlicedSurveyStartCollectingMessage();
+
+    // Fill in the body first: the signature covers its XDR encoding
+    auto& body = envelope.startCollecting;
+    body.ledgerNum = mApp.getHerder().trackingConsensusLedgerIndex();
+    body.nonce = nonce;
+    body.surveyorID = nodeSeed.getPublicKey();
+
+    auto bytesToSign = xdr::xdr_to_opaque(body);
+    envelope.signature = nodeSeed.sign(bytesToSign);
+
+    relayStartSurveyCollecting(outMsg, nullptr);
+    return true;
+}
+
+// Process a TIME_SLICED_SURVEY_START_COLLECTING message. `peer` is the peer
+// the message arrived from, or null when this node originated the message.
+//
+// The message is dropped (nothing is recorded or relayed) if:
+//  * it names another node as surveyor and `surveyorPermitted` rejects that
+//    surveyor,
+//  * the message limiter rejects it (the signature check is handed to the
+//    limiter as its on-success callback), or
+//  * the survey data manager refuses to start collecting.
+// Otherwise the message is recorded as flooded by `peer` (if any) and
+// broadcast onward.
+void
+SurveyManager::relayStartSurveyCollecting(StellarMessage const& msg,
+                                          Peer::pointer peer)
+{
+    releaseAssert(msg.type() == TIME_SLICED_SURVEY_START_COLLECTING);
+    auto const& signedStartCollecting =
+        msg.signedTimeSlicedSurveyStartCollectingMessage();
+    auto const& startCollecting = signedStartCollecting.startCollecting;
+
+    auto surveyorIsSelf =
+        startCollecting.surveyorID == mApp.getConfig().NODE_SEED.getPublicKey();
+    if (!surveyorIsSelf)
+    {
+        // A message from another surveyor must have come from some peer
+        releaseAssert(peer);
+
+        if (!surveyorPermitted(startCollecting.surveyorID))
+        {
+            return;
+        }
+    }
+
+    auto onSuccessValidation = [&]() -> bool {
+        // Check signature
+        return dropPeerIfSigInvalid(startCollecting.surveyorID,
+                                    signedStartCollecting.signature,
+                                    xdr::xdr_to_opaque(startCollecting), peer);
+    };
+
+    if (!mMessageLimiter.validateStartSurveyCollecting(
+            startCollecting, mSurveyDataManager, onSuccessValidation))
+    {
+        return;
+    }
+
+    // Pass the node's Config here, as relayStopSurveyCollecting and
+    // updateSurveyPhase do for the same trailing parameter
+    OverlayManager& om = mApp.getOverlayManager();
+    if (!mSurveyDataManager.startSurveyCollecting(
+            startCollecting, om.getInboundAuthenticatedPeers(),
+            om.getOutboundAuthenticatedPeers(), mApp.getConfig()))
+    {
+        return;
+    }
+
+    if (peer)
+    {
+        om.recvFloodedMsg(msg, peer);
+    }
+
+    broadcast(msg);
+}
+
+// Originate a TIME_SLICED_SURVEY_STOP_COLLECTING message carrying the nonce
+// currently held by the survey data manager. The message names this node as
+// surveyor, is signed with this node's seed, and is handed to
+// `relayStopSurveyCollecting` with no originating peer.
+//
+// Returns `false` (and sends nothing) if the survey data manager has no
+// nonce; returns `true` otherwise.
+bool
+SurveyManager::broadcastStopSurveyCollecting()
+{
+    auto const activeNonce = mSurveyDataManager.getNonce();
+    if (!activeNonce.has_value())
+    {
+        return false;
+    }
+
+    auto const& nodeSeed = mApp.getConfig().NODE_SEED;
+
+    StellarMessage outMsg;
+    outMsg.type(TIME_SLICED_SURVEY_STOP_COLLECTING);
+    auto& envelope = outMsg.signedTimeSlicedSurveyStopCollectingMessage();
+
+    // Fill in the body first: the signature covers its XDR encoding
+    auto& body = envelope.stopCollecting;
+    body.ledgerNum = mApp.getHerder().trackingConsensusLedgerIndex();
+    body.nonce = activeNonce.value();
+    body.surveyorID = nodeSeed.getPublicKey();
+
+    auto bytesToSign = xdr::xdr_to_opaque(body);
+    envelope.signature = nodeSeed.sign(bytesToSign);
+
+    relayStopSurveyCollecting(outMsg, nullptr);
+    return true;
+}
+
+// Process a TIME_SLICED_SURVEY_STOP_COLLECTING message. `peer` is the peer
+// the message arrived from, or null when this node originated the message.
+//
+// The message is dropped if the surveyor is another node that is not
+// permitted to survey this node, if the message limiter rejects it, or if the
+// survey data manager refuses to stop collecting. Otherwise it is recorded as
+// flooded by `peer` (if any) and broadcast onward.
+void
+SurveyManager::relayStopSurveyCollecting(StellarMessage const& msg,
+                                         Peer::pointer peer)
+{
+    releaseAssert(msg.type() == TIME_SLICED_SURVEY_STOP_COLLECTING);
+    auto const& envelope = msg.signedTimeSlicedSurveyStopCollectingMessage();
+    auto const& body = envelope.stopCollecting;
+
+    bool const fromOtherSurveyor =
+        !(body.surveyorID == mApp.getConfig().NODE_SEED.getPublicKey());
+    if (fromOtherSurveyor)
+    {
+        // A message from another surveyor must have come from some peer
+        releaseAssert(peer);
+        if (!surveyorPermitted(body.surveyorID))
+        {
+            return;
+        }
+    }
+
+    // Signature check, handed to the limiter as its on-success callback
+    auto verifySignature = [&]() -> bool {
+        return dropPeerIfSigInvalid(body.surveyorID, envelope.signature,
+                                    xdr::xdr_to_opaque(body), peer);
+    };
+    if (!mMessageLimiter.validateStopSurveyCollecting(body, verifySignature))
+    {
+        return;
+    }
+
+    OverlayManager& overlay = mApp.getOverlayManager();
+    bool const stopped = mSurveyDataManager.stopSurveyCollecting(
+        body, overlay.getInboundAuthenticatedPeers(),
+        overlay.getOutboundAuthenticatedPeers(), mApp.getConfig());
+    if (!stopped)
+    {
+        return;
+    }
+
+    if (peer)
+    {
+        overlay.recvFloodedMsg(msg, peer);
+    }
+    broadcast(msg);
+}
+
void
SurveyManager::addNodeToRunningSurveyBacklog(
- SurveyMessageCommandType type, std::chrono::seconds surveyDuration,
- NodeID const& nodeToSurvey)
+ SurveyMessageCommandType type,
+ std::optional surveyDuration,
+ NodeID const& nodeToSurvey, std::optional inboundPeersIndex,
+ std::optional outboundPeersIndex)
{
- if (!mRunningSurveyType || *mRunningSurveyType != type)
+ if (!mRunningSurveyReportingPhaseType ||
+ *mRunningSurveyReportingPhaseType != type)
{
throw std::runtime_error("addNodeToRunningSurveyBacklog failed");
}
addPeerToBacklog(nodeToSurvey);
- updateSurveyExpiration(surveyDuration);
+
+ switch (type)
+ {
+ case SURVEY_TOPOLOGY:
+ if (!surveyDuration.has_value())
+ {
+ throw std::runtime_error("addNodeToRunningSurveyBacklog failed: "
+ "missing survey duration");
+ }
+ updateOldStyleSurveyExpiration(surveyDuration.value());
+ break;
+ case TIME_SLICED_SURVEY_TOPOLOGY:
+ // Time sliced surveys have a built-in timeout, so one should not be
+ // passed in.
+ if (surveyDuration.has_value())
+ {
+ throw std::runtime_error("addNodeToRunningSurveyBacklog failed: "
+ "unexpected survey duration");
+ }
+
+ if (!inboundPeersIndex.has_value() || !outboundPeersIndex.has_value())
+ {
+ throw std::runtime_error(
+ "addNodeToRunningSurveyBacklog failed: missing peer indices");
+ }
+
+ mInboundPeerIndices[nodeToSurvey] = inboundPeersIndex.value();
+ mOutboundPeerIndices[nodeToSurvey] = outboundPeersIndex.value();
+ break;
+ }
}
-void
-SurveyManager::relayOrProcessResponse(StellarMessage const& msg,
+std::optional
+SurveyManager::validateSurveyResponse(StellarMessage const& msg,
Peer::pointer peer)
{
releaseAssert(msg.type() == SURVEY_RESPONSE);
@@ -106,12 +392,74 @@ SurveyManager::relayOrProcessResponse(StellarMessage const& msg,
xdr::xdr_to_opaque(response), peer);
};
- if (!mMessageLimiter.recordAndValidateResponse(response,
- onSuccessValidation))
+ if (mMessageLimiter.recordAndValidateResponse(response,
+ onSuccessValidation))
+ {
+ return response;
+ }
+ else
+ {
+ return std::nullopt;
+ }
+}
+
+// Validate a TIME_SLICED_SURVEY_RESPONSE message received from `peer`.
+// Returns the inner survey response if the message limiter accepts it, and
+// `std::nullopt` otherwise. The limiter is given a callback that requires the
+// response's nonce to belong to a survey in the reporting phase and the
+// signature over the nonce-carrying outer response to be valid.
+std::optional
+SurveyManager::validateTimeSlicedSurveyResponse(StellarMessage const& msg,
+                                                Peer::pointer peer)
+{
+    releaseAssert(msg.type() == TIME_SLICED_SURVEY_RESPONSE);
+    auto const& envelope = msg.signedTimeSlicedSurveyResponseMessage();
+    auto const& outer = envelope.response;
+    auto const& response = outer.response;
+
+    auto nonceAndSignatureOk = [&]() -> bool {
+        // Nonce is checked first; the signature is only verified if it passes
+        return mSurveyDataManager.nonceIsReporting(outer.nonce) &&
+               dropPeerIfSigInvalid(response.surveyedPeerID,
+                                    envelope.responseSignature,
+                                    xdr::xdr_to_opaque(outer), peer);
+    };
+
+    if (!mMessageLimiter.recordAndValidateResponse(response,
+                                                   nonceAndSignatureOk))
+    {
+        return std::nullopt;
+    }
+    return response;
+}
+
+void
+SurveyManager::relayOrProcessResponse(StellarMessage const& msg,
+ Peer::pointer peer)
+{
+ std::optional maybeResponse;
+ switch (msg.type())
+ {
+ case SURVEY_RESPONSE:
+ maybeResponse = validateSurveyResponse(msg, peer);
+ break;
+ case TIME_SLICED_SURVEY_RESPONSE:
+ maybeResponse = validateTimeSlicedSurveyResponse(msg, peer);
+ break;
+ default:
+ releaseAssert(false);
+ }
+
+ if (!maybeResponse.has_value())
{
+ // Validation failed
return;
}
+ auto const& response = maybeResponse.value();
+
// mMessageLimiter filters out duplicates, so here we are guaranteed
// to record the message for the first time
mApp.getOverlayManager().recvFloodedMsg(msg, peer);
@@ -120,7 +468,8 @@ SurveyManager::relayOrProcessResponse(StellarMessage const& msg,
{
// only process if survey is still running and we haven't seen the
// response
- if (mRunningSurveyType && *mRunningSurveyType == response.commandType)
+ if (mRunningSurveyReportingPhaseType &&
+ *mRunningSurveyReportingPhaseType == response.commandType)
{
try
{
@@ -130,8 +479,23 @@ SurveyManager::relayOrProcessResponse(StellarMessage const& msg,
SurveyResponseBody body;
xdr::xdr_from_opaque(opaqueDecrypted, body);
-
- processTopologyResponse(response.surveyedPeerID, body);
+ switch (msg.type())
+ {
+ case SURVEY_RESPONSE:
+ {
+ processOldStyleTopologyResponse(response.surveyedPeerID,
+ body);
+ }
+ break;
+ case TIME_SLICED_SURVEY_RESPONSE:
+ {
+ processTimeSlicedTopologyResponse(response.surveyedPeerID,
+ body);
+ }
+ break;
+ default:
+ releaseAssert(false);
+ }
}
catch (std::exception const& e)
{
@@ -145,8 +509,8 @@ SurveyManager::relayOrProcessResponse(StellarMessage const& msg,
}
else
{
- // messageLimiter guarantees we only flood the response if we've seen
- // the request
+ // messageLimiter guarantees we only flood the response if we've
+ // seen the request
broadcast(msg);
}
}
@@ -155,11 +519,7 @@ void
SurveyManager::relayOrProcessRequest(StellarMessage const& msg,
Peer::pointer peer)
{
- releaseAssert(msg.type() == SURVEY_REQUEST);
- SignedSurveyRequestMessage const& signedRequest =
- msg.signedSurveyRequestMessage();
-
- SurveyRequestMessage const& request = signedRequest.request;
+ SurveyRequestMessage const& request = extractSurveyRequestMessage(msg);
auto surveyorIsSelf =
request.surveyorPeerID == mApp.getConfig().NODE_SEED.getPublicKey();
@@ -167,32 +527,41 @@ SurveyManager::relayOrProcessRequest(StellarMessage const& msg,
{
releaseAssert(peer);
- // perform all validation checks before signature validation so we don't
- // waste time verifying signatures
- auto const& surveyorKeys = mApp.getConfig().SURVEYOR_KEYS;
-
- if (surveyorKeys.empty())
+ if (!surveyorPermitted(request.surveyorPeerID))
{
- auto const& quorumMap =
- mApp.getHerder().getCurrentlyTrackedQuorum();
- if (quorumMap.count(request.surveyorPeerID) == 0)
- {
- return;
- }
+ return;
}
- else
+ }
+
+ auto onSuccessValidation = [&]() -> bool {
+ bool res;
+ switch (msg.type())
+ {
+ case SURVEY_REQUEST:
+ res = dropPeerIfSigInvalid(
+ request.surveyorPeerID,
+ msg.signedSurveyRequestMessage().requestSignature,
+ xdr::xdr_to_opaque(request), peer);
+ break;
+ case TIME_SLICED_SURVEY_REQUEST:
{
- if (surveyorKeys.count(request.surveyorPeerID) == 0)
+ SignedTimeSlicedSurveyRequestMessage const& signedRequest =
+ msg.signedTimeSlicedSurveyRequestMessage();
+ // check nonce
+ res = mSurveyDataManager.nonceIsReporting(
+ signedRequest.request.nonce);
+ if (res)
{
- return;
+ // Check signature
+ res = dropPeerIfSigInvalid(
+ request.surveyorPeerID, signedRequest.requestSignature,
+ xdr::xdr_to_opaque(signedRequest.request), peer);
}
}
- }
-
- auto onSuccessValidation = [&]() -> bool {
- auto res = dropPeerIfSigInvalid(request.surveyorPeerID,
- signedRequest.requestSignature,
- xdr::xdr_to_opaque(request), peer);
+ break;
+ default:
+ releaseAssert(false);
+ }
if (!res && surveyorIsSelf)
{
CLOG_ERROR(Overlay, "Unexpected invalid survey request: {} ",
@@ -213,7 +582,18 @@ SurveyManager::relayOrProcessRequest(StellarMessage const& msg,
if (request.surveyedPeerID == mApp.getConfig().NODE_SEED.getPublicKey())
{
- processTopologyRequest(request);
+ switch (msg.type())
+ {
+ case SURVEY_REQUEST:
+ processOldStyleTopologyRequest(request);
+ break;
+ case TIME_SLICED_SURVEY_REQUEST:
+ processTimeSlicedTopologyRequest(
+ msg.signedTimeSlicedSurveyRequestMessage().request);
+ break;
+ default:
+ releaseAssert(false);
+ }
}
else
{
@@ -221,8 +601,21 @@ SurveyManager::relayOrProcessRequest(StellarMessage const& msg,
}
}
+// Fill in the fields of `request` for a survey of type `type` aimed at
+// `nodeToSurvey`, with this node as the surveyor. Does not sign anything;
+// callers sign the enclosing message themselves.
+void
+SurveyManager::populateSurveyRequestMessage(NodeID const& nodeToSurvey,
+                                            SurveyMessageCommandType type,
+                                            SurveyRequestMessage& request) const
+{
+    // What is being asked, and of whom
+    request.commandType = type;
+    request.surveyedPeerID = nodeToSurvey;
+
+    // Who is asking, and at which ledger
+    request.surveyorPeerID = mApp.getConfig().NODE_SEED.getPublicKey();
+    request.ledgerNum = mApp.getHerder().trackingConsensusLedgerIndex();
+
+    // Public half of the current survey's Curve25519 key pair
+    request.encryptionKey = mCurve25519PublicKey;
+}
+
StellarMessage
-SurveyManager::makeSurveyRequest(NodeID const& nodeToSurvey) const
+SurveyManager::makeOldStyleSurveyRequest(NodeID const& nodeToSurvey) const
{
StellarMessage newMsg;
newMsg.type(SURVEY_REQUEST);
@@ -230,13 +623,7 @@ SurveyManager::makeSurveyRequest(NodeID const& nodeToSurvey) const
auto& signedRequest = newMsg.signedSurveyRequestMessage();
auto& request = signedRequest.request;
- request.ledgerNum = mApp.getHerder().trackingConsensusLedgerIndex();
- request.surveyorPeerID = mApp.getConfig().NODE_SEED.getPublicKey();
-
- request.surveyedPeerID = nodeToSurvey;
- request.encryptionKey = mCurve25519PublicKey;
- request.commandType = SURVEY_TOPOLOGY;
-
+ populateSurveyRequestMessage(nodeToSurvey, SURVEY_TOPOLOGY, request);
auto sigBody = xdr::xdr_to_opaque(request);
signedRequest.requestSignature = mApp.getConfig().NODE_SEED.sign(sigBody);
@@ -246,14 +633,58 @@ SurveyManager::makeSurveyRequest(NodeID const& nodeToSurvey) const
void
SurveyManager::sendTopologyRequest(NodeID const& nodeToSurvey)
{
+ if (!mRunningSurveyReportingPhaseType.has_value())
+ {
+ CLOG_ERROR(Overlay, "Tried to send survey request when no survey is "
+ "running in reporting phase");
+ return;
+ }
+
+ StellarMessage newMsg;
+ switch (mRunningSurveyReportingPhaseType.value())
+ {
+ case SURVEY_TOPOLOGY:
+ newMsg = makeOldStyleSurveyRequest(nodeToSurvey);
+ break;
+ case TIME_SLICED_SURVEY_TOPOLOGY:
+ {
+ newMsg.type(TIME_SLICED_SURVEY_REQUEST);
+
+ auto& signedRequest = newMsg.signedTimeSlicedSurveyRequestMessage();
+ auto& outerRequest = signedRequest.request;
+ auto& innerRequest = outerRequest.request;
+ populateSurveyRequestMessage(nodeToSurvey, TIME_SLICED_SURVEY_TOPOLOGY,
+ innerRequest);
+
+ auto maybeNonce = mSurveyDataManager.getNonce();
+ if (!maybeNonce.has_value())
+ {
+ // Reporting phase has ended. Drop the request.
+ return;
+ }
+
+ outerRequest.nonce = maybeNonce.value();
+ outerRequest.inboundPeersIndex = mInboundPeerIndices.at(nodeToSurvey);
+ outerRequest.outboundPeersIndex = mOutboundPeerIndices.at(nodeToSurvey);
+
+ auto sigBody = xdr::xdr_to_opaque(outerRequest);
+ signedRequest.requestSignature =
+ mApp.getConfig().NODE_SEED.sign(sigBody);
+ }
+ break;
+ default:
+ releaseAssert(false);
+ }
// Record the request in message limiter and broadcast
- relayOrProcessRequest(makeSurveyRequest(nodeToSurvey), nullptr);
+ relayOrProcessRequest(newMsg, nullptr);
}
void
-SurveyManager::processTopologyResponse(NodeID const& surveyedPeerID,
- SurveyResponseBody const& body)
+SurveyManager::processOldStyleTopologyResponse(NodeID const& surveyedPeerID,
+ SurveyResponseBody const& body)
{
+ releaseAssert(body.type() == SURVEY_TOPOLOGY_RESPONSE_V0 ||
+ body.type() == SURVEY_TOPOLOGY_RESPONSE_V1);
auto& peerResults =
mResults["topology"][KeyUtils::toStrKey(surveyedPeerID)];
auto populatePeerResults = [&](auto const& topologyBody) {
@@ -285,7 +716,60 @@ SurveyManager::processTopologyResponse(NodeID const& surveyedPeerID,
}
void
-SurveyManager::processTopologyRequest(SurveyRequestMessage const& request) const
+SurveyManager::processTimeSlicedTopologyResponse(NodeID const& surveyedPeerID,
+ SurveyResponseBody const& body)
+{
+ releaseAssert(body.type() == SURVEY_TOPOLOGY_RESPONSE_V2);
+ auto& peerResults =
+ mResults["topology"][KeyUtils::toStrKey(surveyedPeerID)];
+
+ // Fill in node data
+ auto const& topologyBody = body.topologyResponseBodyV2();
+ TimeSlicedNodeData const& node = topologyBody.nodeData;
+ peerResults["addedAuthenticatedPeers"] = node.addedAuthenticatedPeers;
+ peerResults["droppedAuthenticatedPeers"] = node.droppedAuthenticatedPeers;
+ peerResults["numTotalInboundPeers"] = node.totalInboundPeerCount;
+ peerResults["numTotalOutboundPeers"] = node.totalOutboundPeerCount;
+ peerResults["p75SCPFirstToSelfLatencyNs"] = node.p75SCPFirstToSelfLatencyNs;
+ peerResults["p75SCPSelfToOtherLatencyNs"] = node.p75SCPSelfToOtherLatencyNs;
+ peerResults["lostSyncCount"] = node.lostSyncCount;
+ peerResults["isValidator"] = node.isValidator;
+ peerResults["maxInboundPeerCount"] = node.maxInboundPeerCount;
+ peerResults["maxOutboundPeerCount"] = node.maxOutboundPeerCount;
+
+ // Fill in link data
+ auto& inboundResults = peerResults["inboundPeers"];
+ auto& outboundResults = peerResults["outboundPeers"];
+ recordTimeSlicedLinkResults(inboundResults, topologyBody.inboundPeers);
+ recordTimeSlicedLinkResults(outboundResults, topologyBody.outboundPeers);
+}
+
+// Fill in `response` as this node's answer to `request`: the ledger number
+// and surveyor ID are copied from the request, this node is recorded as the
+// surveyed peer, and `body` is XDR-encoded and encrypted with the request's
+// encryption key.
+//
+// Returns `true` on success. If serialization or encryption throws, the error
+// is logged and `false` is returned; the header fields of `response` have
+// already been written by then.
+bool
+SurveyManager::populateSurveyResponseMessage(
+    SurveyRequestMessage const& request, SurveyMessageCommandType type,
+    SurveyResponseBody const& body, SurveyResponseMessage& response) const
+{
+    response.commandType = type;
+    response.surveyedPeerID = mApp.getConfig().NODE_SEED.getPublicKey();
+    response.surveyorPeerID = request.surveyorPeerID;
+    response.ledgerNum = request.ledgerNum;
+
+    try
+    {
+        response.encryptedBody = curve25519Encrypt(
+            request.encryptionKey, xdr::xdr_to_opaque(body));
+        return true;
+    }
+    catch (std::exception const& e)
+    {
+        CLOG_ERROR(Overlay, "curve25519Encrypt failed: {}", e.what());
+    }
+    return false;
+}
+
+void
+SurveyManager::processOldStyleTopologyRequest(
+ SurveyRequestMessage const& request) const
{
CLOG_TRACE(Overlay, "Responding to Topology request from {}",
mApp.getConfig().toShortString(request.surveyorPeerID));
@@ -294,12 +778,6 @@ SurveyManager::processTopologyRequest(SurveyRequestMessage const& request) const
newMsg.type(SURVEY_RESPONSE);
auto& signedResponse = newMsg.signedSurveyResponseMessage();
- auto& response = signedResponse.response;
-
- response.ledgerNum = request.ledgerNum;
- response.surveyorPeerID = request.surveyorPeerID;
- response.surveyedPeerID = mApp.getConfig().NODE_SEED.getPublicKey();
- response.commandType = SURVEY_TOPOLOGY;
SurveyResponseBody body;
body.type(SURVEY_TOPOLOGY_RESPONSE_V1);
@@ -326,18 +804,56 @@ SurveyManager::processTopologyRequest(SurveyRequestMessage const& request) const
topologyBody.maxOutboundPeerCount =
mApp.getConfig().TARGET_PEER_CONNECTIONS;
- try
+ auto& response = signedResponse.response;
+ if (!populateSurveyResponseMessage(request, SURVEY_TOPOLOGY, body,
+ response))
{
- response.encryptedBody = curve25519Encrypt(
- request.encryptionKey, xdr::xdr_to_opaque(body));
+ return;
}
- catch (std::exception const& e)
+
+ auto sigBody = xdr::xdr_to_opaque(response);
+ signedResponse.responseSignature = mApp.getConfig().NODE_SEED.sign(sigBody);
+
+ broadcast(newMsg);
+}
+
+void
+SurveyManager::processTimeSlicedTopologyRequest(
+ TimeSlicedSurveyRequestMessage const& request)
+{
+ std::string const peerIdStr =
+ mApp.getConfig().toShortString(request.request.surveyorPeerID);
+ CLOG_TRACE(Overlay, "Responding to Topology request from {}", peerIdStr);
+
+ SurveyResponseBody body;
+ body.type(SURVEY_TOPOLOGY_RESPONSE_V2);
+ if (!mSurveyDataManager.fillSurveyData(request,
+ body.topologyResponseBodyV2()))
{
- CLOG_ERROR(Overlay, "curve25519Encrypt failed: {}", e.what());
+ // This shouldn't happen because nonce and phase should have already
+ // been checked prior to calling this function
+ CLOG_ERROR(Overlay,
+ "Failed to respond to TimeSlicedTopology request from {} "
+ "due to unexpected nonce mismatch or survey phase mismatch",
+ peerIdStr);
return;
}
- auto sigBody = xdr::xdr_to_opaque(response);
+ StellarMessage newMsg;
+ newMsg.type(TIME_SLICED_SURVEY_RESPONSE);
+ auto& signedResponse = newMsg.signedTimeSlicedSurveyResponseMessage();
+
+ auto& outerResponse = signedResponse.response;
+ outerResponse.nonce = request.nonce;
+
+ auto& innerResponse = outerResponse.response;
+ if (!populateSurveyResponseMessage(
+ request.request, TIME_SLICED_SURVEY_TOPOLOGY, body, innerResponse))
+ {
+ return;
+ }
+
+ auto sigBody = xdr::xdr_to_opaque(outerResponse);
signedResponse.responseSignature = mApp.getConfig().NODE_SEED.sign(sigBody);
broadcast(newMsg);
@@ -346,8 +862,26 @@ SurveyManager::processTopologyRequest(SurveyRequestMessage const& request) const
void
SurveyManager::broadcast(StellarMessage const& msg) const
{
+ uint32_t minOverlayVersion = 0;
+ switch (msg.type())
+ {
+ case SURVEY_REQUEST:
+ case SURVEY_RESPONSE:
+ // Do nothing. All nodes on the network can understand these messages.
+ break;
+ case TIME_SLICED_SURVEY_START_COLLECTING:
+ case TIME_SLICED_SURVEY_STOP_COLLECTING:
+ case TIME_SLICED_SURVEY_REQUEST:
+ case TIME_SLICED_SURVEY_RESPONSE:
+ // Only send messages to nodes that can understand them.
+ minOverlayVersion = TIME_SLICED_SURVEY_MIN_OVERLAY_PROTOCOL_VERSION;
+ break;
+ default:
+ releaseAssert(false);
+ }
mApp.getOverlayManager().broadcastMessage(
- std::make_shared(msg));
+ std::make_shared(msg), /*hash*/ std::nullopt,
+ minOverlayVersion);
}
void
@@ -398,36 +932,7 @@ SurveyManager::recordResults(Json::Value& jsonResultList,
{
for (auto const& peer : peerList)
{
- Json::Value peerInfo;
- peerInfo["nodeId"] = KeyUtils::toStrKey(peer.id);
- peerInfo["version"] = peer.versionStr;
- peerInfo["messagesRead"] = static_cast(peer.messagesRead);
- peerInfo["messagesWritten"] =
- static_cast(peer.messagesWritten);
- peerInfo["bytesRead"] = static_cast(peer.bytesRead);
- peerInfo["bytesWritten"] = static_cast(peer.bytesWritten);
- peerInfo["secondsConnected"] =
- static_cast(peer.secondsConnected);
-
- peerInfo["uniqueFloodBytesRecv"] =
- static_cast(peer.uniqueFloodBytesRecv);
- peerInfo["duplicateFloodBytesRecv"] =
- static_cast(peer.duplicateFloodBytesRecv);
- peerInfo["uniqueFetchBytesRecv"] =
- static_cast(peer.uniqueFetchBytesRecv);
- peerInfo["duplicateFetchBytesRecv"] =
- static_cast(peer.duplicateFetchBytesRecv);
-
- peerInfo["uniqueFloodMessageRecv"] =
- static_cast(peer.uniqueFloodMessageRecv);
- peerInfo["duplicateFloodMessageRecv"] =
- static_cast(peer.duplicateFloodMessageRecv);
- peerInfo["uniqueFetchMessageRecv"] =
- static_cast(peer.uniqueFetchMessageRecv);
- peerInfo["duplicateFetchMessageRecv"] =
- static_cast(peer.duplicateFetchMessageRecv);
-
- jsonResultList.append(peerInfo);
+ jsonResultList.append(peerStatsToJson(peer));
}
}
@@ -440,7 +945,7 @@ SurveyManager::clearOldLedgers(uint32_t lastClosedledgerSeq)
Json::Value const&
SurveyManager::getJsonResults()
{
- mResults["surveyInProgress"] = mRunningSurveyType.has_value();
+ mResults["surveyInProgress"] = mRunningSurveyReportingPhaseType.has_value();
auto& jsonBacklog = mResults["backlog"];
jsonBacklog.clear();
@@ -476,6 +981,20 @@ SurveyManager::getMsgSummary(StellarMessage const& msg)
summary = "SURVEY_RESPONSE:";
commandType = msg.signedSurveyResponseMessage().response.commandType;
break;
+ case TIME_SLICED_SURVEY_REQUEST:
+ summary = "TIME_SLICED_SURVEY_REQUEST:";
+ commandType = msg.signedTimeSlicedSurveyRequestMessage()
+ .request.request.commandType;
+ break;
+ case TIME_SLICED_SURVEY_RESPONSE:
+ summary = "TIME_SLICED_SURVEY_RESPONSE:";
+ commandType = msg.signedTimeSlicedSurveyResponseMessage()
+ .response.response.commandType;
+ break;
+ case TIME_SLICED_SURVEY_START_COLLECTING:
+ return "TIME_SLICED_SURVEY_START_COLLECTING";
+ case TIME_SLICED_SURVEY_STOP_COLLECTING:
+ return "TIME_SLICED_SURVEY_STOP_COLLECTING";
default:
throw std::runtime_error(
"invalid call of SurveyManager::getMsgSummary");
@@ -486,10 +1005,9 @@ SurveyManager::getMsgSummary(StellarMessage const& msg)
void
SurveyManager::topOffRequests(SurveyMessageCommandType type)
{
- // Only stop the survey if all pending requests have been processed
- if (mApp.getClock().now() > mSurveyExpirationTime && mPeersToSurvey.empty())
+ if (surveyIsFinishedReporting())
{
- stopSurvey();
+ stopSurveyReporting();
return;
}
@@ -500,7 +1018,7 @@ SurveyManager::topOffRequests(SurveyMessageCommandType type)
// happen if some connections get congested)
uint32_t requestsSentInSchedule = 0;
- while (mRunningSurveyType &&
+ while (mRunningSurveyReportingPhaseType &&
requestsSentInSchedule < MAX_REQUEST_LIMIT_PER_LEDGER &&
!mPeersToSurvey.empty())
{
@@ -534,8 +1052,11 @@ SurveyManager::topOffRequests(SurveyMessageCommandType type)
}
void
-SurveyManager::updateSurveyExpiration(std::chrono::seconds surveyDuration)
+SurveyManager::updateOldStyleSurveyExpiration(
+ std::chrono::seconds surveyDuration)
{
+ // This function should only be called for old style surveys
+ releaseAssert(mRunningSurveyReportingPhaseType.value() == SURVEY_TOPOLOGY);
mSurveyExpirationTime = mApp.getClock().now() + surveyDuration;
}
@@ -549,7 +1070,8 @@ SurveyManager::addPeerToBacklog(NodeID const& nodeToSurvey)
if (mPeersToSurvey.count(nodeToSurvey) != 0 ||
nodeToSurvey == mApp.getConfig().NODE_SEED.getPublicKey())
{
- return;
+ throw std::runtime_error("addPeerToBacklog failed: Peer is already in "
+ "the backlog, or peer is self.");
}
mBadResponseNodes.erase(nodeToSurvey);
@@ -586,4 +1108,82 @@ SurveyManager::commandTypeName(SurveyMessageCommandType type)
{
return xdr::xdr_traits::enum_name(type);
}
+
+// Returns `true` if `surveyorID` is allowed to survey this node. When the
+// SURVEYOR_KEYS config setting is non-empty it acts as the allow list;
+// otherwise the surveyor must appear in the herder's currently tracked quorum.
+bool
+SurveyManager::surveyorPermitted(NodeID const& surveyorID) const
+{
+    auto const& allowList = mApp.getConfig().SURVEYOR_KEYS;
+    if (!allowList.empty())
+    {
+        return allowList.count(surveyorID) != 0;
+    }
+
+    // No explicit allow list configured: fall back to the tracked quorum
+    auto const& trackedQuorum = mApp.getHerder().getCurrentlyTrackedQuorum();
+    return trackedQuorum.count(surveyorID) != 0;
+}
+
+// Thin forwarder: passes `f` straight through to the `modifyNodeData` member
+// of the survey data manager owned by this object.
+void
+SurveyManager::modifyNodeData(std::function f)
+{
+    mSurveyDataManager.modifyNodeData(f);
+}
+
+// Thin forwarder: passes `peer` and `f` straight through to the
+// `modifyPeerData` member of the survey data manager owned by this object.
+void
+SurveyManager::modifyPeerData(Peer const& peer,
+                              std::function f)
+{
+    mSurveyDataManager.modifyPeerData(peer, f);
+}
+
+// Thin forwarder: reports `peer` to the `recordDroppedPeer` member of the
+// survey data manager owned by this object.
+void
+SurveyManager::recordDroppedPeer(Peer const& peer)
+{
+    mSurveyDataManager.recordDroppedPeer(peer);
+}
+
+// Thin forwarder: hands the current inbound/outbound peer maps and `config`
+// to the `updateSurveyPhase` member of the survey data manager owned by this
+// object.
+void
+SurveyManager::updateSurveyPhase(
+    std::map const& inboundPeers,
+    std::map const& outboundPeers, Config const& config)
+{
+    mSurveyDataManager.updateSurveyPhase(inboundPeers, outboundPeers, config);
+}
+
+// Returns `true` if no survey is in the reporting phase on this node, or if
+// the running survey has finished reporting:
+//  * old style surveys finish once the expiration time has passed and there
+//    are no peers left to survey;
+//  * time sliced surveys finish once the survey data manager no longer has a
+//    nonce, or no longer considers that nonce to be in the reporting phase.
+bool
+SurveyManager::surveyIsFinishedReporting()
+{
+    if (!mRunningSurveyReportingPhaseType.has_value())
+    {
+        return true;
+    }
+
+    switch (mRunningSurveyReportingPhaseType.value())
+    {
+    case SURVEY_TOPOLOGY:
+        // Survey is finished if the survey duration has passed and there are no
+        // remaining peers to survey
+        return mApp.getClock().now() > mSurveyExpirationTime &&
+               mPeersToSurvey.empty();
+    case TIME_SLICED_SURVEY_TOPOLOGY:
+    {
+        // Survey is finished when reporting phase ends
+        auto const maybeNonce = mSurveyDataManager.getNonce();
+        if (!maybeNonce.has_value())
+        {
+            return true;
+        }
+        return !mSurveyDataManager.nonceIsReporting(maybeNonce.value());
+    }
+    default:
+        // Unknown survey type: fail loudly rather than flowing off the end of
+        // a value-returning function
+        releaseAssert(false);
+    }
+}
+
+#ifdef BUILD_TESTS
+// Test-only accessor (compiled under BUILD_TESTS): returns a mutable
+// reference to the survey data manager owned by this object.
+SurveyDataManager&
+SurveyManager::getSurveyDataManagerForTesting()
+{
+    return mSurveyDataManager;
+}
+#endif
+
}
diff --git a/src/overlay/SurveyManager.h b/src/overlay/SurveyManager.h
index 1aecd91bba..89a6a53a1d 100644
--- a/src/overlay/SurveyManager.h
+++ b/src/overlay/SurveyManager.h
@@ -5,6 +5,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
#include "overlay/Peer.h"
+#include "overlay/SurveyDataManager.h"
#include "overlay/SurveyMessageLimiter.h"
#include "util/Timer.h"
#include "util/UnorderedSet.h"
@@ -29,13 +30,22 @@ class SurveyManager : public std::enable_shared_from_this,
SurveyManager(Application& app);
- bool startSurvey(SurveyMessageCommandType type,
- std::chrono::seconds surveyDuration);
- void stopSurvey();
-
- void addNodeToRunningSurveyBacklog(SurveyMessageCommandType type,
- std::chrono::seconds surveyDuration,
- NodeID const& nodeToSurvey);
+ // Start/stop survey reporting. Must be called before/after gathering data
+ // during the reporting phase of a survey. `surveyDuration` must be provided
+ // for old style surveys, and must not be provided for time sliced surveys.
+ bool
+ startSurveyReporting(SurveyMessageCommandType type,
+ std::optional surveyDuration);
+ void stopSurveyReporting();
+
+ // Add a node to the backlog of nodes to survey. inboundPeerIndex and
+ // outboundPeerIndex are mandatory for time sliced surveys and indicate
+ // which peers the node should report on
+ void addNodeToRunningSurveyBacklog(
+ SurveyMessageCommandType type,
+ std::optional surveyDuration,
+ NodeID const& nodeToSurvey, std::optional inboundPeerIndex,
+ std::optional outboundPeerIndex);
void relayOrProcessResponse(StellarMessage const& msg, Peer::pointer peer);
void relayOrProcessRequest(StellarMessage const& msg, Peer::pointer peer);
@@ -43,14 +53,64 @@ class SurveyManager : public std::enable_shared_from_this,
Json::Value const& getJsonResults();
static std::string getMsgSummary(StellarMessage const& msg);
- StellarMessage makeSurveyRequest(NodeID const& nodeToSurvey) const;
+
+ StellarMessage makeOldStyleSurveyRequest(NodeID const& nodeToSurvey) const;
+
+ // Start survey collecting with a given nonce. Returns `false` if unable to
+ // start a survey due to an ongoing survey on the network. Otherwise returns
+ // `true`. Note that a `true` result does not guarantee that the survey will
+ // be successful. It is possible that a survey is already ongoing that this
+ // node does not know about.
+ bool broadcastStartSurveyCollecting(uint32_t nonce);
+
+ void relayStartSurveyCollecting(StellarMessage const& msg,
+ Peer::pointer peer);
+
+ // Stop survey collecting. Uses nonce of the currently running survey.
+ // Returns `false` if no survey is currently active.
+ bool broadcastStopSurveyCollecting();
+
+ void relayStopSurveyCollecting(StellarMessage const& msg,
+ Peer::pointer peer);
+
+ // The following functions expose functions by the same name in
+ // `mSurveyDataManager`
+ void modifyNodeData(std::function f);
+ void modifyPeerData(Peer const& peer,
+ std::function f);
+ void recordDroppedPeer(Peer const& peer);
+ void updateSurveyPhase(std::map const& inboundPeers,
+ std::map const& outboundPeers,
+ Config const& config);
+
+#ifdef BUILD_TESTS
+ // Get a reference to the internal `SurveyDataManager` (for testing only)
+ SurveyDataManager& getSurveyDataManagerForTesting();
+#endif
private:
// topology specific methods
void sendTopologyRequest(NodeID const& nodeToSurvey);
- void processTopologyResponse(NodeID const& surveyedPeerID,
- SurveyResponseBody const& body);
- void processTopologyRequest(SurveyRequestMessage const& request) const;
+ void processOldStyleTopologyResponse(NodeID const& surveyedPeerID,
+ SurveyResponseBody const& body);
+ void
+ processOldStyleTopologyRequest(SurveyRequestMessage const& request) const;
+ void processTimeSlicedTopologyResponse(NodeID const& surveyedPeerID,
+ SurveyResponseBody const& body);
+ void processTimeSlicedTopologyRequest(
+ TimeSlicedSurveyRequestMessage const& request);
+
+ // Populate `response` with the data from the other parameters. Returns
+ // `false` on encryption failure.
+ bool populateSurveyResponseMessage(SurveyRequestMessage const& request,
+ SurveyMessageCommandType type,
+ SurveyResponseBody const& body,
+ SurveyResponseMessage& response) const;
+
+ // Populate `request` with the data from the other parameters
+ void populateSurveyRequestMessage(NodeID const& nodeToSurvey,
+ SurveyMessageCommandType type,
+ SurveyRequestMessage& request) const;
void broadcast(StellarMessage const& msg) const;
void populatePeerStats(std::vector const& peers,
@@ -60,8 +120,10 @@ class SurveyManager : public std::enable_shared_from_this,
PeerStatList const& peerList) const;
void topOffRequests(SurveyMessageCommandType type);
- void updateSurveyExpiration(std::chrono::seconds surveyDuration);
+ void updateOldStyleSurveyExpiration(std::chrono::seconds surveyDuration);
+ // Add `nodeToSurvey` to the survey backlog. Throws if the node is
+ // already queued up to survey, or if the node itself is the surveyor.
void addPeerToBacklog(NodeID const& nodeToSurvey);
// returns true if signature is valid
@@ -70,6 +132,24 @@ class SurveyManager : public std::enable_shared_from_this,
static std::string commandTypeName(SurveyMessageCommandType type);
+ // Validate a survey response message. Returns the message if it is valid
+ // and nullopt otherwise.
+ std::optional
+ validateSurveyResponse(StellarMessage const& msg, Peer::pointer peer);
+
+ // Validate a time sliced survey response message. Returns the message if it
+ // is valid and nullopt otherwise.
+ std::optional
+ validateTimeSlicedSurveyResponse(StellarMessage const& msg,
+ Peer::pointer peer);
+
+ // Returns `true` if this node's configuration allows it to be surveyed by
+ // `surveyorID`
+ bool surveyorPermitted(NodeID const& surveyorID) const;
+
+ // Returns `true` if the survey has finished the reporting phase
+ bool surveyIsFinishedReporting();
+
Application& mApp;
std::unique_ptr mSurveyThrottleTimer;
@@ -78,7 +158,9 @@ class SurveyManager : public std::enable_shared_from_this,
uint32_t const NUM_LEDGERS_BEFORE_IGNORE;
uint32_t const MAX_REQUEST_LIMIT_PER_LEDGER;
- std::optional mRunningSurveyType;
+ // If a survey is in the reporting phase, this will be set to the type of
+ // the running survey
+ std::optional mRunningSurveyReportingPhaseType;
Curve25519Secret mCurve25519SecretKey;
Curve25519Public mCurve25519PublicKey;
SurveyMessageLimiter mMessageLimiter;
@@ -86,9 +168,16 @@ class SurveyManager : public std::enable_shared_from_this,
UnorderedSet mPeersToSurvey;
std::queue mPeersToSurveyQueue;
+ // Indices to use when surveying peers for time sliced surveys
+ std::unordered_map mInboundPeerIndices;
+ std::unordered_map mOutboundPeerIndices;
+
std::chrono::seconds const SURVEY_THROTTLE_TIMEOUT_SEC;
UnorderedSet mBadResponseNodes;
Json::Value mResults;
+
+ // Manager for time-sliced survey data
+ SurveyDataManager mSurveyDataManager;
};
}
diff --git a/src/overlay/SurveyMessageLimiter.cpp b/src/overlay/SurveyMessageLimiter.cpp
index 816e1225b5..b44ddc9ea7 100644
--- a/src/overlay/SurveyMessageLimiter.cpp
+++ b/src/overlay/SurveyMessageLimiter.cpp
@@ -5,6 +5,7 @@
#include "SurveyMessageLimiter.h"
#include "herder/Herder.h"
#include "main/Application.h"
+#include "overlay/SurveyDataManager.h"
namespace stellar
{
@@ -139,6 +140,52 @@ SurveyMessageLimiter::recordAndValidateResponse(
return true;
}
+// Validate a request to start the collecting phase of a time sliced survey.
+//
+// The request is rejected (returns `false`) when:
+//   * `startSurvey.ledgerNum` does not pass `surveyLedgerNumValid` (request
+//     too old or otherwise invalid), or
+//   * `surveyDataManager` reports that a survey is already active, or
+//   * `onSuccessValidation` returns `false`.
+//
+// `onSuccessValidation` is invoked only after the first two checks pass, and
+// its result becomes the return value.
+bool
+SurveyMessageLimiter::validateStartSurveyCollecting(
+    TimeSlicedSurveyStartCollectingMessage const& startSurvey,
+    SurveyDataManager& surveyDataManager,
+    std::function<bool()> onSuccessValidation)
+{
+    if (!surveyLedgerNumValid(startSurvey.ledgerNum))
+    {
+        // Request too old (or otherwise invalid)
+        return false;
+    }
+
+    if (surveyDataManager.surveyIsActive())
+    {
+        // A survey is already active, so toss this request. Only one survey
+        // may be active at a time.
+        return false;
+    }
+
+    // All limiter checks passed; the caller-supplied validation decides the
+    // final outcome.
+    return onSuccessValidation();
+}
+
+// Validate a request to stop the collecting phase of a time sliced survey.
+//
+// The request is rejected (returns `false`) when `stopSurvey.ledgerNum` does
+// not pass `surveyLedgerNumValid` (request too old or otherwise invalid), or
+// when `onSuccessValidation` returns `false`.
+//
+// `onSuccessValidation` is invoked only after the ledger number check
+// passes, and its result becomes the return value.
+bool
+SurveyMessageLimiter::validateStopSurveyCollecting(
+    TimeSlicedSurveyStopCollectingMessage const& stopSurvey,
+    std::function<bool()> onSuccessValidation)
+{
+    if (!surveyLedgerNumValid(stopSurvey.ledgerNum))
+    {
+        // Request too old (or otherwise invalid)
+        return false;
+    }
+
+    // The ledger number is acceptable; the caller-supplied validation decides
+    // the final outcome.
+    return onSuccessValidation();
+}
+
bool
SurveyMessageLimiter::surveyLedgerNumValid(uint32_t ledgerNum)
{
diff --git a/src/overlay/SurveyMessageLimiter.h b/src/overlay/SurveyMessageLimiter.h
index 7ddc350a1b..4704c4b46d 100644
--- a/src/overlay/SurveyMessageLimiter.h
+++ b/src/overlay/SurveyMessageLimiter.h
@@ -5,6 +5,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0
#include "overlay/StellarXDR.h" // IWYU pragma: keep
+#include "overlay/SurveyDataManager.h"
#include "util/UnorderedMap.h"
#include
#include