Skip to content

Commit

Permalink
Merge #6231: backport: merge bitcoin#19572, bitcoin#20953, bitcoin#20523
Browse files Browse the repository at this point in the history
, bitcoin#21008, bitcoin#21310, bitcoin#22079, bitcoin#23471, bitcoin#24218 (zmq backports)

b75e83b merge bitcoin#24218: Fix implicit-integer-sign-change (Kittywhiskers Van Gogh)
8ecc22f merge bitcoin#23471: Improve ZMQ documentation (Kittywhiskers Van Gogh)
2965093 merge bitcoin#22079: Add support to listen on IPv6 addresses (Kittywhiskers Van Gogh)
3ac3714 merge bitcoin#21310: fix sync-up by matching notification to generated block (Kittywhiskers Van Gogh)
7b0c725 merge bitcoin#21008: fix zmq test flakiness, improve speed (Kittywhiskers Van Gogh)
5e87efd merge bitcoin#20523: deduplicate 'sequence' publisher message creation/sending (Kittywhiskers Van Gogh)
99c730f merge bitcoin#20953: dedup zmq test setup code (node restart, topics subscription) (Kittywhiskers Van Gogh)
982c1f0 merge bitcoin#19572: Create "sequence" notifier, enabling client-side mempool tracking (Kittywhiskers Van Gogh)
b0b4e0f zmq: Make `g_zmq_notification_interface` a smart pointer (Kittywhiskers Van Gogh)
0a1ffd3 zmq: extend appending address to log msg for Dash-specific notifications (Kittywhiskers Van Gogh)

Pull request description:

  ## Additional Information

  * [bitcoin#19572](bitcoin#19572) introduces tests in `interface_zmq.py` that validate newly introduced "sequence" reporting of all message types (`C`, `D`, `R` and `A`). The `R` message type (removed from mempool) is tested by leveraging the RBF mechanism, which isn't present in Dash.

    In order to allow the tests to successfully pass, all fee bumping and RBF-specific code had to be removed from the tests. This test also involves creating a new block to test the `C` message (connected block) that would return a `time-too-new` error and fail unless mocktime has been disabled (which has been done in this test).

  * When backporting [bitcoin#18309](bitcoin#18309) ([dash#5728](#5728)), Dash-specific ZMQ notifications did not have those changes applied to them and that particular backport was merged upstream *after* [bitcoin#19572](bitcoin#19572), meaning, for completion, the changes from [bitcoin#18309](bitcoin#18309) have been integrated into [bitcoin#19572](bitcoin#19572 backport.

    As for the Dash-specific notifications, they have been covered to ensure uniformity in a separate commit.

  * The ZMQ notification interface has been converted to a smart pointer in the interest of safety (this change is eventually done upstream in 8ed4ff8 ([bitcoin#27125](bitcoin#27125)))

  ## Breaking Changes

  None expected.

  ## Checklist:

  - [x] I have performed a self-review of my own code
  - [x] I have commented my code, particularly in hard-to-understand areas **(note: N/A)**
  - [x] I have added or updated relevant unit/integration/functional/e2e tests
  - [x] I have made corresponding changes to the documentation
  - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_

ACKs for top commit:
  UdjinM6:
    utACK b75e83b
  PastaPastaPasta:
    utACK b75e83b
  knst:
    utACK b75e83b

Tree-SHA512: 9f860d1203bebe0914a5102f101f646873d14754830d651fb91ed0d1285a6c1a58ffc492b07d4768324d94f53171c9a4da974cf4a0b1e5c665979eace289f6f0
  • Loading branch information
PastaPastaPasta committed Aug 31, 2024
2 parents 630254e + b75e83b commit 7a9e475
Show file tree
Hide file tree
Showing 26 changed files with 717 additions and 152 deletions.
21 changes: 12 additions & 9 deletions contrib/zmq/zmq_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
-zmqpubrawtx=tcp://127.0.0.1:28332 \
-zmqpubrawblock=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashblock=tcp://127.0.0.1:28332
-zmqpubhashblock=tcp://127.0.0.1:28332 \
-zmqpubsequence=tcp://127.0.0.1:28332
We use the asyncio library here. `self.handle()` installs itself as a
future at the end of the function. Since it never returns with the event
Expand Down Expand Up @@ -58,18 +59,14 @@ def __init__(self):
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)

async def handle(self) :
msg = await self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
topic, body, seq = await self.zmqSubSocket.recv_multipart()
sequence = "Unknown"

if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)

if len(seq) == 4:
sequence = str(struct.unpack('<I', seq)[-1])
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(body.hex())
Expand Down Expand Up @@ -118,6 +115,12 @@ async def handle(self) :
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(body.hex())
elif topic == b"sequence":
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
print('- SEQUENCE ('+sequence+') -')
print(hash, label, mempool_sequence)
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())

Expand Down
56 changes: 48 additions & 8 deletions doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Currently, the following notifications are supported:
-zmqpubrawgovernanceobject=address
-zmqpubrawinstantsenddoublespend=address
-zmqpubrawrecoveredsig=address
-zmqpubsequence=address

The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
Expand All @@ -103,23 +104,52 @@ The option to set the PUB socket's outbound message high water mark
-zmqpubrawgovernanceobjecthwm=n
-zmqpubrawinstantsenddoublespendhwm=n
-zmqpubrawrecoveredsighwm=n
-zmqpubsequencehwm=address

The high water mark value must be an integer greater than or equal to 0.

For instance:

$ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://192.168.1.2:28332 \
-zmqpubhashblock="tcp://[::1]:28333" \
-zmqpubrawtx=ipc:///tmp/dashd.tx.raw \
-zmqpubhashtxhwm=10000

Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator) and the body is the transaction hash (32
bytes).
terminator). These options can also be provided in dash.conf.

