diff --git a/doc/release-notes-18309.md b/doc/release-notes-18309.md new file mode 100644 index 0000000000000..b31f85eb6ed0e --- /dev/null +++ b/doc/release-notes-18309.md @@ -0,0 +1,4 @@ +Command-line options +----------------------------- + +The same ZeroMQ notification (e.g. `-zmqpubhashtx=address`) can now be specified multiple times to publish the same notification to different ZeroMQ sockets. \ No newline at end of file diff --git a/doc/zmq.md b/doc/zmq.md index ee8fd82430431..9447bf2928cc1 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -80,6 +80,7 @@ Currently, the following notifications are supported: The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. +The same notification can be specified more than once. The option to set the PUB socket's outbound message high water mark (SNDHWM) may be set individually for each notification: @@ -108,6 +109,7 @@ The high water mark value must be an integer greater than or equal to 0. For instance: $ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \ + -zmqpubhashtx=tcp://192.168.1.2:28332 \ -zmqpubrawtx=ipc:///tmp/dashd.tx.raw \ -zmqpubhashtxhwm=10000 diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index 904b380a37ca5..073ba4ae9d9da 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -55,10 +55,8 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() for (const auto& entry : factories) { std::string arg("-zmq" + entry.first); - if (gArgs.IsArgSet(arg)) - { - const auto& factory = entry.second; - const std::string address = gArgs.GetArg(arg, ""); + const auto& factory = entry.second; + for (const std::string& address : gArgs.GetArgs(arg)) { std::unique_ptr notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 0f4444c535085..fd10dce13c871 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -198,7 +198,7 @@ bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -218,7 +218,7 @@ bool CZMQPublishHashChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address); char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; @@ -279,7 +279,7 @@ bool CZMQPublishHashRecoveredSigNotifier::NotifyRecoveredSig(const std::shared_p bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { - LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address); const Consensus::Params& consensusParams = Params().GetConsensus(); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); @@ -344,7 +344,7 @@ bool CZMQPublishRawChainLockSigNotifier::NotifyChainLock(const CBlockIndex *pind bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << transaction; return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); @@ -353,7 +353,7 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock) { uint256 hash = transaction->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlock %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlock %s to %s\n", hash.GetHex(), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << *transaction; return SendZmqMessage(MSG_RAWTXLOCK, &(*ss.begin()), ss.size()); @@ -362,7 +362,7 @@ bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransac bool CZMQPublishRawTransactionLockSigNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr& islock) { uint256 hash = transaction->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlocksig %s\n", hash.GetHex()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlocksig %s to %s\n", hash.GetHex(), this->address); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << *transaction; ss << *islock; @@ -372,7 +372,7 @@ bool CZMQPublishRawTransactionLockSigNotifier::NotifyTransactionLock(const CTran bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const std::shared_ptr& vote) { uint256 nHash = vote->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s, vote = %d\n", nHash.ToString(), vote->ToString()); + LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, vote = %d\n", nHash.ToString(), this->address, vote->ToString()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << *vote; return SendZmqMessage(MSG_RAWGVOTE, &(*ss.begin()), ss.size()); @@ -381,7 +381,7 @@ bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const std::share bool CZMQPublishRawGovernanceObjectNotifier::NotifyGovernanceObject(const std::shared_ptr& govobj) { uint256 nHash = govobj->GetHash(); - LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s, type = %d\n", nHash.ToString(), ToUnderlying(govobj->GetObjectType())); + LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, type = %d\n", nHash.ToString(), this->address, ToUnderlying(govobj->GetObjectType())); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << *govobj; return SendZmqMessage(MSG_RAWGOBJ, &(*ss.begin()), ss.size()); diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 322d516df373c..bf2bcba99187c 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -78,6 +78,7 @@ def setup_nodes(self): def run_test(self): try: self._zmq_test() + self.test_multiple_interfaces() finally: # Destroy the ZMQ context. self.log.debug("Destroying ZMQ context") @@ -131,5 +132,29 @@ def _zmq_test(self): assert_equal(self.nodes[1].getzmqnotifications(), []) + def test_multiple_interfaces(self): + import zmq + # Set up two subscribers with different addresses + subscribers = [] + for i in range(2): + address = 'tcp://127.0.0.1:%d' % (28334 + i) + socket = self.zmq_context.socket(zmq.SUB) + socket.set(zmq.RCVTIMEO, 60000) + hashblock = ZMQSubscriber(socket, b"hashblock") + socket.connect(address) + subscribers.append({'address': address, 'hashblock': hashblock}) + + self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers]) + + # Relax so that the subscriber is ready before publishing zmq messages + sleep(0.2) + + # Generate 1 block in nodes[0] and receive all notifications + self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE) + + # Should receive the same block hash on both subscribers + assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex()) + assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex()) + if __name__ == '__main__': ZMQTest().main()