Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

backport: merge bitcoin#19572, #20953, #20523, #21008, #21310, #22079, #23471, #24218 (zmq backports) #6231

Merged
merged 10 commits into from
Aug 31, 2024
Merged
21 changes: 12 additions & 9 deletions contrib/zmq/zmq_sub.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
-zmqpubrawtx=tcp://127.0.0.1:28332 \
-zmqpubrawblock=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashblock=tcp://127.0.0.1:28332
-zmqpubhashblock=tcp://127.0.0.1:28332 \
-zmqpubsequence=tcp://127.0.0.1:28332

We use the asyncio library here. `self.handle()` installs itself as a
future at the end of the function. Since it never returns with the event
Expand Down Expand Up @@ -58,18 +59,14 @@ def __init__(self):
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence")
self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)

async def handle(self) :
msg = await self.zmqSubSocket.recv_multipart()
topic = msg[0]
body = msg[1]
topic, body, seq = await self.zmqSubSocket.recv_multipart()
sequence = "Unknown"

if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)

if len(seq) == 4:
sequence = str(struct.unpack('<I', seq)[-1])
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(body.hex())
Expand Down Expand Up @@ -118,6 +115,12 @@ async def handle(self) :
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(body.hex())
elif topic == b"sequence":
hash = body[:32].hex()
label = chr(body[32])
mempool_sequence = None if len(body) != 32+1+8 else struct.unpack("<Q", body[32+1:])[0]
print('- SEQUENCE ('+sequence+') -')
print(hash, label, mempool_sequence)
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())

Expand Down
56 changes: 48 additions & 8 deletions doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Currently, the following notifications are supported:
-zmqpubrawgovernanceobject=address
-zmqpubrawinstantsenddoublespend=address
-zmqpubrawrecoveredsig=address
-zmqpubsequence=address

The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
Expand All @@ -103,23 +104,52 @@ The option to set the PUB socket's outbound message high water mark
-zmqpubrawgovernanceobjecthwm=n
-zmqpubrawinstantsenddoublespendhwm=n
-zmqpubrawrecoveredsighwm=n
-zmqpubsequencehwm=address

The high water mark value must be an integer greater than or equal to 0.

For instance:

$ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://192.168.1.2:28332 \
-zmqpubhashblock="tcp://[::1]:28333" \
-zmqpubrawtx=ipc:///tmp/dashd.tx.raw \
-zmqpubhashtxhwm=10000

Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the
notification `-zmqpubhashtx` the topic is `hashtx` (no null
terminator) and the body is the transaction hash (32
bytes).
terminator). These options can also be provided in dash.conf.

These options can also be provided in dash.conf.
The topics are:

`sequence`: the body is structured as the following based on the type of message:

<32-byte hash>C : Blockhash connected
<32-byte hash>D : Blockhash disconnected
<32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason
<32-byte hash>A<8-byte LE uint> : Transactionhash added mempool

Where the 8-byte uints correspond to the mempool sequence number.

`rawtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`rawtx`), the second part is the serialized transaction, and the last part is a sequence number (representing the message count to detect lost messages).

| rawtx | <serialized transaction> | <uint32 sequence number in Little Endian>

`hashtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`hashtx`), the second part is the 32-byte transaction hash, and the last part is a sequence number (representing the message count to detect lost messages).

| hashtx | <32-byte transaction hash in Little Endian> | <uint32 sequence number in Little Endian>


`rawblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`rawblock`), the second part is the serialized block, and the last part is a sequence number (representing the message count to detect lost messages).

| rawblock | <serialized block> | <uint32 sequence number in Little Endian>

`hashblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`hashblock`), the second part is the 32-byte block hash, and the last part is a sequence number (representing the message count to detect lost messages).

| hashblock | <32-byte block hash in Little Endian> | <uint32 sequence number in Little Endian>

**_NOTE:_** Note that the 32-byte hashes are in Little Endian and not in the Big Endian format that the RPC interface and block explorers use to display transaction and block hashes.

ZeroMQ endpoint specifiers for TCP (and others) are documented in the
[ZeroMQ API](http://api.zeromq.org/4-0:_start).
Expand All @@ -143,6 +173,9 @@ Setting the keepalive values appropriately for your operating environment may
improve connectivity in situations where long-lived connections are silently
dropped by network middle boxes.

Also, the socket's ZMQ_IPV6 option is enabled to accept connections from IPv6
hosts as well. If needed, this option has to be set on the client side too.

## Remarks

From the perspective of dashd, the ZeroMQ socket is write-only; PUB
Expand All @@ -154,13 +187,20 @@ No authentication or authorization is done on connecting clients; it
is assumed that the ZeroMQ port is exposed only to trusted entities,
using other means such as firewalling.

Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip. Also note
that no notification occurs if the tip was in the active chain - this
is the case after calling invalidateblock RPC.
Note that for `*block` topics, when the block chain tip changes,
a reorganisation may occur and just the tip will be notified.
It is up to the subscriber to retrieve the chain from the last known
block to the new tip. Also note that no notification will occur if the tip
was in the active chain--as would be the case after calling invalidateblock RPC.
In contrast, the `sequence` topic publishes all block connections and
disconnections.

There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type you are
using. Dashd appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.

The `sequence` topic refers specifically to the mempool sequence
number, which is also published along with all mempool events. This
is a different sequence value than in ZMQ itself in order to allow a total
ordering of mempool events to be constructed.
6 changes: 4 additions & 2 deletions src/dsnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con
}
}

