Reduce peer message traffic for ledger data #5126

Open
wants to merge 38 commits into
base: develop
Changes from 20 commits
f0cf1fd
Log the caller / reason for server state changes:
ximinez Sep 2, 2024
756cad9
Class "CanProcess" to keep track of processing of distinct items
ximinez Aug 29, 2024
8a17f16
Drop duplicate outgoing TMGetLedger messages per peer:
ximinez Aug 13, 2024
226cb56
Drop duplicate incoming TMGetLedger messages per peer:
ximinez Aug 14, 2024
d5ec2d3
Drop duplicate incoming TMLedgerData messages:
vlntb Aug 14, 2024
ecfa396
Collapse multiple outgoing TMLedgerData messages with cookies into one
ximinez Aug 21, 2024
e490e57
Improve logging related to ledger acquisition
ximinez Sep 5, 2024
30eee9b
Review feedback from @Bronek:
ximinez Sep 25, 2024
978fecd
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Sep 30, 2024
b8b7b31
[FOLD] Review feedback from @vlntb:
ximinez Oct 11, 2024
43b6e3e
[FOLD] Fix typo in unit test:
ximinez Oct 14, 2024
813ef05
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Oct 15, 2024
89f5a67
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Oct 18, 2024
d7e2d70
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Oct 31, 2024
29de22e
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 4, 2024
e250086
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 5, 2024
da4a30c
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 8, 2024
eee8184
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 13, 2024
85dcf70
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 13, 2024
e23be8e
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Nov 27, 2024
75fcca5
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Dec 3, 2024
20ad383
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Dec 5, 2024
f372585
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Dec 16, 2024
d670c79
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Dec 20, 2024
35acb44
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Jan 7, 2025
1dcc061
Apply suggestions from code review
ximinez Jan 7, 2025
6d3dc8e
Review feedback from @Bronek:
ximinez Jan 8, 2025
cb6daa2
Fix: Work around compiler bug:
ximinez Jan 8, 2025
7894d8f
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Jan 9, 2025
9293950
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Jan 24, 2025
2090083
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Jan 28, 2025
248b34f
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Jan 30, 2025
79f9bb8
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Feb 4, 2025
fd94b55
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Feb 6, 2025
7e52abd
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Feb 7, 2025
829e4f1
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Feb 8, 2025
ca07176
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Feb 11, 2025
39fe006
Merge remote-tracking branch 'upstream/develop' into pr/getledger
ximinez Feb 12, 2025
2 changes: 1 addition & 1 deletion Builds/levelization/results/loops.txt
@@ -17,7 +17,7 @@ Loop: xrpld.app xrpld.net
xrpld.app > xrpld.net

Loop: xrpld.app xrpld.overlay
- xrpld.overlay == xrpld.app
+ xrpld.overlay ~= xrpld.app

Loop: xrpld.app xrpld.peerfinder
xrpld.app > xrpld.peerfinder
106 changes: 106 additions & 0 deletions include/xrpl/basics/CanProcess.h
@@ -0,0 +1,106 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================

#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED

/** RAII class to check if an Item is already being processed on another thread,
* as indicated by its presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check.canProcess() ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
template <class Mutex, class Collection, class Item>
class CanProcess
{
public:
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
: mtx_(mtx), collection_(collection), item_(item), canProcess_(insert())
{
}

~CanProcess()
{
if (canProcess_)
{
std::unique_lock<Mutex> lock_(mtx_);
collection_.erase(item_);
@Bronek (Collaborator), Dec 2, 2024:

How about replacing the (likely) O(log N) element lookup with O(1) erase of an iterator ? The iterator which is returned from insert to be specific, and currently ignored. In this case the Item wouldn't have to be stored inside CanProcess object, so that's also one less template parameter and probably also smaller object size (hashes are larger than iterators I guess).

Could even go one step further and replace all data members with std::function<void()> cleanup_ which would capture all that it needs if insert succeeded, or is empty if insert failed. In this case no template parameters would be needed at all.

Author (Collaborator):

> How about replacing the (likely) O(log N) element lookup with O(1) erase of an iterator ? The iterator which is returned from insert to be specific, and currently ignored. In this case the Item wouldn't have to be stored inside CanProcess object, so that's also one less template parameter and probably also smaller object size (hashes are larger than iterators I guess).

I didn't really consider using an iterator because I didn't want to consider the risks of it getting invalidated for any valid Collection type. For example, it looks like unordered_map would fit the template, but can invalidate iterators. One option is to force the collection to be std::set, which the current callers use.

> Could even go one step further and replace all data members with std::function<void()> cleanup_ which would capture all that it needs if insert succeeded, or is empty if insert failed. In this case no template parameters would be needed at all.

So only the ctor and insert would have template parameters? That could work.

Author (Collaborator):

I was incorrect in that last comment. unordered_map doesn't fit the template, but unordered_set does, and it also says it can potentially invalidate iterators. So, I wrote a generic insert() that doesn't use iterators, and a specialized one that does, and wrote tests for both. It may be overkill....

}
}

bool
canProcess() const

Codecov / codecov/patch warning: added line #L81 in include/xrpl/basics/CanProcess.h was not covered by tests.
Collaborator:

given that it is not covered by tests, perhaps we do not need this function ?

Author (Collaborator):

> given that it is not covered by tests, perhaps we do not need this function ?

It's used in the call path for network traffic, most of which is not covered by tests. But I changed that call to use the boolean conversion, and everything seems fine.

{
return canProcess_;

Codecov / codecov/patch warning: added line #L83 in include/xrpl/basics/CanProcess.h was not covered by tests.
}

operator bool() const
Collaborator:

Please use explicit here; it would be consistent with std::optional and most other operator bool inside the project.

Author (Collaborator):

> Please use explicit here; it would be consistent with std::optional and most other operator bool inside the project.

Fixed

{
return canProcess_;
}

private:
bool
insert()
{
std::unique_lock<Mutex> lock_(mtx_);
auto const [_, inserted] = collection_.insert(item_);
return inserted;
}

Mutex& mtx_;
Collection& collection_;
Item const item_;
bool const canProcess_;
};

#endif
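
The review thread in this file discusses erasing by the iterator returned from insert and replacing the data members with a `std::function<void()>` cleanup. The following is a minimal sketch of that idea, assuming the collection is restricted to `std::set` (whose iterators stay valid until the element itself is erased). The names `CanProcessSketch` and `runCanProcessExample` are hypothetical, and this is not the code in the PR.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <set>

// Hypothetical sketch of the reviewer's suggestion; not the PR's CanProcess.
class CanProcessSketch
{
public:
    // Only the constructor is templated; the class itself is not.
    template <class Mutex, class Item>
    CanProcessSketch(Mutex& mtx, std::set<Item>& collection, Item const& item)
    {
        std::unique_lock<Mutex> lock(mtx);
        auto const result = collection.insert(item);
        if (result.second)
        {
            // Assumption: std::set keeps this iterator valid until the
            // element is erased, so no second lookup is needed later.
            auto const it = result.first;
            cleanup_ = [&mtx, &collection, it]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(it);
            };
        }
    }

    ~CanProcessSketch()
    {
        if (cleanup_)
            cleanup_();
    }

    CanProcessSketch(CanProcessSketch const&) = delete;
    CanProcessSketch&
    operator=(CanProcessSketch const&) = delete;

    // "Usable" exactly when the insert succeeded.
    explicit operator bool() const
    {
        return static_cast<bool>(cleanup_);
    }

private:
    std::function<void()> cleanup_;
};

bool
runCanProcessExample()
{
    std::mutex mtx;
    std::set<int> pending;
    {
        CanProcessSketch first{mtx, pending, 7};
        CanProcessSketch second{mtx, pending, 7};
        assert(static_cast<bool>(first));    // inserted 7
        assert(!static_cast<bool>(second));  // 7 was already pending
        assert(pending.size() == 1);
    }
    // Only `first` removes the item when it goes out of scope.
    assert(pending.empty());
    return true;
}
```

The trade-off raised in the thread still applies: this form is only safe for containers with stable iterators, which is why the sketch takes a `std::set` rather than an arbitrary Collection.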
7 changes: 7 additions & 0 deletions include/xrpl/basics/base_uint.h
@@ -625,6 +625,13 @@ to_string(base_uint<Bits, Tag> const& a)
return strHex(a.cbegin(), a.cend());
}

template <std::size_t Bits, class Tag>
inline std::string
to_short_string(base_uint<Bits, Tag> const& a)
{
return strHex(a.cbegin(), a.cend()).substr(0, 8) + "...";
}

template <std::size_t Bits, class Tag>
inline std::ostream&
operator<<(std::ostream& out, base_uint<Bits, Tag> const& u)
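
To illustrate what `to_short_string` produces, here is a standalone sketch that mirrors it on a plain byte array. `toHexSketch` is a hypothetical stand-in for `strHex` (assumed here to emit uppercase hex, as the expected strings in base_uint_test.cpp suggest), and `toShortStringSketch` and `runShortStringExample` are likewise illustrative names.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical stand-in for strHex: uppercase hex, two digits per byte.
template <std::size_t N>
std::string
toHexSketch(std::array<std::uint8_t, N> const& bytes)
{
    static constexpr char digits[] = "0123456789ABCDEF";
    std::string out;
    out.reserve(2 * N);
    for (auto const b : bytes)
    {
        out.push_back(digits[b >> 4]);
        out.push_back(digits[b & 0x0F]);
    }
    return out;
}

// Same shape as to_short_string: first 8 hex digits followed by "...".
template <std::size_t N>
std::string
toShortStringSketch(std::array<std::uint8_t, N> const& bytes)
{
    return toHexSketch(bytes).substr(0, 8) + "...";
}

bool
runShortStringExample()
{
    std::array<std::uint8_t, 12> const u{
        {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}};
    assert(toHexSketch(u) == "0102030405060708090A0B0C");
    assert(toShortStringSketch(u) == "01020304...");
    return true;
}
```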
10 changes: 10 additions & 0 deletions include/xrpl/proto/ripple.proto
@@ -321,8 +321,18 @@ message TMLedgerData
required uint32 ledgerSeq = 2;
required TMLedgerInfoType type = 3;
repeated TMLedgerNode nodes = 4;
// If the peer supports "responseCookies", this field will
// never be populated.
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// The old field is called "requestCookie", but this is
// a response, so this name makes more sense
repeated uint32 responseCookies = 7;
// If a TMGetLedger request was received without a "requestCookie",
// and the peer supports it, this flag will be set to true to
// indicate that the receiver should process the result in addition
// to forwarding it to its "responseCookies" peers.
optional bool directResponse = 8;
}

message TMPing
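
The field comments in this hunk describe how a receiver should treat `requestCookie`, `responseCookies`, and `directResponse`. The sketch below models one possible reading of those rules with plain structs. Everything here is an assumption for illustration: `LedgerDataSketch`, `RoutingSketch`, and `routeLedgerDataSketch` are hypothetical, the legacy rule (forward only, when `requestCookie` is set) is assumed rather than shown in the diff, and treating a message with no cookies at all as a direct response is also an assumption.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical mirror of the cookie-related TMLedgerData fields.
struct LedgerDataSketch
{
    std::optional<std::uint32_t> requestCookie;  // legacy field 5
    std::vector<std::uint32_t> responseCookies;  // new field 7
    std::optional<bool> directResponse;          // new field 8
};

// What the receiver decides to do with one message.
struct RoutingSketch
{
    bool processLocally = false;
    std::vector<std::uint32_t> forwardTo;
};

RoutingSketch
routeLedgerDataSketch(LedgerDataSketch const& msg)
{
    RoutingSketch result;
    if (msg.requestCookie)
    {
        // Assumed legacy behavior: relay to the one requesting peer only.
        result.forwardTo.push_back(*msg.requestCookie);
        return result;
    }
    // New behavior: one message can serve several requesters at once.
    result.forwardTo = msg.responseCookies;
    // Assumption: with no cookies at all, the reply is for this node.
    result.processLocally =
        msg.responseCookies.empty() || msg.directResponse.value_or(false);
    return result;
}

bool
runRoutingExample()
{
    LedgerDataSketch legacy;
    legacy.requestCookie = 5;
    auto const r1 = routeLedgerDataSketch(legacy);
    assert(!r1.processLocally && r1.forwardTo.size() == 1);

    LedgerDataSketch combined;
    combined.responseCookies = {3, 9};
    combined.directResponse = true;
    auto const r2 = routeLedgerDataSketch(combined);
    assert(r2.processLocally && r2.forwardTo.size() == 2);

    LedgerDataSketch forwardOnly;
    forwardOnly.responseCookies = {3};
    auto const r3 = routeLedgerDataSketch(forwardOnly);
    assert(!r3.processLocally && r3.forwardTo.size() == 1);

    LedgerDataSketch plain;
    auto const r4 = routeLedgerDataSketch(plain);
    assert(r4.processLocally && r4.forwardTo.empty());
    return true;
}
```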
2 changes: 2 additions & 0 deletions include/xrpl/protocol/LedgerHeader.h
@@ -55,6 +55,8 @@ struct LedgerHeader

// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;
28 changes: 28 additions & 0 deletions src/test/app/HashRouter_test.cpp
@@ -242,6 +242,33 @@ class HashRouter_test : public beast::unit_test::suite
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
}

void
testProcessPeer()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, 5s);
uint256 const key(1);
HashRouter::PeerShortID peer1 = 1;
HashRouter::PeerShortID peer2 = 2;
auto const timeout = 2s;

BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
++stopwatch;
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
}

public:
void
run() override
@@ -252,6 +279,7 @@ class HashRouter_test : public beast::unit_test::suite
testSetFlags();
testRelay();
testProcess();
testProcessPeer();
}
};
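
The expectations in `testProcessPeer` are consistent with a simple per-(key, peer) rule: process if that pair has never been processed, or if at least `timeout` has elapsed since it was last processed, and only update the timestamp when the answer is yes. The sketch below models that rule with a caller-supplied clock. It is not the `HashRouter` implementation; `PeerDedupSketch` and `runPeerDedupExample` are hypothetical, and it assumes each `++stopwatch` in the test advances time by one second.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <map>
#include <utility>

// Hypothetical model of per-peer duplicate suppression with a timeout.
class PeerDedupSketch
{
public:
    using Seconds = std::chrono::seconds;

    // `now` is passed in so the example can drive a manual clock.
    bool
    shouldProcessForPeer(
        std::uint64_t key,
        std::uint32_t peer,
        Seconds now,
        Seconds interval)
    {
        auto const id = std::make_pair(key, peer);
        auto const it = lastProcessed_.find(id);
        if (it != lastProcessed_.end() && now - it->second < interval)
            return false;  // seen too recently; timestamp is left unchanged
        lastProcessed_[id] = now;
        return true;
    }

private:
    std::map<std::pair<std::uint64_t, std::uint32_t>, Seconds> lastProcessed_;
};

bool
runPeerDedupExample()
{
    using Seconds = PeerDedupSketch::Seconds;
    PeerDedupSketch router;
    Seconds const timeout{2};
    Seconds now{0};
    std::uint64_t const key = 1;

    // Same sequence of calls as testProcessPeer.
    assert(router.shouldProcessForPeer(key, 1, now, timeout));
    assert(!router.shouldProcessForPeer(key, 1, now, timeout));
    now += Seconds{1};
    assert(!router.shouldProcessForPeer(key, 1, now, timeout));
    assert(router.shouldProcessForPeer(key, 2, now, timeout));
    assert(!router.shouldProcessForPeer(key, 2, now, timeout));
    now += Seconds{1};
    assert(router.shouldProcessForPeer(key, 1, now, timeout));
    assert(!router.shouldProcessForPeer(key, 2, now, timeout));
    now += Seconds{1};
    assert(router.shouldProcessForPeer(key, 2, now, timeout));
    now += Seconds{1};
    assert(router.shouldProcessForPeer(key, 1, now, timeout));
    assert(!router.shouldProcessForPeer(key, 2, now, timeout));
    return true;
}
```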

5 changes: 5 additions & 0 deletions src/test/app/LedgerReplay_test.cpp
@@ -321,6 +321,11 @@ class TestPeer : public Peer
{
return false;
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}

bool ledgerReplayEnabled_;
PublicKey nodePublicKey_;
5 changes: 5 additions & 0 deletions src/test/basics/base_uint_test.cpp
@@ -151,6 +151,7 @@ struct base_uint_test : beast::unit_test::suite
uset.insert(u);
BEAST_EXPECT(raw.size() == u.size());
BEAST_EXPECT(to_string(u) == "0102030405060708090A0B0C");
BEAST_EXPECT(to_short_string(u) == "01020304...");
BEAST_EXPECT(*u.data() == 1);
BEAST_EXPECT(u.signum() == 1);
BEAST_EXPECT(!!u);
@@ -173,6 +174,7 @@ struct base_uint_test : beast::unit_test::suite
test96 v{~u};
uset.insert(v);
BEAST_EXPECT(to_string(v) == "FEFDFCFBFAF9F8F7F6F5F4F3");
BEAST_EXPECT(to_short_string(v) == "FEFDFCFB...");
BEAST_EXPECT(*v.data() == 0xfe);
BEAST_EXPECT(v.signum() == 1);
BEAST_EXPECT(!!v);
@@ -193,6 +195,7 @@ struct base_uint_test : beast::unit_test::suite
test96 z{beast::zero};
uset.insert(z);
BEAST_EXPECT(to_string(z) == "000000000000000000000000");
BEAST_EXPECT(to_short_string(z) == "00000000...");
BEAST_EXPECT(*z.data() == 0);
BEAST_EXPECT(*z.begin() == 0);
BEAST_EXPECT(*std::prev(z.end(), 1) == 0);
@@ -213,6 +216,7 @@ struct base_uint_test : beast::unit_test::suite
BEAST_EXPECT(n == z);
n--;
BEAST_EXPECT(to_string(n) == "FFFFFFFFFFFFFFFFFFFFFFFF");
BEAST_EXPECT(to_short_string(n) == "FFFFFFFF...");
n = beast::zero;
BEAST_EXPECT(n == z);

@@ -223,6 +227,7 @@ struct base_uint_test : beast::unit_test::suite
test96 x{zm1 ^ zp1};
uset.insert(x);
BEAST_EXPECTS(to_string(x) == "FFFFFFFFFFFFFFFFFFFFFFFE", to_string(x));
BEAST_EXPECTS(to_short_string(x) == "FFFFFFFF...", to_short_string(x));

BEAST_EXPECT(uset.size() == 4);

4 changes: 2 additions & 2 deletions src/test/overlay/ProtocolVersion_test.cpp
@@ -87,8 +87,8 @@ class ProtocolVersion_test : public beast::unit_test::suite
negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
BEAST_EXPECT(
negotiateProtocolVersion(
- "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
- make_protocol(2, 2));
+ "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
+ make_protocol(2, 3));
BEAST_EXPECT(
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
std::nullopt);
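
The updated expectation (offering up to XRPL/2.4 but negotiating 2.3) fits a "highest version both sides support" rule. The sketch below shows that rule in isolation. It is not `negotiateProtocolVersion`: `VersionSketch`, `negotiateSketch`, and `runNegotiateExample` are hypothetical names, the supported list {2.1, 2.2, 2.3} is an assumption, and the token parsing is deliberately simplistic.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdio>
#include <optional>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

using VersionSketch = std::pair<unsigned, unsigned>;

// Pick the highest offered "XRPL/major.minor" entry that is also supported.
std::optional<VersionSketch>
negotiateSketch(
    std::string const& offered,
    std::vector<VersionSketch> const& supported)
{
    std::optional<VersionSketch> best;
    std::istringstream in(offered);
    std::string token;
    while (std::getline(in, token, ','))
    {
        auto const start = token.find_first_not_of(' ');
        if (start == std::string::npos)
            continue;
        char const* text = token.c_str() + start;
        unsigned versionMajor = 0;
        unsigned versionMinor = 0;
        int consumed = 0;
        // Tokens for other protocols (RTXP, WebSocket) fail to match here.
        if (std::sscanf(
                text, "XRPL/%u.%u%n", &versionMajor, &versionMinor, &consumed) !=
            2)
            continue;
        if (text[consumed] != '\0')
            continue;  // trailing characters after the version
        VersionSketch const candidate{versionMajor, versionMinor};
        if (std::find(supported.begin(), supported.end(), candidate) ==
            supported.end())
            continue;
        if (!best || candidate > *best)
            best = candidate;
    }
    return best;
}

bool
runNegotiateExample()
{
    // Assumed supported list; the real one lives in the overlay code.
    std::vector<VersionSketch> const supported{{2, 1}, {2, 2}, {2, 3}};

    auto const a = negotiateSketch(
        "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999", supported);
    assert(a.has_value());
    assert(a->first == 2u && a->second == 3u);

    auto const b = negotiateSketch("XRPL/999.999, WebSocket/1.0", supported);
    assert(!b.has_value());
    return true;
}
```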
5 changes: 5 additions & 0 deletions src/test/overlay/reduce_relay_test.cpp
@@ -181,6 +181,11 @@ class PeerPartial : public Peer
removeTxQueue(const uint256&) override
{
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}
};

/** Manually advanced clock. */
3 changes: 2 additions & 1 deletion src/xrpld/app/consensus/RCLConsensus.cpp
@@ -1061,7 +1061,8 @@
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if (!positions && app_.getOPs().isFull())
- app_.getOPs().setMode(OperatingMode::CONNECTED);
+ app_.getOPs().setMode(
+ OperatingMode::CONNECTED, "updateOperatingMode: no positions");
Codecov / codecov/patch warning: added line #L1064 in src/xrpld/app/consensus/RCLConsensus.cpp was not covered by tests.
}

void
18 changes: 18 additions & 0 deletions src/xrpld/app/ledger/InboundLedger.h
@@ -196,6 +196,24 @@
std::unique_ptr<PeerSet> mPeerSet;
};

inline std::string
to_string(InboundLedger::Reason reason)
{
using enum InboundLedger::Reason;
switch (reason)
{
case HISTORY:

Codecov / codecov/patch warning: added line #L205 in src/xrpld/app/ledger/InboundLedger.h was not covered by tests.
return "HISTORY";
case GENERIC:
return "GENERIC";
case CONSENSUS:
return "CONSENSUS";
default:
assert(false);

Codecov / codecov/patch warning: added lines #L211 - L212 in src/xrpld/app/ledger/InboundLedger.h were not covered by tests.
Collaborator:

Please replace with UNREACHABLE and a nice & short description why (see CONTRIBUTING.md, section "Contracts and instrumentation", for the naming guideline).

return "unknown";
}
}

} // namespace ripple

#endif
21 changes: 15 additions & 6 deletions src/xrpld/app/ledger/detail/InboundLedger.cpp
@@ -390,7 +390,14 @@

if (!wasProgress)
{
- checkLocal();
+ if (checkLocal())
+ {
+ // Done. Something else (probably consensus) built the ledger
+ // locally while waiting for data (or possibly before requesting)
+ assert(isDone());
Collaborator:

please replace with XRPL_ASSERT and a short description why; see CONTRIBUTING.md for contracts and instrumentation naming guidelines

+ JLOG(journal_.info()) << "Finished while waiting " << hash_;
+ return;
Codecov / codecov/patch warning: added line #L399 in src/xrpld/app/ledger/detail/InboundLedger.cpp was not covered by tests.
+ }

mByHash = true;

@@ -497,15 +504,17 @@

if (auto stream = journal_.debug())
{
- stream << "Trigger acquiring ledger " << hash_;
+ std::stringstream ss;
+ ss << "Trigger acquiring ledger " << hash_;
if (peer)
- stream << " from " << peer;
+ ss << " from " << peer;

if (complete_ || failed_)
- stream << "complete=" << complete_ << " failed=" << failed_;
+ ss << " complete=" << complete_ << " failed=" << failed_;
else
- stream << "header=" << mHaveHeader << " tx=" << mHaveTransactions
- << " as=" << mHaveState;
+ ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions
+ << " as=" << mHaveState;
+ stream << ss.str();
}

if (!mHaveHeader)
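
The hunk above composes the debug message in a `std::stringstream` and writes it to the journal stream once. A plausible motivation, which is an assumption and not stated in the diff, is that a journal-style stream emits one log entry per statement, so a message built across several statements ends up split over several entries. The sketch below demonstrates that effect with a hypothetical `LineLogSketch` class, not the real journal type, and assumes C++17 for the by-value return of a non-movable temporary.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical logger: each statement that writes to it yields one line,
// flushed when the temporary created by the first << is destroyed.
class LineLogSketch
{
public:
    class Scoped
    {
    public:
        template <class T>
        Scoped(std::vector<std::string>& lines, T const& first) : lines_(lines)
        {
            buffer_ << first;
        }

        ~Scoped()
        {
            lines_.push_back(buffer_.str());
        }

        template <class T>
        Scoped&
        operator<<(T const& value)
        {
            buffer_ << value;
            return *this;
        }

    private:
        std::vector<std::string>& lines_;
        std::ostringstream buffer_;
    };

    explicit LineLogSketch(std::vector<std::string>& lines) : lines_(lines)
    {
    }

    template <class T>
    Scoped
    operator<<(T const& value) const
    {
        return Scoped{lines_, value};
    }

private:
    std::vector<std::string>& lines_;
};

bool
runLogExample()
{
    std::vector<std::string> lines;
    LineLogSketch stream{lines};

    // Two statements produce two separate entries.
    stream << "Trigger acquiring ledger " << "ABCD";
    stream << " from peer 7";
    assert(lines.size() == 2);

    // Composing first, then writing once, produces a single entry.
    lines.clear();
    std::stringstream ss;
    ss << "Trigger acquiring ledger " << "ABCD";
    ss << " from peer 7";
    stream << ss.str();
    assert(lines.size() == 1);
    assert(lines[0] == "Trigger acquiring ledger ABCD from peer 7");
    return true;
}
```

Independently of that assumption, the diff itself also adds the leading spaces that were missing before `complete=` and `header=` in the old message.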