Skip to content

Commit

Permalink
feat: add group test
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Dec 30, 2024
1 parent 22404d4 commit a0912f8
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 29 deletions.
38 changes: 19 additions & 19 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
btxConfig := arcConfig.Blocktx

var (
blockStore store.BlocktxStore
mqClient blocktx.MessageQueueClient
processor *blocktx.Processor
pm *p2p.PeerManager
mcastListner *mcast.Listner
server *blocktx.Server
healthServer *grpc_opts.GrpcServer
workers *blocktx.BackgroundWorkers
blockStore store.BlocktxStore
mqClient blocktx.MessageQueueClient
processor *blocktx.Processor
pm *p2p.PeerManager
mcastListener *mcast.Listener
server *blocktx.Server
healthServer *grpc_opts.GrpcServer
workers *blocktx.BackgroundWorkers

err error
)
Expand Down Expand Up @@ -75,7 +75,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err

stopFn := func() {
logger.Info("Shutting down blocktx")
disposeBlockTx(logger, server, processor, pm, mcastListner, mqClient, blockStore, healthServer, workers, shutdownFns)
disposeBlockTx(logger, server, processor, pm, mcastListener, mqClient, blockStore, healthServer, workers, shutdownFns)
logger.Info("Shutdown complete")
}

Expand Down Expand Up @@ -145,7 +145,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("failed to start prometheus: %v", err)
}

pm, mcastListner, err = setupBcNetworkCommunication(logger, arcConfig, blockStore, blockRequestCh, blockProcessCh)
pm, mcastListener, err = setupBcNetworkCommunication(logger, arcConfig, blockStore, blockRequestCh, blockProcessCh)
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to establish connection with network: %v", err)
Expand Down Expand Up @@ -209,7 +209,7 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf
return s, err
}

