From 0a1ffd30b9a070afe8c5d584acc74a601304e330 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Fri, 23 Aug 2024 18:23:31 +0000 Subject: [PATCH 01/10] zmq: extend appending address to log msg for Dash-specific notifications Continuation of e66870c5 from bitcoin#18309 --- src/zmq/zmqpublishnotifier.cpp | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 8cc4f9b33a1ba..edbe879882d4b 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -209,7 +209,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishHashChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig) { uint256 hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashchainlock %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashchainlock %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -229,7 +229,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock) { uint256 hash = transaction->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtxlock %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtxlock %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -239,7 +239,7 @@ bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransa bool CZMQPublishHashGovernanceVoteNotifier::NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr& vote) { uint256 hash = vote->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernancevote %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernancevote %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -249,7 +249,7 @@ bool CZMQPublishHashGovernanceVoteNotifier::NotifyGovernanceVote(const CDetermin bool CZMQPublishHashGovernanceObjectNotifier::NotifyGovernanceObject(const std::shared_ptr& object) { uint256 hash = object->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernanceobject %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashgovernanceobject %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -259,7 +259,7 @@ bool CZMQPublishHashGovernanceObjectNotifier::NotifyGovernanceObject(const std:: bool CZMQPublishHashInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpendAttempt(const CTransactionRef& currentTx, const CTransactionRef& previousTx) { uint256 currentHash = currentTx->GetHash(), previousHash = previousTx->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashinstantsenddoublespend %s conflicts against %s\n", currentHash.ToString(), previousHash.ToString()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashinstantsenddoublespend %s conflicts against %s to %s\n", currentHash.ToString(), previousHash.ToString(), this->address); char dataCurrentHash[32], dataPreviousHash[32]; for (unsigned int i = 0; i < 32; i++) { dataCurrentHash[31 - i] = currentHash.begin()[i]; @@ -271,7 +271,7 @@ bool CZMQPublishHashInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpend bool CZMQPublishHashRecoveredSigNotifier::NotifyRecoveredSig(const std::shared_ptr &sig) { - LogPrint(BCLog::ZMQ, "zmq: Publish hashrecoveredsig %s\n", sig->getMsgHash().ToString()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashrecoveredsig %s to %s\n", sig->getMsgHash().ToString(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = sig->getMsgHash().begin()[i]; @@ -301,7 +301,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishRawChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlock %s\n", pindex->GetBlockHash().GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); const Consensus::Params& consensusParams = Params().GetConsensus(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); @@ -322,7 +322,7 @@ bool CZMQPublishRawChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex, bool CZMQPublishRawChainLockSigNotifier::NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlocksig %s\n", pindex->GetBlockHash().GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawchainlocksig %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); const Consensus::Params& consensusParams = Params().GetConsensus(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); @@ -373,7 +373,7 @@ bool CZMQPublishRawTransactionLockSigNotifier::NotifyTransactionLock(const CTran bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr& vote) { uint256 nHash = vote->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, vote = %d\n", nHash.ToString(), this->address, vote->ToString(tip_mn_list)); + LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject %s with vote %d to %s\n", nHash.ToString(), vote->ToString(tip_mn_list), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << *vote; return SendZmqMessage(MSG_RAWGVOTE, &(*ss.begin()), ss.size()); @@ -382,7 +382,7 @@ bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const CDetermini bool CZMQPublishRawGovernanceObjectNotifier::NotifyGovernanceObject(const std::shared_ptr& govobj) { uint256 nHash = govobj->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, type = %d\n", nHash.ToString(), this->address, ToUnderlying(govobj->type)); + LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject %s with type %d to %s\n", nHash.ToString(), ToUnderlying(govobj->type), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << *govobj; return SendZmqMessage(MSG_RAWGOBJ, &(*ss.begin()), ss.size()); @@ -390,7 +390,7 @@ bool CZMQPublishRawGovernanceObjectNotifier::NotifyGovernanceObject(const std::s bool CZMQPublishRawInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpendAttempt(const CTransactionRef& currentTx, const CTransactionRef& previousTx) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawinstantsenddoublespend %s conflicts with %s\n", currentTx->GetHash().ToString(), previousTx->GetHash().ToString()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawinstantsenddoublespend %s conflicts with %s to %s\n", currentTx->GetHash().ToString(), previousTx->GetHash().ToString(), this->address); CDataStream ssCurrent(SER_NETWORK, PROTOCOL_VERSION), ssPrevious(SER_NETWORK, PROTOCOL_VERSION); ssCurrent << *currentTx; ssPrevious << *previousTx; @@ -400,7 +400,7 @@ bool CZMQPublishRawInstantSendDoubleSpendNotifier::NotifyInstantSendDoubleSpendA bool CZMQPublishRawRecoveredSigNotifier::NotifyRecoveredSig(const std::shared_ptr& sig) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawrecoveredsig %s\n", sig->getMsgHash().ToString()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawrecoveredsig %s to %s\n", sig->getMsgHash().ToString(), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << *sig; From b0b4e0fa7f12e2135b3a2d0310b9ee4a37782f57 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 17 Apr 2023 10:42:49 +0200 Subject: [PATCH 02/10] zmq: Make `g_zmq_notification_interface` a smart pointer courtesy of 8ed4ff8e from bitcoin#27125 --- src/init.cpp | 7 +++---- src/zmq/zmqnotificationinterface.cpp | 6 +++--- src/zmq/zmqnotificationinterface.h | 4 ++-- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index 88e981898acca..eeb9514f08a91 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -360,9 +360,8 @@ void PrepareShutdown(NodeContext& node) #if ENABLE_ZMQ if (g_zmq_notification_interface) { - UnregisterValidationInterface(g_zmq_notification_interface); - delete g_zmq_notification_interface; - g_zmq_notification_interface = nullptr; + UnregisterValidationInterface(g_zmq_notification_interface.get()); + g_zmq_notification_interface.reset(); } #endif @@ -1737,7 +1736,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) g_zmq_notification_interface = CZMQNotificationInterface::Create(); if (g_zmq_notification_interface) { - RegisterValidationInterface(g_zmq_notification_interface); + RegisterValidationInterface(g_zmq_notification_interface.get()); } #endif diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index a0d004820c3ab..7539ca3114965 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -29,7 +29,7 @@ std::list CZMQNotificationInterface::GetActiveNotif return result; } -CZMQNotificationInterface* CZMQNotificationInterface::Create() +std::unique_ptr CZMQNotificationInterface::Create() { std::map factories; factories["pubhashblock"] = CZMQAbstractNotifier::Create; @@ -71,7 +71,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() notificationInterface->notifiers = std::move(notifiers); if (notificationInterface->Initialize()) { - return notificationInterface.release(); + return notificationInterface; } } @@ -219,4 +219,4 @@ void CZMQNotificationInterface::NotifyRecoveredSig(const std::shared_ptr g_zmq_notification_interface; diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index 230dd6c8ff667..8cfc12de373d9 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -19,7 +19,7 @@ class CZMQNotificationInterface final : public CValidationInterface std::list GetActiveNotifiers() const; - static CZMQNotificationInterface* Create(); + static std::unique_ptr Create(); protected: bool Initialize(); @@ -44,6 +44,6 @@ class CZMQNotificationInterface final : public CValidationInterface std::list> notifiers; }; -extern CZMQNotificationInterface* g_zmq_notification_interface; +extern std::unique_ptr g_zmq_notification_interface; #endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H From 982c1f03d4b7192db2cb9a379b49ed82dc22a5c7 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 26 Aug 2024 15:35:12 +0000 Subject: [PATCH 03/10] merge bitcoin#19572: Create "sequence" notifier, enabling client-side mempool tracking --- contrib/zmq/zmq_sub.py | 21 +- doc/zmq.md | 29 ++- src/dsnotificationinterface.cpp | 6 +- src/dsnotificationinterface.h | 5 +- src/init.cpp | 4 + src/interfaces/chain.h | 4 +- src/node/interfaces.cpp | 10 +- src/rpc/blockchain.cpp | 39 +++- src/rpc/blockchain.h | 2 +- src/rpc/client.cpp | 1 + src/test/fuzz/tx_pool.cpp | 4 +- src/txmempool.cpp | 6 +- src/txmempool.h | 14 ++ src/validation.cpp | 2 +- src/validationinterface.cpp | 12 +- src/validationinterface.h | 9 +- src/wallet/wallet.cpp | 6 +- src/wallet/wallet.h | 4 +- src/zmq/zmqabstractnotifier.cpp | 20 ++ src/zmq/zmqabstractnotifier.h | 12 +- src/zmq/zmqnotificationinterface.cpp | 41 +++- src/zmq/zmqnotificationinterface.h | 3 +- src/zmq/zmqpublishnotifier.cpp | 48 +++++ src/zmq/zmqpublishnotifier.h | 9 + test/functional/interface_zmq.py | 308 ++++++++++++++++++++++++++- 25 files changed, 548 insertions(+), 71 deletions(-) diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index 9221f000b0b02..807b2d680e41e 100755 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -11,7 +11,8 @@ -zmqpubrawtx=tcp://127.0.0.1:28332 \ -zmqpubrawblock=tcp://127.0.0.1:28332 \ -zmqpubhashtx=tcp://127.0.0.1:28332 \ - -zmqpubhashblock=tcp://127.0.0.1:28332 + -zmqpubhashblock=tcp://127.0.0.1:28332 \ + -zmqpubsequence=tcp://127.0.0.1:28332 We use the asyncio library here. `self.handle()` installs itself as a future at the end of the function. Since it never returns with the event @@ -58,18 +59,14 @@ def __init__(self): self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernanceobject") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawinstantsenddoublespend") + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "sequence") self.zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) async def handle(self) : - msg = await self.zmqSubSocket.recv_multipart() - topic = msg[0] - body = msg[1] + topic, body, seq = await self.zmqSubSocket.recv_multipart() sequence = "Unknown" - - if len(msg[-1]) == 4: - msgSequence = struct.unpack('C : Blockhash connected + <32-byte hash>D : Blockhash disconnected + <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason + <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool + +Where the 8-byte uints correspond to the mempool sequence number. These options can also be provided in dash.conf. @@ -154,13 +164,20 @@ No authentication or authorization is done on connecting clients; it is assumed that the ZeroMQ port is exposed only to trusted entities, using other means such as firewalling. -Note that when the block chain tip changes, a reorganisation may occur -and just the tip will be notified. It is up to the subscriber to -retrieve the chain from the last known block to the new tip. Also note -that no notification occurs if the tip was in the active chain - this -is the case after calling invalidateblock RPC. +Note that for `*block` topics, when the block chain tip changes, +a reorganisation may occur and just the tip will be notified. +It is up to the subscriber to retrieve the chain from the last known +block to the new tip. Also note that no notification will occur if the tip +was in the active chain--as would be the case after calling invalidateblock RPC. +In contrast, the `sequence` topic publishes all block connections and +disconnections. There are several possibilities that ZMQ notification can get lost during transmission depending on the communication type you are using. Dashd appends an up-counting sequence number to each notification which allows listeners to detect lost notifications. + +The `sequence` topic refers specifically to the mempool sequence +number, which is also published along with all mempool events. This +is a different sequence value than in ZMQ itself in order to allow a total +ordering of mempool events to be constructed. diff --git a/src/dsnotificationinterface.cpp b/src/dsnotificationinterface.cpp index 3a30fec2c0b25..3b8a8e0710a7b 100644 --- a/src/dsnotificationinterface.cpp +++ b/src/dsnotificationinterface.cpp @@ -103,7 +103,8 @@ void CDSNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, con } } -void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime) +void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime, + uint64_t mempool_sequence) { assert(m_cj_ctx && m_llmq_ctx); @@ -112,7 +113,8 @@ void CDSNotificationInterface::TransactionAddedToMempool(const CTransactionRef& m_cj_ctx->dstxman->TransactionAddedToMempool(ptx); } -void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) +void CDSNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, + uint64_t mempool_sequence) { assert(m_llmq_ctx); diff --git a/src/dsnotificationinterface.h b/src/dsnotificationinterface.h index dc4456de40579..c98f913880845 100644 --- a/src/dsnotificationinterface.h +++ b/src/dsnotificationinterface.h @@ -40,8 +40,9 @@ class CDSNotificationInterface : public CValidationInterface void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload) override; void SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; - void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason) override; + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override; + void TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, + uint64_t mempool_sequence) override; void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindex) override; void BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) override; void NotifyMasternodeListChanged(bool undo, const CDeterministicMNList& oldMNList, const CDeterministicMNListDiff& diff) override; diff --git a/src/init.cpp b/src/init.cpp index eeb9514f08a91..42042ab4d55ad 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -645,6 +645,7 @@ void SetupServerArgs(NodeContext& node) argsman.AddArg("-zmqpubrawtx=
", "Enable publish raw transaction in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlock=
", "Enable publish raw transaction (locked via InstantSend) in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlocksig=
", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubsequence=
", "Enable publish hash block and tx sequence in
", ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashblockhwm=", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashchainlockhwm=", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubhashgovernanceobjecthwm=", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); @@ -663,6 +664,7 @@ void SetupServerArgs(NodeContext& node) argsman.AddArg("-zmqpubrawtxhwm=", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlockhwm=", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); argsman.AddArg("-zmqpubrawtxlocksighwm=", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); + argsman.AddArg("-zmqpubsequencehwm=", strprintf("Set publish hash sequence message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), ArgsManager::ALLOW_ANY, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=
"); hidden_args.emplace_back("-zmqpubhashchainlock=
"); @@ -682,6 +684,7 @@ void SetupServerArgs(NodeContext& node) hidden_args.emplace_back("-zmqpubrawtx=
"); hidden_args.emplace_back("-zmqpubrawtxlock=
"); hidden_args.emplace_back("-zmqpubrawtxlocksig=
"); + hidden_args.emplace_back("-zmqpubsequence="); hidden_args.emplace_back("-zmqpubhashblockhwm="); hidden_args.emplace_back("-zmqpubhashchainlockhwm="); hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm="); @@ -700,6 +703,7 @@ void SetupServerArgs(NodeContext& node) hidden_args.emplace_back("-zmqpubrawtxhwm="); hidden_args.emplace_back("-zmqpubrawtxlockhwm="); hidden_args.emplace_back("-zmqpubrawtxlocksighwm="); + hidden_args.emplace_back("-zmqpubsequencehwm="); #endif argsman.AddArg("-checkblockindex", strprintf("Do a consistency check for the block tree, and occasionally. (default: %u, regtest: %u)", defaultChainParams->DefaultConsistencyChecks(), regtestChainParams->DefaultConsistencyChecks()), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::DEBUG_TEST); diff --git a/src/interfaces/chain.h b/src/interfaces/chain.h index dcc399a29de9d..0e8f302dfcfc9 100644 --- a/src/interfaces/chain.h +++ b/src/interfaces/chain.h @@ -258,8 +258,8 @@ class Chain { public: virtual ~Notifications() {} - virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) {} - virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {} + virtual void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) {} + virtual void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {} virtual void blockConnected(const CBlock& block, int height) {} virtual void blockDisconnected(const CBlock& block, int height) {} virtual void updatedBlockTip() {} diff --git a/src/node/interfaces.cpp b/src/node/interfaces.cpp index eba23b01952b8..e2d19d3ff3e15 100644 --- a/src/node/interfaces.cpp +++ b/src/node/interfaces.cpp @@ -611,13 +611,13 @@ class NotificationsProxy : public CValidationInterface explicit NotificationsProxy(std::shared_ptr notifications) : m_notifications(std::move(notifications)) {} virtual ~NotificationsProxy() = default; - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override { - m_notifications->transactionAddedToMempool(tx, nAcceptTime); + m_notifications->transactionAddedToMempool(tx, nAcceptTime, mempool_sequence); } - void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override { - m_notifications->transactionRemovedFromMempool(tx, reason); + m_notifications->transactionRemovedFromMempool(tx, reason, mempool_sequence); } void BlockConnected(const std::shared_ptr& block, const CBlockIndex* index) override { @@ -997,7 +997,7 @@ class ChainImpl : public Chain if (!m_node.mempool) return; LOCK2(::cs_main, m_node.mempool->cs); for (const CTxMemPoolEntry& entry : m_node.mempool->mapTx) { - notifications.transactionAddedToMempool(entry.GetSharedTx(), 0); + notifications.transactionAddedToMempool(entry.GetSharedTx(), /* nAcceptTime = */ 0, /* mempool_sequence = */ 0); } } NodeContext& m_node; diff --git a/src/rpc/blockchain.cpp b/src/rpc/blockchain.cpp index cd0fd147eec6e..b8592153a60bf 100644 --- a/src/rpc/blockchain.cpp +++ b/src/rpc/blockchain.cpp @@ -532,9 +532,12 @@ static void entryToJSON(const CTxMemPool& pool, UniValue& info, const CTxMemPool info.pushKV("unbroadcast", pool.IsUnbroadcastTx(tx.GetHash())); } -UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose) +UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose, bool include_mempool_sequence) { if (verbose) { + if (include_mempool_sequence) { + throw JSONRPCError(RPC_INVALID_PARAMETER, "Verbose results cannot contain mempool sequence values."); + } LOCK(pool.cs); UniValue o(UniValue::VOBJ); for (const CTxMemPoolEntry& e : pool.mapTx) { @@ -548,14 +551,25 @@ UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, } return o; } else { + uint64_t mempool_sequence; std::vector vtxid; - pool.queryHashes(vtxid); - + { + LOCK(pool.cs); + pool.queryHashes(vtxid); + mempool_sequence = pool.GetSequence(); + } UniValue a(UniValue::VARR); for (const uint256& hash : vtxid) a.push_back(hash.ToString()); - return a; + if (!include_mempool_sequence) { + return a; + } else { + UniValue o(UniValue::VOBJ); + o.pushKV("txids", a); + o.pushKV("mempool_sequence", mempool_sequence); + return o; + } } } @@ -566,6 +580,7 @@ static RPCHelpMan getrawmempool() "\nHint: use getmempoolentry to fetch a specific transaction from the mempool.\n", { {"verbose", RPCArg::Type::BOOL, /* default */ "false", "True for a json object, false for array of transaction ids"}, + {"mempool_sequence", RPCArg::Type::BOOL, /* default */ "false", "If verbose=false, returns a json object with transaction list and mempool sequence number attached."}, }, { RPCResult{"for verbose = false", @@ -578,6 +593,15 @@ static RPCHelpMan getrawmempool() { {RPCResult::Type::OBJ, "transactionid", "", MempoolEntryDescription()}, }}, + RPCResult{"for verbose = false and mempool_sequence = true", + RPCResult::Type::OBJ, "", "", + { + {RPCResult::Type::ARR, "txids", "", + { + {RPCResult::Type::STR_HEX, "", "The transaction id"}, + }}, + {RPCResult::Type::NUM, "mempool_sequence", "The mempool sequence value."}, + }}, }, RPCExamples{ HelpExampleCli("getrawmempool", "true") @@ -589,10 +613,15 @@ static RPCHelpMan getrawmempool() if (!request.params[0].isNull()) fVerbose = request.params[0].get_bool(); + bool include_mempool_sequence = false; + if (!request.params[1].isNull()) { + include_mempool_sequence = request.params[1].get_bool(); + } + const NodeContext& node = EnsureAnyNodeContext(request.context); const CTxMemPool& mempool = EnsureMemPool(node); LLMQContext& llmq_ctx = EnsureLLMQContext(node); - return MempoolToJSON(mempool, llmq_ctx.isman, fVerbose); + return MempoolToJSON(mempool, llmq_ctx.isman, fVerbose, include_mempool_sequence); }, }; } diff --git a/src/rpc/blockchain.h b/src/rpc/blockchain.h index f8fc13d0817a6..b67055bb66439 100644 --- a/src/rpc/blockchain.h +++ b/src/rpc/blockchain.h @@ -47,7 +47,7 @@ UniValue blockToJSON(BlockManager& blockman, const CBlock& block, const CBlockIn UniValue MempoolInfoToJSON(const CTxMemPool& pool, llmq::CInstantSendManager& isman); /** Mempool to JSON */ -UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose = false); +UniValue MempoolToJSON(const CTxMemPool& pool, llmq::CInstantSendManager* isman, bool verbose = false, bool include_mempool_sequence = false); /** Block header to JSON */ UniValue blockheaderToJSON(const CBlockIndex* tip, const CBlockIndex* blockindex, llmq::CChainLocksHandler& clhandler, llmq::CInstantSendManager& isman) LOCKS_EXCLUDED(cs_main); diff --git a/src/rpc/client.cpp b/src/rpc/client.cpp index b7b2e79604dd0..be89fea835d6a 100644 --- a/src/rpc/client.cpp +++ b/src/rpc/client.cpp @@ -168,6 +168,7 @@ static const CRPCConvertParam vRPCConvertParams[] = { "pruneblockchain", 0, "height" }, { "keypoolrefill", 0, "newsize" }, { "getrawmempool", 0, "verbose" }, + { "getrawmempool", 1, "mempool_sequence" }, { "estimatesmartfee", 0, "conf_target" }, { "estimaterawfee", 0, "conf_target" }, { "estimaterawfee", 1, "threshold" }, diff --git a/src/test/fuzz/tx_pool.cpp b/src/test/fuzz/tx_pool.cpp index a11c258b1abc5..cbdffe29d874f 100644 --- a/src/test/fuzz/tx_pool.cpp +++ b/src/test/fuzz/tx_pool.cpp @@ -51,12 +51,12 @@ struct TransactionsDelta final : public CValidationInterface { explicit TransactionsDelta(std::set& r, std::set& a) : m_removed{r}, m_added{a} {} - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t /* nAcceptTime */) override + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t /* nAcceptTime */, uint64_t) override { Assert(m_added.insert(tx).second); } - void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t) override { Assert(m_removed.insert(tx).second); } diff --git a/src/txmempool.cpp b/src/txmempool.cpp index 1f125a4c171bc..4f771dac8a951 100644 --- a/src/txmempool.cpp +++ b/src/txmempool.cpp @@ -607,12 +607,16 @@ void CTxMemPool::addUncheckedProTx(indexed_transaction_set::iterator& newit, con void CTxMemPool::removeUnchecked(txiter it, MemPoolRemovalReason reason) { + // We increment mempool sequence value no matter removal reason + // even if not directly reported below. + uint64_t mempool_sequence = GetAndIncrementSequence(); + if (reason != MemPoolRemovalReason::BLOCK) { // Notify clients that a transaction has been removed from the mempool // for any reason except being included in a block. Clients interested // in transactions included in blocks can subscribe to the BlockConnected // notification. - GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason); + GetMainSignals().TransactionRemovedFromMempool(it->GetSharedTx(), reason, mempool_sequence); } const uint256 hash = it->GetTx().GetHash(); diff --git a/src/txmempool.h b/src/txmempool.h index 12efa08b5147b..8581e2122b55c 100644 --- a/src/txmempool.h +++ b/src/txmempool.h @@ -485,6 +485,11 @@ class CTxMemPool mutable double rollingMinimumFeeRate GUARDED_BY(cs); //!< minimum fee to get into the pool, decreases exponentially mutable Epoch m_epoch GUARDED_BY(cs); + // In-memory counter for external mempool tracking purposes. + // This number is incremented once every time a transaction + // is added or removed from the mempool for any reason. + mutable uint64_t m_sequence_number{1}; + void trackPackageRemoved(const CFeeRate& rate) EXCLUSIVE_LOCKS_REQUIRED(cs); bool m_is_loaded GUARDED_BY(cs){false}; @@ -810,6 +815,15 @@ class CTxMemPool return (m_unbroadcast_txids.count(txid) != 0); } + /** Guards this internal counter for external reporting */ + uint64_t GetAndIncrementSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) { + return m_sequence_number++; + } + + uint64_t GetSequence() const EXCLUSIVE_LOCKS_REQUIRED(cs) { + return m_sequence_number; + } + private: /** UpdateForDescendants is used by UpdateTransactionsFromBlock to update * the descendants for a single transaction that has been added to the diff --git a/src/validation.cpp b/src/validation.cpp index b4f7c2c422ba1..04fddb4cb33e6 100644 --- a/src/validation.cpp +++ b/src/validation.cpp @@ -889,7 +889,7 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef if (!Finalize(args, ws)) return MempoolAcceptResult::Failure(ws.m_state); const int64_t nAcceptTime = args.m_accept_time; - GetMainSignals().TransactionAddedToMempool(ptx, nAcceptTime); + GetMainSignals().TransactionAddedToMempool(ptx, nAcceptTime, m_pool.GetAndIncrementSequence()); const CTransaction& tx = *ptx; auto finish = Now(); diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index ac8b19dcdcfbc..e00a8bdb533e4 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -208,17 +208,17 @@ void CMainSignals::SynchronousUpdatedBlockTip(const CBlockIndex *pindexNew, cons m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.SynchronousUpdatedBlockTip(pindexNew, pindexFork, fInitialDownload); }); } -void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) { - auto event = [tx, nAcceptTime, this] { - m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, nAcceptTime); }); +void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) { + auto event = [tx, nAcceptTime, mempool_sequence, this] { + m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, nAcceptTime, mempool_sequence); }); }; ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__, tx->GetHash().ToString()); } -void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) { - auto event = [tx, reason, this] { - m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason); }); +void CMainSignals::TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { + auto event = [tx, reason, mempool_sequence, this] { + m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionRemovedFromMempool(tx, reason, mempool_sequence); }); }; ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s", __func__, tx->GetHash().ToString()); diff --git a/src/validationinterface.h b/src/validationinterface.h index c66b25a65398e..c860f7fb862a3 100644 --- a/src/validationinterface.h +++ b/src/validationinterface.h @@ -117,7 +117,8 @@ class CValidationInterface { * * Called on a background thread. */ - virtual void TransactionAddedToMempool(const CTransactionRef &xn, int64_t nAcceptTime) {} + virtual void TransactionAddedToMempool(const CTransactionRef &xn, int64_t nAcceptTime, uint64_t mempool_sequence) {} + /** * Notifies listeners of a transaction leaving mempool. * @@ -149,7 +150,7 @@ class CValidationInterface { * * Called on a background thread. */ - virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) {} + virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {} /** * Notifies listeners of a block being connected. * Provides a vector of transactions evicted from the mempool as a result. @@ -226,8 +227,8 @@ class CMainSignals { void NotifyHeaderTip(const CBlockIndex *pindexNew, bool fInitialDownload); void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); void SynchronousUpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload); - void TransactionAddedToMempool(const CTransactionRef&, int64_t); - void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason); + void TransactionAddedToMempool(const CTransactionRef&, int64_t, uint64_t mempool_sequence); + void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence); void BlockConnected(const std::shared_ptr &, const CBlockIndex *pindex); void BlockDisconnected(const std::shared_ptr &, const CBlockIndex* pindex); void NotifyTransactionLock(const CTransactionRef &tx, const std::shared_ptr& islock); diff --git a/src/wallet/wallet.cpp b/src/wallet/wallet.cpp index f648fc2ceddc8..4257cb6f5787a 100644 --- a/src/wallet/wallet.cpp +++ b/src/wallet/wallet.cpp @@ -1267,7 +1267,7 @@ void CWallet::SyncTransaction(const CTransactionRef& ptx, CWalletTx::Confirmatio fAnonymizableTallyCachedNonDenom = false; } -void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) { +void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) { LOCK(cs_wallet); CWalletTx::Confirmation confirm(CWalletTx::Status::UNCONFIRMED, /* block_height */ 0, {}, /* nIndex */ 0); WalletBatch batch(GetDatabase()); @@ -1279,7 +1279,7 @@ void CWallet::transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcce } } -void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) { +void CWallet::transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) { if (reason != MemPoolRemovalReason::CONFLICT) { LOCK(cs_wallet); auto it = mapWallet.find(tx->GetHash()); @@ -1330,7 +1330,7 @@ void CWallet::blockConnected(const CBlock& block, int height) WalletBatch batch(GetDatabase()); for (size_t index = 0; index < block.vtx.size(); index++) { SyncTransaction(block.vtx[index], {CWalletTx::Status::CONFIRMED, height, block_hash, (int)index}, batch); - transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK); + transactionRemovedFromMempool(block.vtx[index], MemPoolRemovalReason::BLOCK, 0 /* mempool_sequence */); } // reset cache to make sure no longer immature coins are included diff --git a/src/wallet/wallet.h b/src/wallet/wallet.h index a94217ee9d8cd..46cfe3f1b00df 100644 --- a/src/wallet/wallet.h +++ b/src/wallet/wallet.h @@ -1078,7 +1078,7 @@ class CWallet final : public WalletStorage, public interfaces::Chain::Notificati CWalletTx* AddToWallet(CTransactionRef tx, const CWalletTx::Confirmation& confirm, const UpdateWalletTxFn& update_wtx=nullptr, bool fFlushOnClose=true); bool LoadToWallet(const uint256& hash, const UpdateWalletTxFn& fill_wtx) EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); - void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; + void transactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override; void blockConnected(const CBlock& block, int height) override; void blockDisconnected(const CBlock& block, int height) override; void updatedBlockTip() override; @@ -1100,7 +1100,7 @@ class CWallet final : public WalletStorage, public interfaces::Chain::Notificati uint256 last_failed_block; }; ScanResult ScanForWalletTransactions(const uint256& start_block, int start_height, std::optional max_height, const WalletRescanReserver& reserver, bool fUpdate); - void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason) override; + void transactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; void ReacceptWalletTransactions() EXCLUSIVE_LOCKS_REQUIRED(cs_wallet); void ResendWalletTransactions(); struct Balance { diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 75d394326d03a..e3a1d98cfe710 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -28,6 +28,26 @@ bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/ return true; } +bool CZMQAbstractNotifier::NotifyBlockConnect(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyBlockDisconnect(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionAcceptance(const CTransaction &/*transaction*/, uint64_t mempool_sequence) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransactionRemoval(const CTransaction &/*transaction*/, uint64_t mempool_sequence) +{ + return true; +} + bool CZMQAbstractNotifier::NotifyTransactionLock(const CTransactionRef &/*transaction*/, const std::shared_ptr& /*islock*/) { return true; diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index b2e0b10bf7996..534082d64aa28 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -58,9 +58,19 @@ class CZMQAbstractNotifier virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; + // Notifies of ConnectTip result, i.e., new active tip only virtual bool NotifyBlock(const CBlockIndex *pindex); - virtual bool NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig); + // Notifies of every block connection + virtual bool NotifyBlockConnect(const CBlockIndex *pindex); + // Notifies of every block disconnection + virtual bool NotifyBlockDisconnect(const CBlockIndex *pindex); + // Notifies of every mempool acceptance + virtual bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence); + // Notifies of every mempool removal, except inclusion in blocks + virtual bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence); + // Notifies of transactions added to mempool or appearing in blocks virtual bool NotifyTransaction(const CTransaction &transaction); + virtual bool NotifyChainLock(const CBlockIndex *pindex, const std::shared_ptr& clsig); virtual bool NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock); virtual bool NotifyGovernanceVote(const CDeterministicMNList& tip_mn_list, const std::shared_ptr& vote); virtual bool NotifyGovernanceObject(const std::shared_ptr& object); diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index 7539ca3114965..eaf83097ea6f0 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -50,6 +50,7 @@ std::unique_ptr CZMQNotificationInterface::Create() factories["pubrawgovernanceobject"] = CZMQAbstractNotifier::Create; factories["pubrawinstantsenddoublespend"] = CZMQAbstractNotifier::Create; factories["pubrawrecoveredsig"] = CZMQAbstractNotifier::Create; + factories["pubsequence"] = CZMQAbstractNotifier::Create; std::list> notifiers; for (const auto& entry : factories) @@ -157,31 +158,53 @@ void CZMQNotificationInterface::NotifyChainLock(const CBlockIndex *pindex, const }); } -void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime) +void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx, int64_t nAcceptTime, uint64_t mempool_sequence) { - // Used by BlockConnected and BlockDisconnected as well, because they're - // all the same external callback. const CTransaction& tx = *ptx; - TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { - return notifier->NotifyTransaction(tx); + TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx) && notifier->NotifyTransactionAcceptance(tx, mempool_sequence); + }); +} + +void CZMQNotificationInterface::TransactionRemovedFromMempool(const CTransactionRef& ptx, MemPoolRemovalReason reason, uint64_t mempool_sequence) +{ + // Called for all non-block inclusion reasons + const CTransaction& tx = *ptx; + + TryForEachAndRemoveFailed(notifiers, [&tx, mempool_sequence](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransactionRemoval(tx, mempool_sequence); }); } void CZMQNotificationInterface::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected) { for (const CTransactionRef& ptx : pblock->vtx) { - // Do a normal notify for each transaction added in the block - TransactionAddedToMempool(ptx, 0); + const CTransaction& tx = *ptx; + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockConnect listeners for *all* blocks + TryForEachAndRemoveFailed(notifiers, [pindexConnected](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlockConnect(pindexConnected); + }); } void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) { for (const CTransactionRef& ptx : pblock->vtx) { - // Do a normal notify for each transaction removed in block disconnection - TransactionAddedToMempool(ptx, 0); + const CTransaction& tx = *ptx; + TryForEachAndRemoveFailed(notifiers, [&tx](CZMQAbstractNotifier* notifier) { + return notifier->NotifyTransaction(tx); + }); } + + // Next we notify BlockDisconnect listeners for *all* blocks + TryForEachAndRemoveFailed(notifiers, [pindexDisconnected](CZMQAbstractNotifier* notifier) { + return notifier->NotifyBlockDisconnect(pindexDisconnected); + }); } void CZMQNotificationInterface::NotifyTransactionLock(const CTransactionRef& tx, const std::shared_ptr& islock) diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index 8cfc12de373d9..4dd078a45faff 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -26,7 +26,8 @@ class CZMQNotificationInterface final : public CValidationInterface void Shutdown(); // CValidationInterface - void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime) override; + void TransactionAddedToMempool(const CTransactionRef& tx, int64_t nAcceptTime, uint64_t mempool_sequence) override; + void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) override; void BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected) override; void BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index edbe879882d4b..5b863d426c5e2 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -46,6 +46,7 @@ static const char *MSG_RAWGVOTE = "rawgovernancevote"; static const char *MSG_RAWGOBJ = "rawgovernanceobject"; static const char *MSG_RAWISCON = "rawinstantsenddoublespend"; static const char *MSG_RAWRECSIG = "rawrecoveredsig"; +static const char *MSG_SEQUENCE = "sequence"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) @@ -351,6 +352,53 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } +// TODO: Dedup this code to take label char, log string +bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address); + char data[sizeof(uint256)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(data) - 1] = 'C'; // Block (C)onnect + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address); + char data[sizeof(uint256)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(data) - 1] = 'D'; // Block (D)isconnect + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address); + unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance + WriteLE64(data+sizeof(uint256)+1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + +bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) +{ + uint256 hash = transaction.GetHash(); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); + unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; + for (unsigned int i = 0; i < sizeof(uint256); i++) + data[sizeof(uint256) - 1 - i] = hash.begin()[i]; + data[sizeof(uint256)] = 'R'; // Mempool (R)emoval + WriteLE64(data+sizeof(uint256)+1, mempool_sequence); + return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); +} + bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock) { uint256 hash = transaction->GetHash(); diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index d674ef38916de..7dc632d077dbb 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -108,6 +108,15 @@ class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier bool NotifyTransaction(const CTransaction &transaction) override; }; +class CZMQPublishSequenceNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlockConnect(const CBlockIndex *pindex) override; + bool NotifyBlockDisconnect(const CBlockIndex *pindex) override; + bool NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) override; + bool NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) override; +}; + class CZMQPublishRawTransactionLockNotifier : public CZMQAbstractPublishNotifier { public: diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 35227ae70bc18..88ecc7186d4a3 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -5,15 +5,26 @@ """Test the ZMQ notification interface.""" import struct -from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE +from test_framework.address import ADDRESS_BCRT1_UNSPENDABLE, ADDRESS_BCRT1_P2SH_OP_TRUE +from test_framework.blocktools import create_block, create_coinbase from test_framework.test_framework import BitcoinTestFramework from test_framework.messages import ( dashhash, hash256, + tx_from_hex, +) +from test_framework.util import ( + assert_equal, + assert_raises_rpc_error, ) -from test_framework.util import assert_equal from time import sleep +# Test may be skipped and not have zmq installed +try: + import zmq +except ImportError: + pass + def hash256_reversed(byte_str): return hash256(byte_str)[::-1] @@ -26,7 +37,6 @@ def __init__(self, socket, topic): self.socket = socket self.topic = topic - import zmq self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) def receive(self): @@ -38,6 +48,22 @@ def receive(self): self.sequence += 1 return body + def receive_sequence(self): + topic, body, seq = self.socket.recv_multipart() + # Topic should match the subscriber topic. + assert_equal(topic, self.topic) + # Sequence should be incremental. + assert_equal(struct.unpack('C : Blockhash connected + <32-byte hash>D : Blockhash disconnected + <32-byte hash>R<8-byte LE uint> : Transactionhash removed from mempool for non-block inclusion reason + <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool + """ + self.log.info("Testing 'sequence' publisher") + address = 'tcp://127.0.0.1:28333' + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + seq = ZMQSubscriber(socket, b'sequence') + + self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)]) + socket.connect(address) + # Relax so that the subscriber is ready before publishing zmq messages + sleep(0.2) + + # Mempool sequence number starts at 1 + seq_num = 1 + + # Generate 1 block in nodes[0] and receive all notifications + dc_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0] + + # Note: We are not notified of any block transactions, coinbase or mined + assert_equal((self.nodes[0].getbestblockhash(), "C", None), seq.receive_sequence()) + + # Generate 2 blocks in nodes[1] to a different address to ensure a chain split + self.nodes[1].generatetoaddress(2, ADDRESS_BCRT1_P2SH_OP_TRUE) + + # nodes[0] will reorg chain after connecting back nodes[1] + self.connect_nodes(0, 1) + + # Then we receive all block (dis)connect notifications for the 2 block reorg + assert_equal((dc_block, "D", None), seq.receive_sequence()) + block_count = self.nodes[1].getblockcount() + assert_equal((self.nodes[1].getblockhash(block_count-1), "C", None), seq.receive_sequence()) + assert_equal((self.nodes[1].getblockhash(block_count), "C", None), seq.receive_sequence()) + + # Rest of test requires wallet functionality + if self.is_wallet_compiled(): + self.log.info("Wait for tx from second node") + payment_txid = self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=5.0) + self.sync_all() + self.log.info("Testing sequence notifications with mempool sequence values") + + # Should receive the broadcasted txid. + assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Doesn't get published when mined, make a block and tx to "flush" the possibility + # though the mempool sequence number does go up by the number of transactions + # removed from the mempool by the block mining it. + mempool_size = len(self.nodes[0].getrawmempool()) + c_block = self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)[0] + self.sync_all() + # Make sure the number of mined transactions matches the number of txs out of mempool + mempool_size_delta = mempool_size - len(self.nodes[0].getrawmempool()) + assert_equal(len(self.nodes[0].getblock(c_block)["tx"])-1, mempool_size_delta) + seq_num += mempool_size_delta + payment_txid_2 = self.nodes[1].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) + self.sync_all() + assert_equal((c_block, "C", None), seq.receive_sequence()) + assert_equal((payment_txid_2, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Spot check getrawmempool results that they only show up when asked for + assert type(self.nodes[0].getrawmempool()) is list + assert type(self.nodes[0].getrawmempool(mempool_sequence=False)) is list + assert "mempool_sequence" not in self.nodes[0].getrawmempool(verbose=True) + assert_raises_rpc_error(-8, "Verbose results cannot contain mempool sequence values.", self.nodes[0].getrawmempool, True, True) + assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], seq_num) + + self.log.info("Testing reorg notifications") + # Manually invalidate the last block to test mempool re-entry + # N.B. This part could be made more lenient in exact ordering + # since it greatly depends on inner-workings of blocks/mempool + # during "deep" re-orgs. Probably should "re-construct" + # blockchain/mempool state from notifications instead. + block_count = self.nodes[0].getblockcount() + best_hash = self.nodes[0].getbestblockhash() + self.nodes[0].invalidateblock(best_hash) + sleep(2) # Bit of room to make sure transaction things happened + + # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective + # of the time they were gathered. + assert self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] > seq_num + + assert_equal((best_hash, "D", None), seq.receive_sequence()) + assert_equal((payment_txid, "A", seq_num), seq.receive_sequence()) + seq_num += 1 + + # Other things may happen but aren't wallet-deterministic so we don't test for them currently + self.nodes[0].reconsiderblock(best_hash) + self.nodes[1].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + self.sync_all() + + self.log.info("Evict mempool transaction by block conflict") + orig_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0) + + # More to be simply mined + more_tx = [] + for _ in range(5): + more_tx.append(self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 0.1)) + + raw_tx = self.nodes[0].getrawtransaction(orig_txid) + # Mine the tx + block = create_block(int(self.nodes[0].getbestblockhash(), 16), create_coinbase(self.nodes[0].getblockcount()+1)) + tx = tx_from_hex(raw_tx) + block.vtx.append(tx) + for txid in more_tx: + tx = tx_from_hex(self.nodes[0].getrawtransaction(txid)) + block.vtx.append(tx) + block.hashMerkleRoot = block.calc_merkle_root() + block.solve() + assert_equal(self.nodes[0].submitblock(block.serialize().hex()), None) + tip = self.nodes[0].getbestblockhash() + assert_equal(int(tip, 16), block.sha256) + orig_txid_2 = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0) + + # Flush old notifications until evicted tx original entry + (hash_str, label, mempool_seq) = seq.receive_sequence() + while hash_str != orig_txid: + (hash_str, label, mempool_seq) = seq.receive_sequence() + mempool_seq += 1 + + # Added original tx + assert_equal(label, "A") + # More transactions to be simply mined + for i in range(len(more_tx)): + assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence()) + mempool_seq += 1 + + mempool_seq += 1 + assert_equal((tip, "C", None), seq.receive_sequence()) + mempool_seq += len(more_tx) + # Last tx + assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence()) + mempool_seq += 1 + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + self.sync_all() # want to make sure we didn't break "consensus" for other tests + + def test_mempool_sync(self): + """ + Use sequence notification plus getrawmempool sequence results to "sync mempool" + """ + if not self.is_wallet_compiled(): + self.log.info("Skipping mempool sync test") + return + + self.log.info("Testing 'mempool sync' usage of sequence notifier") + address = 'tcp://127.0.0.1:28333' + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + seq = ZMQSubscriber(socket, b'sequence') + + self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)]) + self.connect_nodes(0, 1) + socket.connect(address) + # Relax so that the subscriber is ready before publishing zmq messages + sleep(0.2) + + # In-memory counter, should always start at 1 + next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] + assert_equal(next_mempool_seq, 1) + + # Some transactions have been happening but we aren't consuming zmq notifications yet + # or we lost a ZMQ message somehow and want to start over + txids = [] + num_txs = 5 + for _ in range(num_txs): + txids.append(self.nodes[1].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=1.0)) + self.sync_all() + + # 1) Consume backlog until we get a mempool sequence number + (hash_str, label, zmq_mem_seq) = seq.receive_sequence() + while zmq_mem_seq is None: + (hash_str, label, zmq_mem_seq) = seq.receive_sequence() + + assert label == "A" or label == "R" + assert hash_str is not None + + # 2) We need to "seed" our view of the mempool + mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) + mempool_view = set(mempool_snapshot["txids"]) + get_raw_seq = mempool_snapshot["mempool_sequence"] + assert_equal(get_raw_seq, 6) + # Snapshot may be too old compared to zmq message we read off latest + while zmq_mem_seq >= get_raw_seq: + sleep(2) + mempool_snapshot = self.nodes[0].getrawmempool(mempool_sequence=True) + mempool_view = set(mempool_snapshot["txids"]) + get_raw_seq = mempool_snapshot["mempool_sequence"] + + # Things continue to happen in the "interim" while waiting for snapshot results + for _ in range(num_txs): + txids.append(self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1)) + self.sync_all() + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + final_txid = self.nodes[0].sendtoaddress(address=self.nodes[0].getnewaddress(), amount=0.1) + + # 3) Consume ZMQ backlog until we get to "now" for the mempool snapshot + while True: + if zmq_mem_seq == get_raw_seq - 1: + break + (hash_str, label, mempool_sequence) = seq.receive_sequence() + if mempool_sequence is not None: + zmq_mem_seq = mempool_sequence + if zmq_mem_seq > get_raw_seq: + raise Exception("We somehow jumped mempool sequence numbers! zmq_mem_seq: {} > get_raw_seq: {}".format(zmq_mem_seq, get_raw_seq)) + + # 4) Moving forward, we apply the delta to our local view + # remaining txs(5) + 1 block connect + 1 final tx + expected_sequence = get_raw_seq + r_gap = 0 + for _ in range(num_txs + 1 + 1): + (hash_str, label, mempool_sequence) = seq.receive_sequence() + if mempool_sequence is not None: + if mempool_sequence != expected_sequence: + # Detected "R" gap, means this a conflict eviction, and mempool tx are being evicted before its + # position in the incoming block message "C" + if label == "R": + assert mempool_sequence > expected_sequence + r_gap += mempool_sequence - expected_sequence + else: + raise Exception(f"WARNING: txhash has unexpected mempool sequence value: {mempool_sequence} vs expected {expected_sequence}") + if label == "A": + assert hash_str not in mempool_view + mempool_view.add(hash_str) + expected_sequence = mempool_sequence + 1 + elif label == "R": + assert hash_str in mempool_view + mempool_view.remove(hash_str) + expected_sequence = mempool_sequence + 1 + elif label == "C": + # (Attempt to) remove all txids from known block connects + block_txids = self.nodes[0].getblock(hash_str)["tx"][1:] + for txid in block_txids: + if txid in mempool_view: + expected_sequence += 1 + mempool_view.remove(txid) + expected_sequence -= r_gap + r_gap = 0 + elif label == "D": + # Not useful for mempool tracking per se + continue + else: + raise Exception("Unexpected ZMQ sequence label!") + + assert_equal(self.nodes[0].getrawmempool(), [final_txid]) + assert_equal(self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"], expected_sequence) + + # 5) If you miss a zmq/mempool sequence number, go back to step (2) + + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + def test_multiple_interfaces(self): - import zmq # Set up two subscribers with different addresses subscribers = [] for i in range(2): From 99c730f0f33074dcb7163b3e5a3938ff26fc4104 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 17 Jan 2021 02:44:45 +0100 Subject: [PATCH 04/10] merge bitcoin#20953: dedup zmq test setup code (node restart, topics subscription) --- test/functional/interface_zmq.py | 106 +++++++++++-------------------- 1 file changed, 37 insertions(+), 69 deletions(-) diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 88ecc7186d4a3..b7b3acdd65a52 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -94,6 +94,28 @@ def run_test(self): self.log.debug("Destroying ZMQ context") self.ctx.destroy(linger=None) + # Restart node with the specified zmq notifications enabled, subscribe to + # all of them and return the corresponding ZMQSubscriber objects. + def setup_zmq_test(self, services, recv_timeout=60, connect_nodes=False): + subscribers = [] + for topic, address in services: + socket = self.ctx.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, recv_timeout*1000) + subscribers.append(ZMQSubscriber(socket, topic.encode())) + + self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services]) + + if connect_nodes: + self.connect_nodes(0, 1) + + for i, sub in enumerate(subscribers): + sub.socket.connect(services[i][1]) + + # Relax so that the subscribers are ready before publishing zmq messages + sleep(0.2) + + return subscribers + def test_basic(self): # Invalid zmq arguments don't take down the node, see #17185. @@ -101,28 +123,15 @@ def test_basic(self): self.zmq_context = zmq.Context() address = 'tcp://127.0.0.1:28332' - sockets = [] - subs = [] - services = [b"hashblock", b"hashtx", b"rawblock", b"rawtx"] - for service in services: - sockets.append(self.ctx.socket(zmq.SUB)) - sockets[-1].set(zmq.RCVTIMEO, 60000) - subs.append(ZMQSubscriber(sockets[-1], service)) - - # Subscribe to all available topics. + subs = self.setup_zmq_test( + [(topic, address) for topic in ["hashblock", "hashtx", "rawblock", "rawtx"]], + connect_nodes=True) + hashblock = subs[0] hashtx = subs[1] rawblock = subs[2] rawtx = subs[3] - self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx, rawblock, rawtx]]) - self.connect_nodes(0, 1) - for socket in sockets: - socket.connect(address) - - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) - num_blocks = 5 self.log.info("Generate %(n)d blocks (and %(n)d coinbase txes)" % {"n": num_blocks}) genhashes = self.nodes[0].generatetoaddress(num_blocks, ADDRESS_BCRT1_UNSPENDABLE) @@ -190,25 +199,10 @@ def test_reorg(self): address = 'tcp://127.0.0.1:28333' - services = [b"hashblock", b"hashtx"] - sockets = [] - subs = [] - for service in services: - sockets.append(self.ctx.socket(zmq.SUB)) - # 2 second timeout to check end of notifications - sockets[-1].set(zmq.RCVTIMEO, 2000) - subs.append(ZMQSubscriber(sockets[-1], service)) - - # Subscribe to all available topics. - hashblock = subs[0] - hashtx = subs[1] - # Should only notify the tip if a reorg occurs - self.restart_node(0, ["-zmqpub%s=%s" % (sub.topic.decode(), address) for sub in [hashblock, hashtx]]) - for socket in sockets: - socket.connect(address) - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) + hashblock, hashtx = self.setup_zmq_test( + [(topic, address) for topic in ["hashblock", "hashtx"]], + recv_timeout=2) # 2 second timeout to check end of notifications # Generate 1 block in nodes[0] with 1 mempool tx and receive all notifications payment_txid = self.nodes[0].sendtoaddress(self.nodes[0].getnewaddress(), 1.0) @@ -256,15 +250,7 @@ def test_sequence(self): <32-byte hash>A<8-byte LE uint> : Transactionhash added mempool """ self.log.info("Testing 'sequence' publisher") - address = 'tcp://127.0.0.1:28333' - socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, 60000) - seq = ZMQSubscriber(socket, b'sequence') - - self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)]) - socket.connect(address) - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) + [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")]) # Mempool sequence number starts at 1 seq_num = 1 @@ -399,16 +385,7 @@ def test_mempool_sync(self): return self.log.info("Testing 'mempool sync' usage of sequence notifier") - address = 'tcp://127.0.0.1:28333' - socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, 60000) - seq = ZMQSubscriber(socket, b'sequence') - - self.restart_node(0, ['-zmqpub%s=%s' % (seq.topic.decode(), address)]) - self.connect_nodes(0, 1) - socket.connect(address) - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) + [seq] = self.setup_zmq_test([("sequence", "tcp://127.0.0.1:28333")], connect_nodes=True) # In-memory counter, should always start at 1 next_mempool_seq = self.nodes[0].getrawmempool(mempool_sequence=True)["mempool_sequence"] @@ -506,26 +483,17 @@ def test_mempool_sync(self): def test_multiple_interfaces(self): # Set up two subscribers with different addresses - subscribers = [] - for i in range(2): - address = 'tcp://127.0.0.1:%d' % (28334 + i) - socket = self.ctx.socket(zmq.SUB) - socket.set(zmq.RCVTIMEO, 60000) - hashblock = ZMQSubscriber(socket, b"hashblock") - socket.connect(address) - subscribers.append({'address': address, 'hashblock': hashblock}) - - self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers]) - - # Relax so that the subscriber is ready before publishing zmq messages - sleep(0.2) + subscribers = self.setup_zmq_test([ + ("hashblock", "tcp://127.0.0.1:28334"), + ("hashblock", "tcp://127.0.0.1:28335"), + ]) # Generate 1 block in nodes[0] and receive all notifications self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) # Should receive the same block hash on both subscribers - assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex()) - assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex()) + assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex()) + assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex()) if __name__ == '__main__': From 5e87efd04b06f71613bbf886691b4e81d2b734a7 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 28 Nov 2020 18:30:32 +0100 Subject: [PATCH 05/10] merge bitcoin#20523: deduplicate 'sequence' publisher message creation/sending --- src/zmq/zmqpublishnotifier.cpp | 41 +++++++++++++++------------------- 1 file changed, 18 insertions(+), 23 deletions(-) diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 5b863d426c5e2..bbeb4c117d093 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -352,51 +353,45 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } -// TODO: Dedup this code to take label char, log string +// Helper function to send a 'sequence' topic message with the following structure: +// <32-byte hash> | <1-byte label> | <8-byte LE sequence> (optional) +static bool SendSequenceMsg(CZMQAbstractPublishNotifier& notifier, uint256 hash, char label, std::optional sequence = {}) +{ + unsigned char data[sizeof(hash) + sizeof(label) + sizeof(uint64_t)]; + for (unsigned int i = 0; i < sizeof(hash); ++i) { + data[sizeof(hash) - 1 - i] = hash.begin()[i]; + } + data[sizeof(hash)] = label; + if (sequence) WriteLE64(data + sizeof(hash) + sizeof(label), *sequence); + return notifier.SendZmqMessage(MSG_SEQUENCE, data, sequence ? sizeof(data) : sizeof(hash) + sizeof(label)); +} + bool CZMQPublishSequenceNotifier::NotifyBlockConnect(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); LogPrint(BCLog::ZMQ, "zmq: Publish sequence block connect %s to %s\n", hash.GetHex(), this->address); - char data[sizeof(uint256)+1]; - for (unsigned int i = 0; i < sizeof(uint256); i++) - data[sizeof(uint256) - 1 - i] = hash.begin()[i]; - data[sizeof(data) - 1] = 'C'; // Block (C)onnect - return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); + return SendSequenceMsg(*this, hash, /* Block (C)onnect */ 'C'); } bool CZMQPublishSequenceNotifier::NotifyBlockDisconnect(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); LogPrint(BCLog::ZMQ, "zmq: Publish sequence block disconnect %s to %s\n", hash.GetHex(), this->address); - char data[sizeof(uint256)+1]; - for (unsigned int i = 0; i < sizeof(uint256); i++) - data[sizeof(uint256) - 1 - i] = hash.begin()[i]; - data[sizeof(data) - 1] = 'D'; // Block (D)isconnect - return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); + return SendSequenceMsg(*this, hash, /* Block (D)isconnect */ 'D'); } bool CZMQPublishSequenceNotifier::NotifyTransactionAcceptance(const CTransaction &transaction, uint64_t mempool_sequence) { uint256 hash = transaction.GetHash(); LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool acceptance %s to %s\n", hash.GetHex(), this->address); - unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; - for (unsigned int i = 0; i < sizeof(uint256); i++) - data[sizeof(uint256) - 1 - i] = hash.begin()[i]; - data[sizeof(uint256)] = 'A'; // Mempool (A)cceptance - WriteLE64(data+sizeof(uint256)+1, mempool_sequence); - return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); + return SendSequenceMsg(*this, hash, /* Mempool (A)cceptance */ 'A', mempool_sequence); } bool CZMQPublishSequenceNotifier::NotifyTransactionRemoval(const CTransaction &transaction, uint64_t mempool_sequence) { uint256 hash = transaction.GetHash(); LogPrint(BCLog::ZMQ, "zmq: Publish hashtx mempool removal %s to %s\n", hash.GetHex(), this->address); - unsigned char data[sizeof(uint256)+sizeof(mempool_sequence)+1]; - for (unsigned int i = 0; i < sizeof(uint256); i++) - data[sizeof(uint256) - 1 - i] = hash.begin()[i]; - data[sizeof(uint256)] = 'R'; // Mempool (R)emoval - WriteLE64(data+sizeof(uint256)+1, mempool_sequence); - return SendZmqMessage(MSG_SEQUENCE, data, sizeof(data)); + return SendSequenceMsg(*this, hash, /* Mempool (R)emoval */ 'R', mempool_sequence); } bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock) From 7b0c725c594f9f0ee85c02dc20bd8adce7d9ac85 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 23 Jan 2021 22:19:15 +0100 Subject: [PATCH 06/10] merge bitcoin#21008: fix zmq test flakiness, improve speed --- test/functional/interface_zmq.py | 76 +++++++++++++++++++++++--------- 1 file changed, 54 insertions(+), 22 deletions(-) diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index b7b3acdd65a52..2d11984ea5784 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -33,28 +33,31 @@ def dashhash_reversed(byte_str): class ZMQSubscriber: def __init__(self, socket, topic): - self.sequence = 0 + self.sequence = None # no sequence number received yet self.socket = socket self.topic = topic self.socket.setsockopt(zmq.SUBSCRIBE, self.topic) - def receive(self): + # Receive message from publisher and verify that topic and sequence match + def _receive_from_publisher_and_check(self): topic, body, seq = self.socket.recv_multipart() # Topic should match the subscriber topic. assert_equal(topic, self.topic) # Sequence should be incremental. - assert_equal(struct.unpack(' Date: Sat, 27 Feb 2021 18:32:48 +0100 Subject: [PATCH 07/10] merge bitcoin#21310: fix sync-up by matching notification to generated block --- test/functional/interface_zmq.py | 42 ++++++++++++++++++++++++++------ 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 2d11984ea5784..929ba467c1f7b 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -68,6 +68,31 @@ def receive_sequence(self): return (hash, label, mempool_sequence) +class ZMQTestSetupBlock: + """Helper class for setting up a ZMQ test via the "sync up" procedure. + Generates a block on the specified node on instantiation and provides a + method to check whether a ZMQ notification matches, i.e. the event was + caused by this generated block. Assumes that a notification either contains + the generated block's hash, it's (coinbase) transaction id, the raw block or + raw transaction data. + """ + + def __init__(self, node): + self.block_hash = node.generate(1)[0] + coinbase = node.getblock(self.block_hash, 2)['tx'][0] + self.tx_hash = coinbase['txid'] + self.raw_tx = coinbase['hex'] + self.raw_block = node.getblock(self.block_hash, 0) + + def caused_notification(self, notification): + return ( + self.block_hash in notification + or self.tx_hash in notification + or self.raw_block in notification + or self.raw_tx in notification + ) + + class ZMQTest (BitcoinTestFramework): def set_test_params(self): self.num_nodes = 2 @@ -117,17 +142,18 @@ def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True): # Ensure that all zmq publisher notification interfaces are ready by # running the following "sync up" procedure: # 1. Generate a block on the node - # 2. Try to receive a notification on all subscribers - # 3. If all subscribers get a message within the timeout (1 second), + # 2. Try to receive the corresponding notification on all subscribers + # 3. If all subscribers get the message within the timeout (1 second), # we are done, otherwise repeat starting from step 1 for sub in subscribers: sub.socket.set(zmq.RCVTIMEO, 1000) while True: - self.nodes[0].generate(1) + test_block = ZMQTestSetupBlock(self.nodes[0]) recv_failed = False for sub in subscribers: try: - sub.receive() + while not test_block.caused_notification(sub.receive().hex()): + self.log.debug("Ignoring sync-up notification for previously generated block.") except zmq.error.Again: self.log.debug("Didn't receive sync-up notification, trying again.") recv_failed = True @@ -345,7 +371,7 @@ def test_sequence(self): block_count = self.nodes[0].getblockcount() best_hash = self.nodes[0].getbestblockhash() self.nodes[0].invalidateblock(best_hash) - sleep(2) # Bit of room to make sure transaction things happened + sleep(2) # Bit of room to make sure transaction things happened # Make sure getrawmempool mempool_sequence results aren't "queued" but immediately reflective # of the time they were gathered. @@ -393,8 +419,8 @@ def test_sequence(self): assert_equal(label, "A") # More transactions to be simply mined for i in range(len(more_tx)): - assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence()) - mempool_seq += 1 + assert_equal((more_tx[i], "A", mempool_seq), seq.receive_sequence()) + mempool_seq += 1 mempool_seq += 1 assert_equal((tip, "C", None), seq.receive_sequence()) @@ -403,7 +429,7 @@ def test_sequence(self): assert_equal((orig_txid_2, "A", mempool_seq), seq.receive_sequence()) mempool_seq += 1 self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) - self.sync_all() # want to make sure we didn't break "consensus" for other tests + self.sync_all() # want to make sure we didn't break "consensus" for other tests def test_mempool_sync(self): """ From 2965093c4aabef46b61f9eda5498ed544845ba9e Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Wed, 26 May 2021 19:18:58 +0200 Subject: [PATCH 08/10] merge bitcoin#22079: Add support to listen on IPv6 addresses --- doc/zmq.md | 4 ++++ src/zmq/zmqpublishnotifier.cpp | 24 ++++++++++++++++++++++++ test/functional/interface_zmq.py | 22 +++++++++++++++++++++- 3 files changed, 49 insertions(+), 1 deletion(-) diff --git a/doc/zmq.md b/doc/zmq.md index 96384effb432d..7ebee938c1cc0 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -112,6 +112,7 @@ For instance: $ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \ -zmqpubhashtx=tcp://192.168.1.2:28332 \ + -zmqpubhashblock="tcp://[::1]:28333" \ -zmqpubrawtx=ipc:///tmp/dashd.tx.raw \ -zmqpubhashtxhwm=10000 @@ -153,6 +154,9 @@ Setting the keepalive values appropriately for your operating environment may improve connectivity in situations where long-lived connections are silently dropped by network middle boxes. +Also, the socket's ZMQ_IPV6 option is enabled to accept connections from IPv6 +hosts as well. If needed, this option has to be set on the client side too. + ## Remarks From the perspective of dashd, the ZeroMQ socket is write-only; PUB diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index bbeb4c117d093..8c4cf77da7647 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -92,6 +93,20 @@ static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) return 0; } +static bool IsZMQAddressIPV6(const std::string &zmq_address) +{ + const std::string tcp_prefix = "tcp://"; + const size_t tcp_index = zmq_address.rfind(tcp_prefix); + const size_t colon_index = zmq_address.rfind(":"); + if (tcp_index == 0 && colon_index != std::string::npos) { + const std::string ip = zmq_address.substr(tcp_prefix.length(), colon_index - tcp_prefix.length()); + CNetAddr addr; + LookupHost(ip, addr, false); + if (addr.IsIPv6()) return true; + } + return false; +} + bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) { assert(!psocket); @@ -126,6 +141,15 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) return false; } + // On some systems (e.g. OpenBSD) the ZMQ_IPV6 must not be enabled, if the address to bind isn't IPv6 + const int enable_ipv6 { IsZMQAddressIPV6(address) ? 1 : 0}; + rc = zmq_setsockopt(psocket, ZMQ_IPV6, &enable_ipv6, sizeof(enable_ipv6)); + if (rc != 0) { + zmqError("Failed to set ZMQ_IPV6"); + zmq_close(psocket); + return false; + } + rc = zmq_bind(psocket, address.c_str()); if (rc != 0) { diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 929ba467c1f7b..aebc07f39b751 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -17,6 +17,7 @@ assert_equal, assert_raises_rpc_error, ) +from test_framework.netutil import test_ipv6_local from time import sleep # Test may be skipped and not have zmq installed @@ -120,6 +121,7 @@ def run_test(self): self.test_mempool_sync() self.test_reorg() self.test_multiple_interfaces() + self.test_ipv6() finally: # Destroy the ZMQ context. self.log.debug("Destroying ZMQ context") @@ -127,10 +129,12 @@ def run_test(self): # Restart node with the specified zmq notifications enabled, subscribe to # all of them and return the corresponding ZMQSubscriber objects. - def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True): + def setup_zmq_test(self, services, *, recv_timeout=60, sync_blocks=True, ipv6=False): subscribers = [] for topic, address in services: socket = self.ctx.socket(zmq.SUB) + if ipv6: + socket.setsockopt(zmq.IPV6, 1) subscribers.append(ZMQSubscriber(socket, topic.encode())) self.restart_node(0, ["-zmqpub%s=%s" % (topic, address) for topic, address in services] + @@ -553,6 +557,22 @@ def test_multiple_interfaces(self): assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex()) assert_equal(self.nodes[0].getbestblockhash(), subscribers[1].receive().hex()) + def test_ipv6(self): + if not test_ipv6_local(): + self.log.info("Skipping IPv6 test, because IPv6 is not supported.") + return + self.log.info("Testing IPv6") + # Set up subscriber using IPv6 loopback address + subscribers = self.setup_zmq_test([ + ("hashblock", "tcp://[::1]:28332") + ], ipv6=True) + + # Generate 1 block in nodes[0] + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + + # Should receive the same block hash + assert_equal(self.nodes[0].getbestblockhash(), subscribers[0].receive().hex()) + if __name__ == '__main__': ZMQTest().main() From 8ecc22f51f456eb9c9eef6712333bc20726ac4bf Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 8 Nov 2021 19:53:29 -0800 Subject: [PATCH 09/10] merge bitcoin#23471: Improve ZMQ documentation --- doc/zmq.md | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/doc/zmq.md b/doc/zmq.md index 7ebee938c1cc0..80a4e98ede173 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -119,9 +119,11 @@ For instance: Each PUB notification has a topic and body, where the header corresponds to the notification type. For instance, for the notification `-zmqpubhashtx` the topic is `hashtx` (no null -terminator) and the body is the transaction hash (32 -bytes) for all but `sequence` topic. For `sequence`, the body -is structured as the following based on the type of message: +terminator). These options can also be provided in dash.conf. + +The topics are: + +`sequence`: the body is structured as the following based on the type of message: <32-byte hash>C : Blockhash connected <32-byte hash>D : Blockhash disconnected @@ -130,7 +132,24 @@ is structured as the following based on the type of message: Where the 8-byte uints correspond to the mempool sequence number. -These options can also be provided in dash.conf. +`rawtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`rawtx`), the second part is the serialized transaction, and the last part is a sequence number (representing the message count to detect lost messages). + + | rawtx | | + +`hashtx`: Notifies about all transactions, both when they are added to mempool or when a new block arrives. This means a transaction could be published multiple times. First, when it enters the mempool and then again in each block that includes it. The messages are ZMQ multipart messages with three parts. The first part is the topic (`hashtx`), the second part is the 32-byte transaction hash, and the last part is a sequence number (representing the message count to detect lost messages). + + | hashtx | <32-byte transaction hash in Little Endian> | + + +`rawblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`rawblock`), the second part is the serialized block, and the last part is a sequence number (representing the message count to detect lost messages). + + | rawblock | | + +`hashblock`: Notifies when the chain tip is updated. Messages are ZMQ multipart messages with three parts. The first part is the topic (`hashblock`), the second part is the 32-byte block hash, and the last part is a sequence number (representing the message count to detect lost messages). + + | hashblock | <32-byte block hash in Little Endian> | + +**_NOTE:_** Note that the 32-byte hashes are in Little Endian and not in the Big Endian format that the RPC interface and block explorers use to display transaction and block hashes. ZeroMQ endpoint specifiers for TCP (and others) are documented in the [ZeroMQ API](http://api.zeromq.org/4-0:_start). From b75e83b298377f59e242479fc243a457ff04c2e0 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Mon, 31 Jan 2022 17:03:54 +0100 Subject: [PATCH 10/10] merge bitcoin#24218: Fix implicit-integer-sign-change --- src/zmq/zmqpublishnotifier.cpp | 10 ++++++---- test/sanitizer_suppressions/ubsan | 1 - 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 8c4cf77da7647..bebe1003e34fa 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -226,9 +226,10 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address); - char data[32]; - for (unsigned int i = 0; i < 32; i++) + uint8_t data[32]; + for (unsigned int i = 0; i < 32; i++) { data[31 - i] = hash.begin()[i]; + } return SendZmqMessage(MSG_HASHBLOCK, data, 32); } @@ -246,9 +247,10 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t { uint256 hash = transaction.GetHash(); LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address); - char data[32]; - for (unsigned int i = 0; i < 32; i++) + uint8_t data[32]; + for (unsigned int i = 0; i < 32; i++) { data[31 - i] = hash.begin()[i]; + } return SendZmqMessage(MSG_HASHTX, data, 32); } diff --git a/test/sanitizer_suppressions/ubsan b/test/sanitizer_suppressions/ubsan index af17ca71d37b1..1fd1b388193c6 100644 --- a/test/sanitizer_suppressions/ubsan +++ b/test/sanitizer_suppressions/ubsan @@ -75,7 +75,6 @@ implicit-integer-sign-change:txmempool.cpp implicit-integer-sign-change:util/strencodings.cpp implicit-integer-sign-change:util/strencodings.h implicit-integer-sign-change:validation.cpp -implicit-integer-sign-change:zmq/zmqpublishnotifier.cpp implicit-signed-integer-truncation,implicit-integer-sign-change:chain.h implicit-signed-integer-truncation,implicit-integer-sign-change:test/skiplist_tests.cpp implicit-signed-integer-truncation:addrman.cpp