Skip to content

Commit

Permalink
feat: add group test
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Dec 30, 2024
1 parent 22404d4 commit 21082c8
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 57 deletions.
39 changes: 20 additions & 19 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
btxConfig := arcConfig.Blocktx

var (
blockStore store.BlocktxStore
mqClient blocktx.MessageQueueClient
processor *blocktx.Processor
pm *p2p.PeerManager
mcastListner *mcast.Listner
server *blocktx.Server
healthServer *grpc_opts.GrpcServer
workers *blocktx.BackgroundWorkers
blockStore store.BlocktxStore
mqClient blocktx.MessageQueueClient
processor *blocktx.Processor
pm *p2p.PeerManager
mcastListener *mcast.Listener
server *blocktx.Server
healthServer *grpc_opts.GrpcServer
workers *blocktx.BackgroundWorkers

err error
)
Expand Down Expand Up @@ -75,7 +75,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err

stopFn := func() {
logger.Info("Shutting down blocktx")
disposeBlockTx(logger, server, processor, pm, mcastListner, mqClient, blockStore, healthServer, workers, shutdownFns)
disposeBlockTx(logger, server, processor, pm, mcastListener, mqClient, blockStore, healthServer, workers, shutdownFns)
logger.Info("Shutdown complete")
}

Expand Down Expand Up @@ -145,7 +145,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("failed to start prometheus: %v", err)
}

pm, mcastListner, err = setupBcNetworkCommunication(logger, arcConfig, blockStore, blockRequestCh, blockProcessCh)
pm, mcastListener, err = setupBcNetworkCommunication(logger, arcConfig, blockStore, blockRequestCh, blockProcessCh)
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to establish connection with network: %v", err)
Expand Down Expand Up @@ -209,7 +209,7 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf
return s, err
}