func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, store store.BlocktxStore, blockRequestCh chan<- blocktx_p2p.BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) (manager *p2p.PeerManager, mcastListner *mcast.Listner, err error) {
func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, store store.BlocktxStore, blockRequestCh chan<- blocktx_p2p.BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) (manager *p2p.PeerManager, mcastListener *mcast.Listener, err error) {
defer func() {
// cleanup on error
if err == nil {
Expand All @@ -220,8 +220,8 @@ func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, st
manager.Shutdown()
}

if mcastListner != nil {
mcastListner.Disconnect()
if mcastListener != nil {
mcastListener.Disconnect()
}
}()

Expand Down Expand Up @@ -265,12 +265,12 @@ func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, st
// connect to mcast
if cfg.Mode == "hybrid" {
if cfg.Mcast == nil {
return manager, mcastListner, errors.New("mcast config is required")
return manager, mcastListener, errors.New("mcast config is required")
}

// TODO: add net interfaces
mcastListner = mcast.NewMcastListner(l, cfg.Mcast.McastBlock.Address, network, store, blockProcessCh)
ok := mcastListner.Connect()
mcastListener = mcast.NewMcastListener(l, cfg.Mcast.McastBlock.Address, network, store, blockProcessCh)
ok := mcastListener.Connect()
if !ok {
return manager, nil, fmt.Errorf("error connecting to mcast %s: %w", cfg.Mcast.McastBlock, err)
}
Expand Down Expand Up @@ -325,7 +325,7 @@ func connectToPeers(l *slog.Logger, network wire.BitcoinNet, msgHandler p2p.Mess
}

func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor,
pm *p2p.PeerManager, mcastListner *mcast.Listner, mqClient blocktx.MessageQueueClient,
pm *p2p.PeerManager, mcastListener *mcast.Listener, mqClient blocktx.MessageQueueClient,
store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers,
shutdownFns []func(),
) {
Expand All @@ -351,8 +351,8 @@ func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.P
if pm != nil {
pm.Shutdown()
}
if mcastListner != nil {
mcastListner.Disconnect()
if mcastListener != nil {
mcastListener.Disconnect()
}
if mqClient != nil {
mqClient.Shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (

var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to blockchain.BlockMessage")

var _ multicast.MessageHandlerI = (*Listner)(nil)
var _ multicast.MessageHandlerI = (*Listener)(nil)

type Listner struct {
type Listener struct {
hostname string

logger *slog.Logger
Expand All @@ -26,11 +26,11 @@ type Listner struct {
blockGroup *multicast.Group[*bcnet.BlockMessage]
}

func NewMcastListner(l *slog.Logger, addr string, network wire.BitcoinNet, store store.BlocktxStore, receiveCh chan<- *bcnet.BlockMessage) *Listner {
func NewMcastListener(l *slog.Logger, addr string, network wire.BitcoinNet, store store.BlocktxStore, receiveCh chan<- *bcnet.BlockMessage) *Listener {
hostname, _ := os.Hostname()

listner := Listner{
logger: l.With("module", "mcast-listner"),
listner := Listener{
logger: l.With("module", "mcast-listener"),
hostname: hostname,
store: store,
receiveCh: receiveCh,
Expand All @@ -40,16 +40,16 @@ func NewMcastListner(l *slog.Logger, addr string, network wire.BitcoinNet, store
return &listner
}

func (l *Listner) Connect() bool {
func (l *Listener) Connect() bool {
return l.blockGroup.Connect()
}

func (l *Listner) Disconnect() {
func (l *Listener) Disconnect() {
l.blockGroup.Disconnect()
}

// OnReceive should be fire & forget
func (l *Listner) OnReceive(msg wire.Message) {
func (l *Listener) OnReceive(msg wire.Message) {
if msg.Command() == wire.CmdBlock {
blockMsg, ok := msg.(*bcnet.BlockMessage)
if !ok {
Expand Down Expand Up @@ -82,6 +82,6 @@ func (l *Listner) OnReceive(msg wire.Message) {
}

// OnSend should be fire & forget
func (l *Listner) OnSend(_ wire.Message) {
func (l *Listener) OnSend(_ wire.Message) {
// ignore
}
3 changes: 2 additions & 1 deletion internal/multicast/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (g *Group[T]) connect() bool {
return false
}

conn, err := net.ListenUDP("udp6", udpAddr)
conn, err := net.ListenPacket("udp6", udpAddr.String())
if err != nil {
g.logger.Error("Failed to dial node", slog.String("err", err.Error()))
return false
Expand All @@ -146,6 +146,7 @@ func (g *Group[T]) connect() bool {
}

if g.mode.Has(Write) {
g.mcastConn.dst = udpAddr
g.sendMessages()
}

Expand Down
120 changes: 120 additions & 0 deletions internal/multicast/mocks/message_handler_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions internal/multicast/multicast_mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package multicast

//go:generate moq -pkg mocks -out ./mocks/message_handler_mock.go ./ MessageHandlerI
45 changes: 45 additions & 0 deletions internal/multicast/tests/group_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package multicast_test

import (
"log/slog"
"testing"
"time"

"github.com/bitcoin-sv/arc/internal/multicast"
"github.com/bitcoin-sv/arc/internal/multicast/mocks"
"github.com/libsv/go-p2p/wire"
"github.com/stretchr/testify/require"
)

var (
addr = "[ff02::1]:1234"
bcNet = wire.TestNet
)

func TestGroupCommunication(t *testing.T) {
// given
lMsgHandler := &mocks.MessageHandlerIMock{OnReceiveFunc: func(_ wire.Message) {}}
listener := multicast.NewGroup[*wire.MsgPing](slog.Default(), lMsgHandler, addr, multicast.Read, bcNet)
require.True(t, listener.Connect())
defer listener.Disconnect()

wMsgHandler := &mocks.MessageHandlerIMock{OnSendFunc: func(_ wire.Message) {}}
writer := multicast.NewGroup[*wire.MsgPing](slog.Default(), wMsgHandler, addr, multicast.Write, bcNet)
require.True(t, writer.Connect())
defer writer.Disconnect()

msg := wire.NewMsgPing(825906425)

// when
writer.WriteMsg(msg)
time.Sleep(200 * time.Millisecond)

// then
sentMsgs := wMsgHandler.OnSendCalls()
require.Len(t, sentMsgs, 1, "writer didn't send message")
require.Equal(t, msg, (sentMsgs[0].Msg).(*wire.MsgPing))

receivedMsgs := lMsgHandler.OnReceiveCalls()
require.Len(t, receivedMsgs, 1, "listener didn't receive message")
require.Equal(t, msg, (receivedMsgs[0].Msg).(*wire.MsgPing))
}

0 comments on commit a0912f8

Please sign in to comment.