These options can also be provided in dash.conf.
The topics are:

`sequence`: the body is structured as the following based on the type of message:

<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool

Where the 8-byte uints correspond to the mempool sequence number.

`rawtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`rawtx`), the second part is the serialized transaction, and the last part is a sequence number (representing the message count to detect lost messages).

| rawtx | <serialized transaction> | <uint32 sequence number in Little Endian>

`hashtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`hashtx`), the second part is the 32-byte transaction hash, and the last part is a sequence number (representing the message count to detect lost messages).

| hashtx | <32-byte transaction hash in Little Endian> | <uint32 sequence number in Little Endian>


`rawblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`rawblock`), the second part is the serialized block, and the last part is a sequence number (representing the message count to detect lost messages).

| rawblock | <serialized block> | <uint32 sequence number in Little Endian>

`hashblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`hashblock`), the second part is the 32-byte block hash, and the last part is a sequence number (representing the message count to detect lost messages).

| hashblock | <32-byte block hash in Little Endian> | <uint32 sequence number in Little Endian>

**_NOTE:_** Note that the 32-byte hashes are in Little Endian and not in the Big Endian format that the RPC interface and block explorers use to display transaction and block hashes.

ZeroMQ endpoint specifiers for TCP (and others) are documented in the
[ZeroMQ API](http://api.zeromq.org/4-0:_start).
Expand All @@ -143,6 +173,9 @@ Setting the keepalive values appropriately for your operating environment may
improve connectivity in situations where long-lived connections are silently
dropped by network middle boxes.

Also, the socket's ZMQ_IPV6 option is enabled to accept connections from IPv6
hosts as well. If needed, this option has to be set on the client side too.

## Remarks

From the perspective of dashd, the ZeroMQ socket is write-only; PUB
Expand All @@ -154,13 +187,20 @@ No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling.

Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip. Also note
that no notification occurs if the tip was in the active chain - this
is the case after calling invalidateblock RPC.
Note that for `*block` topics, when the block chain tip changes,
a reorganisation may occur and just the tip will be notified.
It is up to the subscriber to retrieve the chain from the last known
block to the new tip. Also note that no notification will occur if the tip
was in the active chain--as would be the case after calling invalidateblock RPC.
In contrast, the `sequence` topic publishes all block connections and
disconnections.

There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type you are
using. Dashd appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.

The `sequence` topic refers specifically to the mempool sequence
number, which is also published along with all mempool events. This
is a different sequence value than in ZMQ itself in order to allow a total
ordering of mempool events to be constructed.
6 changes: 4 additions & 2 deletions src/dsnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con
}
}