func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, store store.BlocktxStore, blockRequestCh chan<- blocktx_p2p.BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) (manager *p2p.PeerManager, mcastListner *mcast.Listner, err error) {
func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, store store.BlocktxStore, blockRequestCh chan<- blocktx_p2p.BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) (manager *p2p.PeerManager, mcastListener *mcast.Listener, err error) {
defer func() {
// cleanup on error
if err == nil {
Expand All @@ -220,8 +220,8 @@ func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, st
manager.Shutdown()
}

if mcastListner != nil {
mcastListner.Disconnect()
if mcastListener != nil {
mcastListener.Disconnect()
}
}()

Expand All @@ -239,6 +239,7 @@ func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, st
if cfg.Mode == "classic" {
msgHandler = blocktx_p2p.NewMsgHandler(l, blockRequestCh, blockProcessCh)
} else if cfg.Mode == "hybrid" {
l.Info("!!! Blocktx will communicate with blockchain in HYBRID mode (via p2p and multicast groups) !!!")
msgHandler = blocktx_p2p.NewHybridMsgHandler(l, blockProcessCh)
} else {
return nil, nil, fmt.Errorf("unsupported communication type: %s", cfg.Mode)
Expand All @@ -265,12 +266,12 @@ func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, st
// connect to mcast
if cfg.Mode == "hybrid" {
if cfg.Mcast == nil {
return manager, mcastListner, errors.New("mcast config is required")
return manager, mcastListener, errors.New("mcast config is required")
}

// TODO: add net interfaces
mcastListner = mcast.NewMcastListner(l, cfg.Mcast.McastBlock.Address, network, store, blockProcessCh)
ok := mcastListner.Connect()
mcastListener = mcast.NewMcastListener(l, cfg.Mcast.McastBlock.Address, network, store, blockProcessCh)
ok := mcastListener.Connect()
if !ok {
return manager, nil, fmt.Errorf("error connecting to mcast %s: %w", cfg.Mcast.McastBlock, err)
}
Expand Down Expand Up @@ -325,7 +326,7 @@ func connectToPeers(l *slog.Logger, network wire.BitcoinNet, msgHandler p2p.Mess
}

func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor,
pm *p2p.PeerManager, mcastListner *mcast.Listner, mqClient blocktx.MessageQueueClient,
pm *p2p.PeerManager, mcastListener *mcast.Listener, mqClient blocktx.MessageQueueClient,
store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers,
shutdownFns []func(),
) {
Expand All @@ -351,8 +352,8 @@ func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.P
if pm != nil {
pm.Shutdown()
}
if mcastListner != nil {
mcastListner.Disconnect()
if mcastListener != nil {
mcastListener.Disconnect()
}
if mqClient != nil {
mqClient.Shutdown()
Expand Down
3 changes: 2 additions & 1 deletion cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"log/slog"
"net/http"
"os"
"reflect"
"time"

"github.com/bitcoin-sv/arc/config"
Expand Down Expand Up @@ -195,7 +196,7 @@ func dispose(l *slog.Logger, server *callbacker.Server, workers *callbacker.Back
processor.GracefulStop()
}

if mqClient != nil {
if mqClient != nil && !(reflect.ValueOf(mqClient).IsNil()) {
mqClient.Shutdown()
}

Expand Down
1 change: 1 addition & 0 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ func setupMtmBcNetworkCommunication(l *slog.Logger, s store.MetamorphStore, arcC
if cfg.Mode == "classic" {
msgHandler = metamorph_p2p.NewMsgHandler(l, s, messageCh)
} else if cfg.Mode == "hybrid" {
l.Info("!!! Metamorph will communicate with blockchain in HYBRID mode (via p2p and multicast groups) !!!")
msgHandler = metamorph_p2p.NewHybridMsgHandler(l, messageCh)
} else {
err = fmt.Errorf("unsupported communication type: %s", cfg.Mode)
Expand Down
24 changes: 0 additions & 24 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,6 @@ func getMetamorphConfig() *MetamorphConfig {
ZMQ: 28332,
},
},
{
Host: "localhost",
Port: &PeerPortConfig{
P2P: 18334,
},
},
{
Host: "localhost",
Port: &PeerPortConfig{
P2P: 18335,
},
},
},
},
}
Expand Down Expand Up @@ -130,18 +118,6 @@ func getBlocktxConfig() *BlocktxConfig {
ZMQ: 28332,
},
},
{
Host: "localhost",
Port: &PeerPortConfig{
P2P: 18334,
},
},
{
Host: "localhost",
Port: &PeerPortConfig{
P2P: 18335,
},
},
},
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (

var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to blockchain.BlockMessage")

var _ multicast.MessageHandlerI = (*Listner)(nil)
var _ multicast.MessageHandlerI = (*Listener)(nil)

type Listner struct {
type Listener struct {
hostname string

logger *slog.Logger
Expand All @@ -26,11 +26,11 @@ type Listner struct {
blockGroup *multicast.Group[*bcnet.BlockMessage]
}

func NewMcastListner(l *slog.Logger, addr string, network wire.BitcoinNet, store store.BlocktxStore, receiveCh chan<- *bcnet.BlockMessage) *Listner {
func NewMcastListener(l *slog.Logger, addr string, network wire.BitcoinNet, store store.BlocktxStore, receiveCh chan<- *bcnet.BlockMessage) *Listener {
hostname, _ := os.Hostname()

listner := Listner{
logger: l.With("module", "mcast-listner"),
listner := Listener{
logger: l.With("module", "mcast-listener"),
hostname: hostname,
store: store,
receiveCh: receiveCh,
Expand All @@ -40,16 +40,16 @@ func NewMcastListner(l *slog.Logger, addr string, network wire.BitcoinNet, store
return &listner
}

func (l *Listner) Connect() bool {
func (l *Listener) Connect() bool {
return l.blockGroup.Connect()
}

func (l *Listner) Disconnect() {
func (l *Listener) Disconnect() {
l.blockGroup.Disconnect()
}

// OnReceive should be fire & forget
func (l *Listner) OnReceive(msg wire.Message) {
func (l *Listener) OnReceive(msg wire.Message) {
if msg.Command() == wire.CmdBlock {
blockMsg, ok := msg.(*bcnet.BlockMessage)
if !ok {
Expand All @@ -61,6 +61,8 @@ func (l *Listner) OnReceive(msg wire.Message) {
// lock block for the current instance to process
hash := blockMsg.Hash

l.logger.Info("Received BLOCK msg from multicast group", slog.String("hash", hash.String()))

processedBy, err := l.store.SetBlockProcessing(context.Background(), hash, l.hostname)
if err != nil {
// block is already being processed by another blocktx instance
Expand All @@ -82,6 +84,6 @@ func (l *Listner) OnReceive(msg wire.Message) {
}

// OnSend should be fire & forget
func (l *Listner) OnSend(_ wire.Message) {
func (l *Listener) OnSend(_ wire.Message) {
// ignore
}
34 changes: 31 additions & 3 deletions internal/metamorph/bcnet/metamorph_p2p/hybrid_message_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,46 @@ func NewHybridMsgHandler(l *slog.Logger, messageCh chan<- *TxStatusMessage) *Hyb
// OnReceive handles incoming messages depending on command type
func (h *HybridMsgHandler) OnReceive(msg wire.Message, peer p2p.PeerI) {
cmd := msg.Command()
if cmd == wire.CmdTx {
switch cmd {
case wire.CmdInv:
h.handleReceivedInv(msg, peer)

case wire.CmdTx:
h.handleReceivedTx(msg, peer)
}

// ignore other
default:
// ignore other
}
}

// OnSend handles outgoing messages depending on command type
func (h *HybridMsgHandler) OnSend(_ wire.Message, _ p2p.PeerI) {
// ignore
}

func (h *HybridMsgHandler) handleReceivedInv(wireMsg wire.Message, peer p2p.PeerI) {
msg, ok := wireMsg.(*wire.MsgInv)
if !ok {
return
}

go func() {
for _, iv := range msg.InvList {
if iv.Type == wire.InvTypeTx {
select {
case h.messageCh <- &TxStatusMessage{
Hash: &iv.Hash,
Status: metamorph_api.Status_SEEN_ON_NETWORK,
Peer: peer.String(),
}:
default: // Ensure that writing to channel is non-blocking -- probably we should give up on this
}
}
// ignore INV with block or error
}
}()
}

func (h *HybridMsgHandler) handleReceivedTx(wireMsg wire.Message, peer p2p.PeerI) {
msg, ok := wireMsg.(*wire.MsgTx)
if !ok {
Expand Down
5 changes: 4 additions & 1 deletion internal/multicast/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Group[T wire.Message] struct {
func NewGroup[T wire.Message](l *slog.Logger, mh MessageHandlerI, addr string, mode ModeFlag, network wire.BitcoinNet /*TODO: add opts*/) *Group[T] {
var tmp T
l = l.With(
slog.String("module", "mcast-group"),
slog.Group("mcast",
slog.String("network", network.String()),
slog.String("cmd", tmp.Command()),
Expand Down Expand Up @@ -126,7 +127,7 @@ func (g *Group[T]) connect() bool {
return false
}

conn, err := net.ListenUDP("udp6", udpAddr)
conn, err := net.ListenPacket("udp6", udpAddr.String())
if err != nil {
g.logger.Error("Failed to dial node", slog.String("err", err.Error()))
return false
Expand All @@ -136,6 +137,7 @@ func (g *Group[T]) connect() bool {
g.mcastConn = &ipv6ConnAdapter{Conn: pConn}

if g.mode.Has(Read) {
g.logger.Info("Join to multicast group")
err = pConn.JoinGroup(nil, udpAddr) // TODO: define net interface
if err != nil {
g.logger.Error("Failed to join mcast group", slog.String("err", err.Error()))
Expand All @@ -146,6 +148,7 @@ func (g *Group[T]) connect() bool {
}

if g.mode.Has(Write) {
g.mcastConn.dst = udpAddr
g.sendMessages()
}

Expand Down
Loading

0 comments on commit 21082c8

Please sign in to comment.