Skip to content

Commit

Permalink
Merge bitcoin#18309: zmq: Add support to listen on multiple interfaces
Browse files Browse the repository at this point in the history
e66870c zmq: Append address to notify log output (nthumann)
241803d test: Add zmq test to support multiple interfaces (nthumann)
a0b2e5c doc: Add release notes to support multiple interfaces (nthumann)
b1c3f18 doc: Adjust ZMQ usage to support multiple interfaces (nthumann)
347c94f zmq: Add support to listen on multiple interfaces (Nicolas Thumann)

Pull request description:

  This PR adds support for ZeroMQ to listen on multiple interfaces, just like the RPC server.
  Currently, if you specify more than one e.g. `zmqpubhashblock` paramter, only the first one will be used. Therefore a user may be forced to listen on all interfaces (e.g. `zmqpubhashblock=0.0.0.0:28332`), which can result in an increased attack surface.
  With this PR a user can specify multiple interfaces to listen on, e.g.
  `-zmqpubhashblock=tcp://127.0.0.1:28332 -zmqpubhashblock=tcp://192.168.1.123:28332`.

ACKs for top commit:
  laanwj:
    Code review ACK e66870c
  instagibbs:
    reACK bitcoin@e66870c

Tree-SHA512: f38ab4a6ff00dc821e5f4842508cefadb701e70bb3893992c1b32049be20247c8aa9476a1f886050c5f17fe7f2ce99ee30193ce2c81a7482a5a51f8fc22300c7
  • Loading branch information
laanwj authored and PastaPastaPasta committed Dec 6, 2023
1 parent 08349ee commit e29a35a
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 12 deletions.
4 changes: 4 additions & 0 deletions doc/release-notes-18309.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Command-line options
-----------------------------

The same ZeroMQ notification (e.g. `-zmqpubhashtx=address`) can now be specified multiple times to publish the same notification to different ZeroMQ sockets.
2 changes: 2 additions & 0 deletions doc/zmq.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ Currently, the following notifications are supported:

The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
The same notification can be specified more than once.

The option to set the PUB socket's outbound message high water mark
(SNDHWM) may be set individually for each notification:
Expand Down Expand Up @@ -108,6 +109,7 @@ The high water mark value must be an integer greater than or equal to 0.
For instance:

$ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://192.168.1.2:28332 \
-zmqpubrawtx=ipc:///tmp/dashd.tx.raw \
-zmqpubhashtxhwm=10000

Expand Down
6 changes: 2 additions & 4 deletions src/zmq/zmqnotificationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,8 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
for (const auto& entry : factories)
{
std::string arg("-zmq" + entry.first);
if (gArgs.IsArgSet(arg))
{
const auto& factory = entry.second;
const std::string address = gArgs.GetArg(arg, "");
const auto& factory = entry.second;
for (const std::string& address : gArgs.GetArgs(arg)) {
std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
notifier->SetType(entry.first);
notifier->SetAddress(address);
Expand Down
16 changes: 8 additions & 8 deletions src/zmq/zmqpublishnotifier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
Expand All @@ -218,7 +218,7 @@ bool CZMQPublishHashChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
Expand Down Expand Up @@ -279,7 +279,7 @@ bool CZMQPublishHashRecoveredSigNotifier::NotifyRecoveredSig(const std::shared_p

bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);

const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
Expand Down Expand Up @@ -344,7 +344,7 @@ bool CZMQPublishRawChainLockSigNotifier::NotifyChainLock(const CBlockIndex *pind
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
Expand All @@ -353,7 +353,7 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock)
{
uint256 hash = transaction->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlock %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlock %s to %s\n", hash.GetHex(), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *transaction;
return SendZmqMessage(MSG_RAWTXLOCK, &(*ss.begin()), ss.size());
Expand All @@ -362,7 +362,7 @@ bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransac
bool CZMQPublishRawTransactionLockSigNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock)
{
uint256 hash = transaction->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlocksig %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlocksig %s to %s\n", hash.GetHex(), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *transaction;
ss << *islock;
Expand All @@ -372,7 +372,7 @@ bool CZMQPublishRawTransactionLockSigNotifier::NotifyTransactionLock(const CTran
bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const std::shared_ptr<const CGovernanceVote>& vote)
{
uint256 nHash = vote->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s, vote = %d\n", nHash.ToString(), vote->ToString());
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, vote = %d\n", nHash.ToString(), this->address, vote->ToString());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *vote;
return SendZmqMessage(MSG_RAWGVOTE, &(*ss.begin()), ss.size());
Expand All @@ -381,7 +381,7 @@ bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const std::share
bool CZMQPublishRawGovernanceObjectNotifier::NotifyGovernanceObject(const std::shared_ptr<const CGovernanceObject>& govobj)
{
uint256 nHash = govobj->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s, type = %d\n", nHash.ToString(), ToUnderlying(govobj->GetObjectType()));
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, type = %d\n", nHash.ToString(), this->address, ToUnderlying(govobj->GetObjectType()));
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *govobj;
return SendZmqMessage(MSG_RAWGOBJ, &(*ss.begin()), ss.size());
Expand Down
25 changes: 25 additions & 0 deletions test/functional/interface_zmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ def setup_nodes(self):
def run_test(self):
try:
self._zmq_test()
self.test_multiple_interfaces()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
Expand Down Expand Up @@ -131,5 +132,29 @@ def _zmq_test(self):

assert_equal(self.nodes[1].getzmqnotifications(), [])

def test_multiple_interfaces(self):
import zmq
# Set up two subscribers with different addresses
subscribers = []
for i in range(2):
address = 'tcp://127.0.0.1:%d' % (28334 + i)
socket = self.zmq_context.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
hashblock = ZMQSubscriber(socket, b"hashblock")
socket.connect(address)
subscribers.append({'address': address, 'hashblock': hashblock})

self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers])

# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)

# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)

# Should receive the same block hash on both subscribers
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex())

if __name__ == '__main__':
ZMQTest().main()

0 comments on commit e29a35a

Please sign in to comment.