Skip to content

Commit

Permalink
feat(ARCO-279): add custom zmq listener (#639)
Browse files Browse the repository at this point in the history
  • Loading branch information
pawellewandowski98 authored Nov 11, 2024
1 parent 262b528 commit 12acbb3
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 27 deletions.
16 changes: 5 additions & 11 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ import (
"log/slog"
"net/url"
"os"
"strconv"
"time"

"github.com/bitcoin-sv/arc/internal/cache"
"github.com/bitcoin-sv/arc/internal/tracing"

"github.com/libsv/go-p2p"
"github.com/ordishs/go-bitcoin"
"github.com/sirupsen/logrus"
"google.golang.org/grpc"

"github.com/bitcoin-sv/arc/config"
Expand Down Expand Up @@ -202,19 +200,15 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
continue
}

zmq := metamorph.NewZMQ(zmqURL, statusMessageCh, logger)

port, err := strconv.Atoi(zmqURL.Port())
zmqHandler := metamorph.NewZMQHandler(context.Background(), zmqURL, logger)
zmq, err := metamorph.NewZMQ(zmqURL, statusMessageCh, zmqHandler, logger)
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to parse port from peer settings: %v", err)
return nil, fmt.Errorf("failed to create ZMQ: %v", err)
}
logger.Info("Listening to ZMQ", slog.String("host", zmqURL.Hostname()), slog.String("port", zmqURL.Port()))

logger.Info("Listening to ZMQ", slog.String("host", zmqURL.Hostname()), slog.Int("port", port))

