From a0912f8107c7f7302e2fe5932db9857330bd8c17 Mon Sep 17 00:00:00 2001 From: Arkadiusz Osowski Date: Mon, 30 Dec 2024 08:18:11 +0100 Subject: [PATCH] feat: add group test --- cmd/arc/services/blocktx.go | 38 +++--- .../bcnet/mcast/{listner.go => listener.go} | 18 +-- internal/multicast/group.go | 3 +- .../multicast/mocks/message_handler_mock.go | 120 ++++++++++++++++++ internal/multicast/multicast_mocks.go | 3 + internal/multicast/tests/group_test.go | 45 +++++++ 6 files changed, 198 insertions(+), 29 deletions(-) rename internal/blocktx/bcnet/mcast/{listner.go => listener.go} (80%) create mode 100644 internal/multicast/mocks/message_handler_mock.go create mode 100644 internal/multicast/multicast_mocks.go create mode 100644 internal/multicast/tests/group_test.go diff --git a/cmd/arc/services/blocktx.go b/cmd/arc/services/blocktx.go index 643112466..53506ebda 100644 --- a/cmd/arc/services/blocktx.go +++ b/cmd/arc/services/blocktx.go @@ -40,14 +40,14 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err btxConfig := arcConfig.Blocktx var ( - blockStore store.BlocktxStore - mqClient blocktx.MessageQueueClient - processor *blocktx.Processor - pm *p2p.PeerManager - mcastListner *mcast.Listner - server *blocktx.Server - healthServer *grpc_opts.GrpcServer - workers *blocktx.BackgroundWorkers + blockStore store.BlocktxStore + mqClient blocktx.MessageQueueClient + processor *blocktx.Processor + pm *p2p.PeerManager + mcastListener *mcast.Listener + server *blocktx.Server + healthServer *grpc_opts.GrpcServer + workers *blocktx.BackgroundWorkers err error ) @@ -75,7 +75,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err stopFn := func() { logger.Info("Shutting down blocktx") - disposeBlockTx(logger, server, processor, pm, mcastListner, mqClient, blockStore, healthServer, workers, shutdownFns) + disposeBlockTx(logger, server, processor, pm, mcastListener, mqClient, blockStore, healthServer, workers, shutdownFns) logger.Info("Shutdown complete") } @@ -145,7 +145,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err return nil, fmt.Errorf("failed to start prometheus: %v", err) } - pm, mcastListner, err = setupBcNetworkCommunication(logger, arcConfig, blockStore, blockRequestCh, blockProcessCh) + pm, mcastListener, err = setupBcNetworkCommunication(logger, arcConfig, blockStore, blockRequestCh, blockProcessCh) if err != nil { stopFn() return nil, fmt.Errorf("failed to establish connection with network: %v", err) @@ -209,7 +209,7 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf return s, err } -func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, store store.BlocktxStore, blockRequestCh chan<- blocktx_p2p.BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) (manager *p2p.PeerManager, mcastListner *mcast.Listner, err error) { +func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, store store.BlocktxStore, blockRequestCh chan<- blocktx_p2p.BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) (manager *p2p.PeerManager, mcastListener *mcast.Listener, err error) { defer func() { // cleanup on error if err == nil { @@ -220,8 +220,8 @@ func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, st manager.Shutdown() } - if mcastListner != nil { - mcastListner.Disconnect() + if mcastListener != nil { + mcastListener.Disconnect() } }() @@ -265,12 +265,12 @@ func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, st // connect to mcast if cfg.Mode == "hybrid" { if cfg.Mcast == nil { - return manager, mcastListner, errors.New("mcast config is required") + return manager, mcastListener, errors.New("mcast config is required") } // TODO: add net interfaces - mcastListner = mcast.NewMcastListner(l, cfg.Mcast.McastBlock.Address, network, store, blockProcessCh) - ok := mcastListner.Connect() + mcastListener = mcast.NewMcastListener(l, cfg.Mcast.McastBlock.Address, network, store, blockProcessCh) + ok := mcastListener.Connect() if !ok { return manager, nil, fmt.Errorf("error connecting to mcast %s: %w", cfg.Mcast.McastBlock, err) } @@ -325,7 +325,7 @@ func connectToPeers(l *slog.Logger, network wire.BitcoinNet, msgHandler p2p.Mess } func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor, - pm *p2p.PeerManager, mcastListner *mcast.Listner, mqClient blocktx.MessageQueueClient, + pm *p2p.PeerManager, mcastListener *mcast.Listener, mqClient blocktx.MessageQueueClient, store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers, shutdownFns []func(), ) { @@ -351,8 +351,8 @@ func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.P if pm != nil { pm.Shutdown() } - if mcastListner != nil { - mcastListner.Disconnect() + if mcastListener != nil { + mcastListener.Disconnect() } if mqClient != nil { mqClient.Shutdown() diff --git a/internal/blocktx/bcnet/mcast/listner.go b/internal/blocktx/bcnet/mcast/listener.go similarity index 80% rename from internal/blocktx/bcnet/mcast/listner.go rename to internal/blocktx/bcnet/mcast/listener.go index 119e3266a..d6d9e9bd0 100644 --- a/internal/blocktx/bcnet/mcast/listner.go +++ b/internal/blocktx/bcnet/mcast/listener.go @@ -14,9 +14,9 @@ import ( var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to blockchain.BlockMessage") -var _ multicast.MessageHandlerI = (*Listner)(nil) +var _ multicast.MessageHandlerI = (*Listener)(nil) -type Listner struct { +type Listener struct { hostname string logger *slog.Logger @@ -26,11 +26,11 @@ type Listner struct { blockGroup *multicast.Group[*bcnet.BlockMessage] } -func NewMcastListner(l *slog.Logger, addr string, network wire.BitcoinNet, store store.BlocktxStore, receiveCh chan<- *bcnet.BlockMessage) *Listner { +func NewMcastListener(l *slog.Logger, addr string, network wire.BitcoinNet, store store.BlocktxStore, receiveCh chan<- *bcnet.BlockMessage) *Listener { hostname, _ := os.Hostname() - listner := Listner{ - logger: l.With("module", "mcast-listner"), + listner := Listener{ + logger: l.With("module", "mcast-listener"), hostname: hostname, store: store, receiveCh: receiveCh, @@ -40,16 +40,16 @@ func NewMcastListner(l *slog.Logger, addr string, network wire.BitcoinNet, store return &listner } -func (l *Listner) Connect() bool { +func (l *Listener) Connect() bool { return l.blockGroup.Connect() } -func (l *Listner) Disconnect() { +func (l *Listener) Disconnect() { l.blockGroup.Disconnect() } // OnReceive should be fire & forget -func (l *Listner) OnReceive(msg wire.Message) { +func (l *Listener) OnReceive(msg wire.Message) { if msg.Command() == wire.CmdBlock { blockMsg, ok := msg.(*bcnet.BlockMessage) if !ok { @@ -82,6 +82,6 @@ func (l *Listner) OnReceive(msg wire.Message) { } // OnSend should be fire & forget -func (l *Listner) OnSend(_ wire.Message) { +func (l *Listener) OnSend(_ wire.Message) { // ignore } diff --git a/internal/multicast/group.go b/internal/multicast/group.go index 770617f1f..13369a324 100644 --- a/internal/multicast/group.go +++ b/internal/multicast/group.go @@ -126,7 +126,7 @@ func (g *Group[T]) connect() bool { return false } - conn, err := net.ListenUDP("udp6", udpAddr) + conn, err := net.ListenPacket("udp6", udpAddr.String()) if err != nil { g.logger.Error("Failed to dial node", slog.String("err", err.Error())) return false @@ -146,6 +146,7 @@ func (g *Group[T]) connect() bool { } if g.mode.Has(Write) { + g.mcastConn.dst = udpAddr g.sendMessages() } diff --git a/internal/multicast/mocks/message_handler_mock.go b/internal/multicast/mocks/message_handler_mock.go new file mode 100644 index 000000000..64da79f87 --- /dev/null +++ b/internal/multicast/mocks/message_handler_mock.go @@ -0,0 +1,120 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package mocks + +import ( + "github.com/bitcoin-sv/arc/internal/multicast" + "github.com/libsv/go-p2p/wire" + "sync" +) + +// Ensure, that MessageHandlerIMock does implement multicast.MessageHandlerI. +// If this is not the case, regenerate this file with moq. +var _ multicast.MessageHandlerI = &MessageHandlerIMock{} + +// MessageHandlerIMock is a mock implementation of multicast.MessageHandlerI. +// +// func TestSomethingThatUsesMessageHandlerI(t *testing.T) { +// +// // make and configure a mocked multicast.MessageHandlerI +// mockedMessageHandlerI := &MessageHandlerIMock{ +// OnReceiveFunc: func(msg wire.Message) { +// panic("mock out the OnReceive method") +// }, +// OnSendFunc: func(msg wire.Message) { +// panic("mock out the OnSend method") +// }, +// } +// +// // use mockedMessageHandlerI in code that requires multicast.MessageHandlerI +// // and then make assertions. +// +// } +type MessageHandlerIMock struct { + // OnReceiveFunc mocks the OnReceive method. + OnReceiveFunc func(msg wire.Message) + + // OnSendFunc mocks the OnSend method. + OnSendFunc func(msg wire.Message) + + // calls tracks calls to the methods. + calls struct { + // OnReceive holds details about calls to the OnReceive method. + OnReceive []struct { + // Msg is the msg argument value. + Msg wire.Message + } + // OnSend holds details about calls to the OnSend method. + OnSend []struct { + // Msg is the msg argument value. + Msg wire.Message + } + } + lockOnReceive sync.RWMutex + lockOnSend sync.RWMutex +} + +// OnReceive calls OnReceiveFunc. +func (mock *MessageHandlerIMock) OnReceive(msg wire.Message) { + if mock.OnReceiveFunc == nil { + panic("MessageHandlerIMock.OnReceiveFunc: method is nil but MessageHandlerI.OnReceive was just called") + } + callInfo := struct { + Msg wire.Message + }{ + Msg: msg, + } + mock.lockOnReceive.Lock() + mock.calls.OnReceive = append(mock.calls.OnReceive, callInfo) + mock.lockOnReceive.Unlock() + mock.OnReceiveFunc(msg) +} + +// OnReceiveCalls gets all the calls that were made to OnReceive. +// Check the length with: +// +// len(mockedMessageHandlerI.OnReceiveCalls()) +func (mock *MessageHandlerIMock) OnReceiveCalls() []struct { + Msg wire.Message +} { + var calls []struct { + Msg wire.Message + } + mock.lockOnReceive.RLock() + calls = mock.calls.OnReceive + mock.lockOnReceive.RUnlock() + return calls +} + +// OnSend calls OnSendFunc. +func (mock *MessageHandlerIMock) OnSend(msg wire.Message) { + if mock.OnSendFunc == nil { + panic("MessageHandlerIMock.OnSendFunc: method is nil but MessageHandlerI.OnSend was just called") + } + callInfo := struct { + Msg wire.Message + }{ + Msg: msg, + } + mock.lockOnSend.Lock() + mock.calls.OnSend = append(mock.calls.OnSend, callInfo) + mock.lockOnSend.Unlock() + mock.OnSendFunc(msg) +} + +// OnSendCalls gets all the calls that were made to OnSend. +// Check the length with: +// +// len(mockedMessageHandlerI.OnSendCalls()) +func (mock *MessageHandlerIMock) OnSendCalls() []struct { + Msg wire.Message +} { + var calls []struct { + Msg wire.Message + } + mock.lockOnSend.RLock() + calls = mock.calls.OnSend + mock.lockOnSend.RUnlock() + return calls +} diff --git a/internal/multicast/multicast_mocks.go b/internal/multicast/multicast_mocks.go new file mode 100644 index 000000000..f7a95fdb2 --- /dev/null +++ b/internal/multicast/multicast_mocks.go @@ -0,0 +1,3 @@ +package multicast + +//go:generate moq -pkg mocks -out ./mocks/message_handler_mock.go ./ MessageHandlerI diff --git a/internal/multicast/tests/group_test.go b/internal/multicast/tests/group_test.go new file mode 100644 index 000000000..be0e0f406 --- /dev/null +++ b/internal/multicast/tests/group_test.go @@ -0,0 +1,45 @@ +package multicast_test + +import ( + "log/slog" + "testing" + "time" + + "github.com/bitcoin-sv/arc/internal/multicast" + "github.com/bitcoin-sv/arc/internal/multicast/mocks" + "github.com/libsv/go-p2p/wire" + "github.com/stretchr/testify/require" +) + +var ( + addr = "[ff02::1]:1234" + bcNet = wire.TestNet +) + +func TestGroupCommunication(t *testing.T) { + // given + lMsgHandler := &mocks.MessageHandlerIMock{OnReceiveFunc: func(_ wire.Message) {}} + listener := multicast.NewGroup[*wire.MsgPing](slog.Default(), lMsgHandler, addr, multicast.Read, bcNet) + require.True(t, listener.Connect()) + defer listener.Disconnect() + + wMsgHandler := &mocks.MessageHandlerIMock{OnSendFunc: func(_ wire.Message) {}} + writer := multicast.NewGroup[*wire.MsgPing](slog.Default(), wMsgHandler, addr, multicast.Write, bcNet) + require.True(t, writer.Connect()) + defer writer.Disconnect() + + msg := wire.NewMsgPing(825906425) + + // when + writer.WriteMsg(msg) + time.Sleep(200 * time.Millisecond) + + // then + sentMsgs := wMsgHandler.OnSendCalls() + require.Len(t, sentMsgs, 1, "writer didn't send message") + require.Equal(t, msg, (sentMsgs[0].Msg).(*wire.MsgPing)) + + receivedMsgs := lMsgHandler.OnReceiveCalls() + require.Len(t, receivedMsgs, 1, "listener didn't receive message") + require.Equal(t, msg, (receivedMsgs[0].Msg).(*wire.MsgPing)) +}