void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime)
void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime,
uint64_t mempool_sequence)
{
assert(m_cj_ctx && m_llmq_ctx);

Expand All @@ -112,7 +113,8 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef&
m_cj_ctx->dstxman->TransactionAddedToMempool(ptx);
}

void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason)
void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason,
uint64_t mempool_sequence)
{
assert(m_llmq_ctx);

Expand Down
5 changes: 3 additions & 2 deletions src/dsnotificationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ class CDSNotificationInterface : public CValidationInterface
void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload) override;
void SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override;
void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) override;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override;
void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason,
uint64_t mempool_sequence) override;
void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) override;
void BlockDisconnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexDisconnected) override;
void NotifyMasternodeListChanged(bool undo, const CDeterministicMNList& oldMNList, const CDeterministicMNListDiff& diff) override;
Expand Down
11 changes: 7 additions & 4 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -360,9 +360,8 @@ void PrepareShutdown(NodeContext& node)

#if ENABLE_ZMQ
if (g_zmq_notification_interface) {
UnregisterValidationInterface(g_zmq_notification_interface);
delete g_zmq_notification_interface;
g_zmq_notification_interface = nullptr;
UnregisterValidationInterface(g_zmq_notification_interface.get());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ecea54c: shouldn't it be named partial merge bitcoin#27125: ...?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was planning to make changes of this nature anyway (see a5f9110 for a similar change to pdsNotificationInterface, will be part of a PR dedicated to misc. changes) and the change isn't strictly speaking essential for the functionality of the backports in this PR nor is the change something that doesn't serve value standalone (as opposed to being a part of a larger backport valuable in this immediate PR).

Added attribution in commit comment.

g_zmq_notification_interface.reset();
}
#endif

Expand Down Expand Up @@ -646,6 +645,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlock=<address>", "Enable publish raw transaction (locked via InstantSend) in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlocksig=<address>", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequence=<address>", "Enable publish hash block and tx sequence in <address>", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashchainlockhwm=<n>", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubhashgovernanceobjecthwm=<n>", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
Expand All @@ -664,6 +664,7 @@ void SetupServerArgs(NodeContext& node)
argsman.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlockhwm=<n>", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubrawtxlocksighwm=<n>", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
argsman.AddArg("-zmqpubsequencehwm=<n>", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ);
#else
hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashchainlock=<address>");
Expand All @@ -683,6 +684,7 @@ void SetupServerArgs(NodeContext& node)
hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubrawtxlock=<address>");
hidden_args.emplace_back("-zmqpubrawtxlocksig=<address>");
hidden_args.emplace_back("-zmqpubsequence=<n>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashchainlockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm=<n>");
Expand All @@ -701,6 +703,7 @@ void SetupServerArgs(NodeContext& node)
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlocksighwm=<n>");
hidden_args.emplace_back("-zmqpubsequencehwm=<n>");
#endif

argsman.AddArg("-checkblockindex", strprintf("Do a consistency check for the block tree, and occasionally. (default: %u, regtest: %u)", defaultChainParams->DefaultConsistencyChecks(), regtestChainParams->DefaultConsistencyChecks()), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST);
Expand Down Expand Up @@ -1737,7 +1740,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
g_zmq_notification_interface = CZMQNotificationInterface::Create();

if (g_zmq_notification_interface) {
RegisterValidationInterface(g_zmq_notification_interface);
RegisterValidationInterface(g_zmq_notification_interface.get());
}
#endif

Expand Down
4 changes: 2 additions & 2 deletions src/interfaces/chain.h
Original file line number Diff line number Diff line change
Expand Up @@ -258,8 +258,8 @@ class Chain
{
public:
virtual ~Notifications() {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {}
virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {}
virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}
virtual void blockConnected(const CBlock& block, int height) {}
virtual void blockDisconnected(const CBlock& block, int height) {}
virtual void updatedBlockTip() {}
Expand Down
10 changes: 5 additions & 5 deletions src/node/interfaces.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -611,13 +611,13 @@ class NotificationsProxy : public CValidationInterface
explicit NotificationsProxy(std::shared_ptr<Chain::Notifications> notifications)
: m_notifications(std::move(notifications)) {}
virtual ~NotificationsProxy() = default;
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override
void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override
{
m_notifications->transactionAddedToMempool(tx, nAcceptTime);
m_notifications->transactionAddedToMempool(tx, nAcceptTime, mempool_sequence);
}
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override
void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override
{
m_notifications->transactionRemovedFromMempool(tx, reason);
m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence);
}
void BlockConnected(const std::shared_ptr<const CBlock>& block, const CBlockIndex* index) override
{
Expand Down Expand Up @@ -997,7 +997,7 @@ class ChainImpl : public Chain
if (!m_node.mempool) return;
LOCK2(::cs_main, m_node.mempool->cs);
for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) {
notifications.transactionAddedToMempool(entry.GetSharedTx(), 0);
notifications.transactionAddedToMempool(entry.GetSharedTx(), /* nAcceptTime = */ 0, /* mempool_sequence = */ 0);
}
}
NodeContext& m_node;
Expand Down
Loading
Loading