void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime)
void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime,
uint64_t mempool_sequence)
{
assert(m_cj_ctx && m_llmq_ctx);

Expand All @@ -111,7 +112,8 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef&
m_cj_ctx->dstxman->TransactionAddedToMempool(ptx);
}

void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason)
void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason,
uint64_t mempool_sequence)
{
assert(m_llmq_ctx);

Expand Down
5 changes: 3 additions & 2 deletions src/dsnotificationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class CDSNotificationInterface : public CValidationInterface
void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload) override;
void SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override;
void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) override;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason,
uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void NotifyMasternodeListChanged(bool undo, const CDeterministicMNList& oldMNList, const CDeterministicMNListDiff& diff) override;
Expand Down
11 changes: 7 additions & 4 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,8 @@ void PrepareShutdown(NodeContext& node)

#if ENABLE_ZMQ
if (g_zmq_notification_interface) {
UnregisterValidationInterface(g_zmq_notification_interface);
delete g_zmq_notification_interface;
g_zmq_notification_interface = nullptr;
UnregisterValidationInterface(g_zmq_notification_interface.get());
g_zmq_notification_interface.reset();
}
#endif

Expand Down Expand Up @@ -646,6 +645,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlock=<address>", "Enable publish raw transaction (locked via InstantSend) in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlocksig=<address>", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashchainlockhwm=<n>", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashgovernanceobjecthwm=<n>", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
Expand All @@ -664,6 +664,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlockhwm=<n>", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlocksighwm=<n>", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashchainlock=<address>");
Expand All @@ -683,6 +684,7 @@ void SetupServerArgs(NodeContext& node)
hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubrawtxlock=<address>");
hidden_args.emplace_back("-zmqpubrawtxlocksig=<address>");
hidden_args.emplace_back("-zmqpubsequence=<n>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashchainlockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm=<n>");
Expand All @@ -701,6 +703,7 @@ void SetupServerArgs(NodeContext& node)
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlocksighwm=<n>");
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
#endif

argsman.AddArg("-checkblockindex", strprintf("Do a consistency check for the block tree, and occasionally. (default: %u, regtest: %u)", defaultChainParams->DefaultConsistencyChecks(), regtestChainParams->DefaultConsistencyChecks()), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
Expand Down Expand Up @@ -1736,7 +1739,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
g_zmq_notification_interface = CZMQNotificationInterface::Create();

if (g_zmq_notification_interface) {
RegisterValidationInterface(g_zmq_notification_interface);
RegisterValidationInterface(g_zmq_notification_interface.get());
}
#endif

Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ class Chain
{
public:
virtual ~Notifications() {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
virtual void blockConnected(const CBlock& block, int height) {}
virtual void blockDisconnected(const CBlock& block, int height) {}
virtual void updatedBlockTip() {}
Expand Down
10 changes: 5 additions & 5 deletions src/node/interfaces.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,13 +611,13 @@ class NotificationsProxy : public CValidationInterface
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
: m_notifications(std::move(notifications)) {}
virtual ~NotificationsProxy() = default;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override
{
m_notifications->transactionAddedToMempool(tx, nAcceptTime);
m_notifications->transactionAddedToMempool(tx, nAcceptTime, mempool_sequence);
}
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
{
m_notifications->transactionRemovedFromMempool(tx, reason);
m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
}
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
{
Expand Down Expand Up @@ -997,7 +997,7 @@ class ChainImpl : public Chain
if (!m_node.mempool) return;
LOCK2(::cs_main, m_node.mempool->cs);
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
notifications.transactionAddedToMempool(entry.GetSharedTx(), 0);
notifications.transactionAddedToMempool(entry.GetSharedTx(), /* nAcceptTime = */ 0, /* mempool_sequence = */ 0);
}
}
NodeContext& m_node;
Expand Down
Loading

0 comments on commit 7a9e475

Please sign in to comment.