zmqLogger := logrus.New()
zmqLogger.SetFormatter(&logrus.JSONFormatter{})
err = zmq.Start(bitcoin.NewZMQ(zmqURL.Hostname(), port, zmqLogger))
err = zmq.Start()
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to start ZMQ: %v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,9 @@ func TestDoubleSpendDetection(t *testing.T) {
zmqURL, err := url.Parse("https://some-url.com")
require.NoError(t, err)

zmq := metamorph.NewZMQ(zmqURL, statusMessageChannel, logger)
err = zmq.Start(mockedZMQ)
zmq, err := metamorph.NewZMQ(zmqURL, statusMessageChannel, mockedZMQ, logger)
require.NoError(t, err)
err = zmq.Start()
require.NoError(t, err)

// give metamorph time to parse ZMQ message
Expand Down
56 changes: 46 additions & 10 deletions internal/metamorph/zmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,31 @@ import (
"github.com/libsv/go-p2p/chaincfg/chainhash"
)

var allowedTopics = []string{
"hashblock",
"hashblock2",
"hashtx",
"hashtx2",
"rawblock",
"rawblock2",
"rawtx",
"rawtx2",
"discardedfrommempool",
"removedfrommempoolblock",
"invalidtx",
}

var ErrNilZMQHandler = errors.New("zmq handler is nil")

type subscriptionRequest struct {
topic string
ch chan []string
}

type ZMQ struct {
url *url.URL
statusMessageCh chan<- *PeerTxMessage
handler ZMQI
logger *slog.Logger
}

Expand Down Expand Up @@ -56,21 +78,26 @@ type CollidingTx struct {
Size int `json:"size"`
}

func NewZMQ(zmqURL *url.URL, statusMessageCh chan<- *PeerTxMessage, logger *slog.Logger) *ZMQ {
type ZMQI interface {
Subscribe(string, chan []string) error
}

func NewZMQ(zmqURL *url.URL, statusMessageCh chan<- *PeerTxMessage, zmqHandler ZMQI, logger *slog.Logger) (*ZMQ, error) {
if zmqHandler == nil {
return nil, ErrNilZMQHandler
}

z := &ZMQ{
url: zmqURL,
statusMessageCh: statusMessageCh,
handler: zmqHandler,
logger: logger,
}

return z
}

type ZMQI interface {
Subscribe(string, chan []string) error
return z, nil
}

func (z *ZMQ) Start(zmqi ZMQI) error {
func (z *ZMQ) Start() error {
ch := make(chan []string)

const hashtxTopic = "hashtx2"
Expand Down Expand Up @@ -140,15 +167,15 @@ func (z *ZMQ) Start(zmqi ZMQI) error {
}
}()

if err := zmqi.Subscribe(hashtxTopic, ch); err != nil {
if err := z.handler.Subscribe(hashtxTopic, ch); err != nil {
return err
}

if err := zmqi.Subscribe(invalidTxTopic, ch); err != nil {
if err := z.handler.Subscribe(invalidTxTopic, ch); err != nil {
return err
}

if err := zmqi.Subscribe(discardedFromMempoolTopic, ch); err != nil {
if err := z.handler.Subscribe(discardedFromMempoolTopic, ch); err != nil {
return err
}

Expand Down Expand Up @@ -304,3 +331,12 @@ func (z *ZMQ) parseDiscardedInfo(c []string) (*ZMQDiscardFromMempool, error) {
}
return &txInfo, nil
}

func contains(s []string, e string) bool {
for _, a := range s {
if a == e {
return true
}
}
return false
}
161 changes: 161 additions & 0 deletions internal/metamorph/zmq_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package metamorph

import (
"context"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"github.com/go-zeromq/zmq4"
"log/slog"
"net/url"
"strconv"
"time"
)

type ZMQHandler struct {
address string
socket zmq4.Socket
connected bool
err error
subscriptions map[string][]chan []string
addSubscription chan subscriptionRequest
removeSubscription chan subscriptionRequest
logger *slog.Logger
}

func NewZMQHandler(ctx context.Context, zmqURL *url.URL, logger *slog.Logger) *ZMQHandler {
zmq := &ZMQHandler{
address: fmt.Sprintf("tcp://%s:%s", zmqURL.Hostname(), zmqURL.Port()),
subscriptions: make(map[string][]chan []string),
addSubscription: make(chan subscriptionRequest, 10),
removeSubscription: make(chan subscriptionRequest, 10),
logger: logger.With(slog.String("module", "zmq-handler")),
}

go zmq.start(ctx)

return zmq
}

func (zmqHandler *ZMQHandler) start(ctx context.Context) {
for {
zmqHandler.socket = zmq4.NewSub(ctx, zmq4.WithID(zmq4.SocketIdentity("sub")))
defer func() {
if zmqHandler.connected {
zmqHandler.socket.Close()
zmqHandler.connected = false
}
}()

if err := zmqHandler.socket.Dial(zmqHandler.address); err != nil {
zmqHandler.err = err
zmqHandler.logger.Error("Could not dial ZMQ", slog.String("address", zmqHandler.address), slog.String("error", err.Error()))
zmqHandler.logger.Info("Attempting to re-establish ZMQ connection in 10 seconds...")
time.Sleep(10 * time.Second)
continue
}

zmqHandler.logger.Info("ZMQ: Connecting", slog.String("address", zmqHandler.address))

for topic := range zmqHandler.subscriptions {
if err := zmqHandler.socket.SetOption(zmq4.OptionSubscribe, topic); err != nil {
zmqHandler.err = fmt.Errorf("%+v", err)
return
}
zmqHandler.logger.Info("ZMQ: Subscribed", slog.String("topic", topic))
}

OUT:
for {
select {
case <-ctx.Done():
zmqHandler.logger.Info("ZMQ: Context done, exiting")
return
case req := <-zmqHandler.addSubscription:
if err := zmqHandler.socket.SetOption(zmq4.OptionSubscribe, req.topic); err != nil {
zmqHandler.logger.Error("ZMQ: Failed to subscribe", slog.String("topic", req.topic))
} else {
zmqHandler.logger.Info("ZMQ: Subscribed", slog.String("topic", req.topic))
}

subscribers := zmqHandler.subscriptions[req.topic]
subscribers = append(subscribers, req.ch)

zmqHandler.subscriptions[req.topic] = subscribers

case req := <-zmqHandler.removeSubscription:
subscribers := zmqHandler.subscriptions[req.topic]
for i, subscriber := range subscribers {
if subscriber == req.ch {
subscribers = append(subscribers[:i], subscribers[i+1:]...)
zmqHandler.logger.Info("Removed subscription", slog.String("topic", req.topic))
break
}
}
zmqHandler.subscriptions[req.topic] = subscribers

default:
msg, err := zmqHandler.socket.Recv()
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
zmqHandler.logger.Error("zmqHandler.socket.Recv()", slog.String("error", err.Error()))
break OUT
} else {
if !zmqHandler.connected {
zmqHandler.connected = true
zmqHandler.logger.Info("ZMQ: Connection observed", slog.String("address", zmqHandler.address))
}

subscribers := zmqHandler.subscriptions[string(msg.Frames[0])]

sequence := "N/A"

if len(msg.Frames) > 2 && len(msg.Frames[2]) == 4 {
s := binary.LittleEndian.Uint32(msg.Frames[2])
sequence = strconv.FormatInt(int64(s), 10)
}

for _, subscriber := range subscribers {
subscriber <- []string{string(msg.Frames[0]), hex.EncodeToString(msg.Frames[1]), sequence}
}
}
}
}

if zmqHandler.connected {
zmqHandler.socket.Close()
zmqHandler.connected = false
}
zmqHandler.logger.Info("Attempting to re-establish ZMQ connection in 10 seconds...")
time.Sleep(10 * time.Second)
}
}

func (zmqHandler *ZMQHandler) Subscribe(topic string, ch chan []string) error {
if !contains(allowedTopics, topic) {
return fmt.Errorf("topic must be %+v, received %q", allowedTopics, topic)
}

zmqHandler.addSubscription <- subscriptionRequest{
topic: topic,
ch: ch,
}

return nil
}

func (zmqHandler *ZMQHandler) Unsubscribe(topic string, ch chan []string) error {
if !contains(allowedTopics, topic) {
return fmt.Errorf("topic must be %+v, received %q", allowedTopics, topic)
}

zmqHandler.removeSubscription <- subscriptionRequest{
topic: topic,
ch: ch,
}

return nil
}
10 changes: 6 additions & 4 deletions internal/metamorph/zmq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ func TestZMQ(t *testing.T) {
zmqURL, err := url.Parse("https://some-url.com")
require.NoError(t, err)

sut := metamorph.NewZMQ(zmqURL, statuses, logger)
sut, err := metamorph.NewZMQ(zmqURL, statuses, mockedZMQ, logger)
require.NoError(t, err)

// when
err = sut.Start(mockedZMQ)
err = sut.Start()
require.NoError(t, err)

// then
Expand Down Expand Up @@ -123,10 +124,11 @@ func TestZMQDoubleSpend(t *testing.T) {
require.NoError(t, err)

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
sut := metamorph.NewZMQ(zmqURL, statuses, logger)
sut, err := metamorph.NewZMQ(zmqURL, statuses, mockedZMQ, logger)
require.NoError(t, err)

// when
err = sut.Start(mockedZMQ)
err = sut.Start()
require.NoError(t, err)

// then
Expand Down

0 comments on commit 12acbb3

Please sign in to comment.