diff --git a/cmd/arc/services/blocktx.go b/cmd/arc/services/blocktx.go index 90a860f55..8dbd1dfd7 100644 --- a/cmd/arc/services/blocktx.go +++ b/cmd/arc/services/blocktx.go @@ -1,6 +1,7 @@ package cmd import ( + "errors" "fmt" "log/slog" "os" @@ -13,11 +14,13 @@ import ( "github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream" "github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection" "github.com/bitcoin-sv/arc/internal/tracing" + "github.com/libsv/go-p2p/wire" "github.com/bitcoin-sv/arc/config" "github.com/bitcoin-sv/arc/internal/blocktx" "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/mcast" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql" "github.com/bitcoin-sv/arc/internal/p2p" @@ -37,13 +40,14 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err btxConfig := arcConfig.Blocktx var ( - blockStore store.BlocktxStore - mqClient blocktx.MessageQueueClient - processor *blocktx.Processor - pm *p2p.PeerManager - server *blocktx.Server - healthServer *grpc_opts.GrpcServer - workers *blocktx.BackgroundWorkers + blockStore store.BlocktxStore + mqClient blocktx.MessageQueueClient + processor *blocktx.Processor + pm *p2p.PeerManager + mcastListener *mcast.Listener + server *blocktx.Server + healthServer *grpc_opts.GrpcServer + workers *blocktx.BackgroundWorkers err error ) @@ -71,15 +75,10 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err stopFn := func() { logger.Info("Shutting down blocktx") - disposeBlockTx(logger, server, processor, pm, mqClient, blockStore, healthServer, workers, shutdownFns) + disposeBlockTx(logger, server, processor, pm, mcastListener, mqClient, blockStore, healthServer, workers, shutdownFns) logger.Info("Shutdown complete") } - network, err := config.GetNetwork(arcConfig.Network) - if err != nil { - return nil, err - } - blockStore, err = NewBlocktxStore(logger, btxConfig.Db, arcConfig.Tracing) if err != nil { return nil, fmt.Errorf("failed to create blocktx store: %v", err) @@ -143,58 +142,18 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err err = processor.Start(arcConfig.Prometheus.IsEnabled()) if err != nil { stopFn() - return nil, fmt.Errorf("failed to start peer handler: %v", err) - } - - // p2p global setting - p2p.SetExcessiveBlockSize(maximumBlockSize) - - pmOpts := []p2p.PeerManagerOptions{} - if arcConfig.Blocktx.MonitorPeers { - pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers()) - } - - pm = p2p.NewPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...) 
- peers := make([]p2p.PeerI, len(arcConfig.Broadcasting.Unicast.Peers)) - - peerHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh) - - peerOpts := []p2p.PeerOptions{ - p2p.WithMaximumMessageSize(maximumBlockSize), - p2p.WithPingInterval(30*time.Second, 1*time.Minute), - p2p.WithReadBufferSize(arcConfig.Blocktx.P2pReadBufferSize), - } - - if version.Version != "" { - peerOpts = append(peerOpts, p2p.WithUserAgent("ARC", version.Version)) + return nil, fmt.Errorf("failed to start prometheus: %v", err) } - for i, peerSetting := range arcConfig.Broadcasting.Unicast.Peers { - peerURL, err := peerSetting.GetP2PUrl() - if err != nil { - stopFn() - return nil, fmt.Errorf("error getting peer url: %v", err) - } - - peer := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerHandler, peerURL, network, peerOpts...) - ok := peer.Connect() - if !ok { - stopFn() - return nil, fmt.Errorf("error creating peer %s: %v", peerURL, err) - } - - err = pm.AddPeer(peer) - if err != nil { - stopFn() - return nil, fmt.Errorf("error adding peer: %v", err) - } - - peers[i] = peer + pm, mcastListener, err = setupBcNetworkCommunication(logger, arcConfig, blockStore, blockRequestCh, blockProcessCh) + if err != nil { + stopFn() + return nil, fmt.Errorf("failed to establish connection with network: %v", err) } if btxConfig.FillGaps != nil && btxConfig.FillGaps.Enabled { workers = blocktx.NewBackgroundWorkers(blockStore, logger) - workers.StartFillGaps(peers, btxConfig.FillGaps.Interval, btxConfig.RecordRetentionDays, blockRequestCh) + workers.StartFillGaps(pm.GetPeers(), btxConfig.FillGaps.Interval, btxConfig.RecordRetentionDays, blockRequestCh) } server, err = blocktx.NewServer(arcConfig.Prometheus.Endpoint, arcConfig.GrpcMessageSize, logger, @@ -250,8 +209,151 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf return s, err } +// setupBcNetworkCommunication initializes the Bloctx blockchain network communication layer, configuring it +// to operate in either classic (P2P-only) or hybrid (P2P and multicast) mode. +// +// Parameters: +// - `l *slog.Logger`: Logger instance for logging events. +// - `arcConfig *config.ArcConfig`: Configuration object containing blockchain network settings. +// - `store store.BlocktxStore`: A storage interface for blockchain transactions. +// - `blockRequestCh chan<- blocktx_p2p.BlockRequest`: Channel for handling block requests. +// - `blockProcessCh chan<- *bcnet.BlockMessage`: Channel for processing block messages. +// +// Returns: +// - `manager *p2p.PeerManager`: Manages P2P peers. +// - `mcastListener *mcast.Listener`: Handles multicast communication, or `nil` if not in hybrid mode. +// - `err error`: Error if any issue occurs during setup. +// +// Key Details: +// - **Mode Handling**: +// - `"classic"` mode uses `blocktx_p2p.NewMsgHandler` for P2P communication only. +// - `"hybrid"` mode uses `blocktx_p2p.NewHybridMsgHandler` for P2P and multicast communication. +// +// - **Error Cleanup**: Ensures resources like peers and multicast listeners are properly cleaned up on errors. +// - **Peer Management**: Connects to configured peers and initializes a PeerManager for handling P2P connections. +// - **Multicast Communication**: In hybrid mode, joins a multicast group for blockchain updates and uses correct P2P message handler. +// +// Message Handlers: +// - `blocktx_p2p.NewMsgHandler`: Used in classic mode, handles all blockchain communication exclusively via P2P. 
+// - `blocktx_p2p.NewHybridMsgHandler`: Used in hybrid mode, seamlessly integrates P2P communication with multicast group updates. +func setupBcNetworkCommunication(l *slog.Logger, arcConfig *config.ArcConfig, store store.BlocktxStore, blockRequestCh chan<- blocktx_p2p.BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) (manager *p2p.PeerManager, mcastListener *mcast.Listener, err error) { + defer func() { + // cleanup on error + if err == nil { + return + } + + if manager != nil { + manager.Shutdown() + } + + if mcastListener != nil { + mcastListener.Disconnect() + } + }() + + // p2p global setting + p2p.SetExcessiveBlockSize(maximumBlockSize) + + cfg := arcConfig.Blocktx.BlockchainNetwork + network, err := config.GetNetwork(cfg.Network) + if err != nil { + return + } + + var msgHandler p2p.MessageHandlerI + + if cfg.Mode == "classic" { + msgHandler = blocktx_p2p.NewMsgHandler(l, blockRequestCh, blockProcessCh) + } else if cfg.Mode == "hybrid" { + l.Info("!!! Blocktx will communicate with blockchain in HYBRID mode (via p2p and multicast groups) !!!") + msgHandler = blocktx_p2p.NewHybridMsgHandler(l, blockProcessCh) + } else { + return nil, nil, fmt.Errorf("unsupported communication type: %s", cfg.Mode) + } + + // connect to peers + var managerOpts []p2p.PeerManagerOptions + if arcConfig.Blocktx.MonitorPeers { + managerOpts = append(managerOpts, p2p.WithRestartUnhealthyPeers()) + } + + manager = p2p.NewPeerManager(l.With(slog.String("module", "peer-mng")), network, managerOpts...) + peers, err := connectToPeers(l, network, msgHandler, cfg.Peers, p2p.WithMaximumMessageSize(maximumBlockSize)) + if err != nil { + return + } + + for _, p := range peers { + if err = manager.AddPeer(p); err != nil { + return + } + } + + // connect to mcast + if cfg.Mode == "hybrid" { + if cfg.Mcast == nil { + return manager, mcastListener, errors.New("mcast config is required") + } + + // TODO: add net interfaces + mcastListener = mcast.NewMcastListener(l, cfg.Mcast.McastBlock.Address, network, store, blockProcessCh) + ok := mcastListener.Connect() + if !ok { + return manager, nil, fmt.Errorf("error connecting to mcast %s: %w", cfg.Mcast.McastBlock, err) + } + } + + return +} + +func connectToPeers(l *slog.Logger, network wire.BitcoinNet, msgHandler p2p.MessageHandlerI, peersConfig []*config.PeerConfig, additionalOpts ...p2p.PeerOptions) (peers []*p2p.Peer, err error) { + defer func() { + // cleanup on error + if err == nil { + return + } + + for _, p := range peers { + p.Shutdown() + } + }() + + opts := []p2p.PeerOptions{ + p2p.WithPingInterval(30*time.Second, 2*time.Minute), + } + + if version.Version != "" { + opts = append(opts, p2p.WithUserAgent("ARC", version.Version)) + } + + opts = append(opts, additionalOpts...) + + for _, settings := range peersConfig { + url, err := settings.GetP2PUrl() + if err != nil { + return nil, fmt.Errorf("error getting peer url: %w", err) + } + + p := p2p.NewPeer( + l.With(slog.String("module", "peer")), + msgHandler, + url, + network, + opts...) 
+ ok := p.Connect() + if !ok { + return nil, fmt.Errorf("error connecting peer %s: %w", url, err) + } + + peers = append(peers, p) + } + + return +} + func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor, - pm *p2p.PeerManager, mqClient blocktx.MessageQueueClient, + pm *p2p.PeerManager, mcastListener *mcast.Listener, mqClient blocktx.MessageQueueClient, store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers, shutdownFns []func(), ) { @@ -277,6 +379,9 @@ func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.P if pm != nil { pm.Shutdown() } + if mcastListener != nil { + mcastListener.Disconnect() + } if mqClient != nil { mqClient.Shutdown() } diff --git a/cmd/arc/services/callbacker.go b/cmd/arc/services/callbacker.go index 214a557da..b75379642 100644 --- a/cmd/arc/services/callbacker.go +++ b/cmd/arc/services/callbacker.go @@ -27,6 +27,7 @@ import ( "log/slog" "net/http" "os" + "reflect" "time" "github.com/bitcoin-sv/arc/config" @@ -195,7 +196,7 @@ func dispose(l *slog.Logger, server *callbacker.Server, workers *callbacker.Back processor.GracefulStop() } - if mqClient != nil { + if mqClient != nil && !(reflect.ValueOf(mqClient).IsNil()) { mqClient.Shutdown() } diff --git a/cmd/arc/services/metamorph.go b/cmd/arc/services/metamorph.go index 05ed0eea6..04d3d3e40 100644 --- a/cmd/arc/services/metamorph.go +++ b/cmd/arc/services/metamorph.go @@ -2,11 +2,11 @@ package cmd import ( "context" + "errors" "fmt" "log/slog" "net/url" "os" - "time" "go.opentelemetry.io/otel/attribute" @@ -25,13 +25,14 @@ import ( "github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream" "github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection" "github.com/bitcoin-sv/arc/internal/metamorph" + "github.com/bitcoin-sv/arc/internal/metamorph/bcnet" + "github.com/bitcoin-sv/arc/internal/metamorph/bcnet/mcast" "github.com/bitcoin-sv/arc/internal/metamorph/bcnet/metamorph_p2p" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/store" "github.com/bitcoin-sv/arc/internal/metamorph/store/postgresql" "github.com/bitcoin-sv/arc/internal/p2p" - "github.com/bitcoin-sv/arc/internal/version" ) const ( @@ -47,7 +48,10 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore var ( metamorphStore store.MetamorphStore + bcMediator *bcnet.Mediator pm *p2p.PeerManager + messenger *p2p.NetworkMessenger + multicaster *mcast.Multicaster statusMessageCh chan *metamorph_p2p.TxStatusMessage mqClient metamorph.MessageQueue processor *metamorph.Processor @@ -62,6 +66,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore optsServer := make([]metamorph.ServerOption, 0) processorOpts := make([]metamorph.Option, 0) callbackerOpts := make([]callbacker.Option, 0) + bcMediatorOpts := make([]bcnet.Option, 0) if arcConfig.IsTracingEnabled() { cleanup, err := tracing.Enable(logger, "metamorph", arcConfig.Tracing) @@ -81,11 +86,12 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore optsServer = append(optsServer, metamorph.WithServerTracer(attributes...)) callbackerOpts = append(callbackerOpts, callbacker.WithTracerCallbacker(attributes...)) processorOpts = append(processorOpts, metamorph.WithTracerProcessor(attributes...)) + bcMediatorOpts = append(bcMediatorOpts, bcnet.WithTracer(attributes...)) } stopFn := func() { logger.Info("Shutting down metamorph") - disposeMtm(logger, 
server, processor, pm, mqClient, metamorphStore, healthServer, shutdownFns) + disposeMtm(logger, server, processor, pm, messenger, multicaster, mqClient, metamorphStore, healthServer, shutdownFns) logger.Info("Shutdown complete") } @@ -94,7 +100,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore return nil, fmt.Errorf("failed to create metamorph store: %v", err) } - pm, statusMessageCh, err = initPeerManager(logger, metamorphStore, arcConfig) + bcMediator, messenger, pm, multicaster, statusMessageCh, err = setupMtmBcNetworkCommunication(logger, metamorphStore, arcConfig, bcMediatorOpts) if err != nil { stopFn() return nil, err @@ -166,7 +172,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore processor, err = metamorph.NewProcessor( metamorphStore, cacheStore, - p2p.NewNetworkMessenger(logger, pm), + bcMediator, statusMessageCh, processorOpts..., ) @@ -210,7 +216,7 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore return nil, fmt.Errorf("serve GRPC server failed: %v", err) } - for i, peerSetting := range arcConfig.Broadcasting.Unicast.Peers { + for i, peerSetting := range arcConfig.Metamorph.BlockchainNetwork.Peers { zmqURL, err := peerSetting.GetZMQUrl() if err != nil { logger.Warn("failed to get zmq URL for peer", slog.Int("index", i), slog.String("err", err.Error())) @@ -277,53 +283,124 @@ func NewMetamorphStore(dbConfig *config.DbConfig, tracingConfig *config.TracingC return s, err } -func initPeerManager(logger *slog.Logger, s store.MetamorphStore, arcConfig *config.ArcConfig) (*p2p.PeerManager, chan *metamorph_p2p.TxStatusMessage, error) { - network, err := config.GetNetwork(arcConfig.Network) +// setupMtmBcNetworkCommunication initializes the Metamorph blockchain network communication, configuring it +// to operate in either classic (P2P-only) or hybrid (P2P and multicast) mode. +// +// Parameters: +// - `l *slog.Logger`: Logger instance for event logging. +// - `s store.MetamorphStore`: Storage interface for Metamorph operations. +// - `arcConfig *config.ArcConfig`: Configuration object for blockchain network settings. +// - `mediatorOpts []bcnet.Option`: Additional options for the mediator. +// +// Returns: +// - `mediator *bcnet.Mediator`: Coordinates communication between P2P and multicast layers. +// - `messenger *p2p.NetworkMessenger`: Handles P2P message delivery - used by mediator only. +// - `manager *p2p.PeerManager`: Manages the lifecycle of P2P peers. +// - `multicaster *mcast.Multicaster`: Handles multicast message broadcasting and listening - used by mediator only. +// - `messageCh chan *metamorph_p2p.TxStatusMessage`: Channel for handling transaction status messages. +// - `err error`: Error if any part of the setup fails. +// +// Key Details: +// - **Mode Handling**: +// - `"classic"`: Uses `metamorph_p2p.NewMsgHandler` for exclusive P2P communication. +// - `"hybrid"`: Uses `metamorph_p2p.NewHybridMsgHandler` for integration of P2P and multicast group communication. +// +// - **Error Cleanup**: Cleans up resources such as the messenger, manager, and multicaster on failure. +// - **Peer Management**: Establishes connections to P2P peers and configures them with write handlers and buffer sizes. +// - **Multicast Communication**: In hybrid mode, joins multicast groups for transaction and rejection messages. +// +// Message Handlers: +// - `metamorph_p2p.NewMsgHandler`: Used in classic mode, handling all communication via P2P. 
+// - `metamorph_p2p.NewHybridMsgHandler`: Used in hybrid mode, integrating P2P communication with multicast group updates. +func setupMtmBcNetworkCommunication(l *slog.Logger, s store.MetamorphStore, arcConfig *config.ArcConfig, mediatorOpts []bcnet.Option) ( + mediator *bcnet.Mediator, messenger *p2p.NetworkMessenger, manager *p2p.PeerManager, multicaster *mcast.Multicaster, + messageCh chan *metamorph_p2p.TxStatusMessage, err error) { + defer func() { + // cleanup on error + if err == nil { + return + } + + if messenger != nil { + messenger.Shutdown() + messenger = nil + } + + if manager != nil { + manager.Shutdown() + manager = nil + } + + if multicaster != nil { + multicaster.Disconnect() + multicaster = nil + } + }() + + cfg := arcConfig.Metamorph.BlockchainNetwork + network, err := config.GetNetwork(cfg.Network) if err != nil { - return nil, nil, fmt.Errorf("failed to get network: %v", err) + return } - logger.Info("Assuming bitcoin network", "network", network) + l.Info("Assuming bitcoin network", "network", network) - messageCh := make(chan *metamorph_p2p.TxStatusMessage, 10000) - var pmOpts []p2p.PeerManagerOptions - if arcConfig.Metamorph.MonitorPeers { - pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers()) - } + messageCh = make(chan *metamorph_p2p.TxStatusMessage, 10000) + var msgHandler p2p.MessageHandlerI - pm := p2p.NewPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...) + if cfg.Mode == "classic" { + msgHandler = metamorph_p2p.NewMsgHandler(l, s, messageCh) + } else if cfg.Mode == "hybrid" { + l.Info("!!! Metamorph will communicate with blockchain in HYBRID mode (via p2p and multicast groups) !!!") + msgHandler = metamorph_p2p.NewHybridMsgHandler(l, messageCh) + } else { + err = fmt.Errorf("unsupported communication type: %s", cfg.Mode) + return + } - msgHandler := metamorph_p2p.NewMsgHandler(logger.With(slog.String("module", "peer-msg-handler")), s, messageCh) + // connect to peers + var managerOpts []p2p.PeerManagerOptions + if arcConfig.Metamorph.MonitorPeers { + managerOpts = append(managerOpts, p2p.WithRestartUnhealthyPeers()) + } - peerOpts := []p2p.PeerOptions{ - p2p.WithPingInterval(30*time.Second, 2*time.Minute), + manager = p2p.NewPeerManager(l.With(slog.String("module", "peer-mng")), network, managerOpts...) + peers, err := connectToPeers(l, network, msgHandler, cfg.Peers, p2p.WithNrOfWriteHandlers(8), - p2p.WithWriteChannelSize(4096), + p2p.WithWriteChannelSize(4096)) + if err != nil { + return } - if version.Version != "" { - peerOpts = append(peerOpts, p2p.WithUserAgent("ARC", version.Version)) + for _, p := range peers { + if err = manager.AddPeer(p); err != nil { + return + } } - l := logger.With(slog.String("module", "peer")) - for _, peerSetting := range arcConfig.Broadcasting.Unicast.Peers { - peerURL, err := peerSetting.GetP2PUrl() - if err != nil { - return nil, nil, fmt.Errorf("error getting peer url: %v", err) + // connect to mcast + if cfg.Mode == "hybrid" { + if cfg.Mcast == nil { + err = errors.New("mcast config is required") + return } - peer := p2p.NewPeer(l, msgHandler, peerURL, network, peerOpts...) 
- ok := peer.Connect() - if !ok { - return nil, nil, fmt.Errorf("cannot connect to peer %s", peerURL) + // TODO: add interfaces + groups := mcast.GroupsAddresses{ + McastTx: cfg.Mcast.McastTx.Address, + McastReject: cfg.Mcast.McastReject.Address, } - - if err = pm.AddPeer(peer); err != nil { - return nil, nil, fmt.Errorf("error adding peer %s: %v", peerURL, err) + multicaster = mcast.NewMulticaster(l, groups, network, messageCh) + ok := multicaster.Connect() + if !ok { + err = fmt.Errorf("error connecting to mcast: %w", err) + return } } - return pm, messageCh, nil + messenger = p2p.NewNetworkMessenger(l, manager) + mediator = bcnet.NewMediator(l, cfg.Mode == "classic", messenger, multicaster, mediatorOpts...) + return } func initGrpcCallbackerConn(address, prometheusEndpoint string, grpcMsgSize int, tracingConfig *config.TracingConfig) (callbacker_api.CallbackerAPIClient, error) { @@ -340,7 +417,7 @@ func initGrpcCallbackerConn(address, prometheusEndpoint string, grpcMsgSize int, } func disposeMtm(l *slog.Logger, server *metamorph.Server, processor *metamorph.Processor, - peerManaager *p2p.PeerManager, mqClient metamorph.MessageQueueClient, + pm *p2p.PeerManager, messenger *p2p.NetworkMessenger, multicaster *mcast.Multicaster, mqClient metamorph.MessageQueueClient, metamorphStore store.MetamorphStore, healthServer *grpc_opts.GrpcServer, shutdownFns []func(), ) { @@ -359,9 +436,19 @@ func disposeMtm(l *slog.Logger, server *metamorph.Server, processor *metamorph.P if processor != nil { processor.Shutdown() } - if peerManaager != nil { - peerManaager.Shutdown() + + if messenger != nil { + messenger.Shutdown() + } + + if pm != nil { + pm.Shutdown() } + + if multicaster != nil { + multicaster.Disconnect() + } + if mqClient != nil { mqClient.Shutdown() } diff --git a/config/config.go b/config/config.go index 793bfe117..75333e21b 100644 --- a/config/config.go +++ b/config/config.go @@ -22,7 +22,6 @@ type ArcConfig struct { MessageQueue *MessageQueueConfig `mapstructure:"messageQueue"` Tracing *TracingConfig `mapstructure:"tracing"` PeerRPC *PeerRPCConfig `mapstructure:"peerRpc"` - Broadcasting *BroadcastingConfig `mapstructure:"broadcasting"` Metamorph *MetamorphConfig `mapstructure:"metamorph"` Blocktx *BlocktxConfig `mapstructure:"blocktx"` API *APIConfig `mapstructure:"api"` @@ -41,21 +40,6 @@ func (p *PrometheusConfig) IsEnabled() bool { return p.Enabled && p.Addr != "" && p.Endpoint != "" } -type BroadcastingConfig struct { - Mode string `mapstructure:"mode"` - Multicast *Mulsticast `mapstructure:"multicast"` - Unicast *Unicast `mapstructure:"unicast"` -} - -type Unicast struct { - Peers []*PeerConfig `mapstructure:"peers"` -} -type Mulsticast struct { - Ipv6Enabled bool `mapstructure:"ipv6Enabled"` - MulticastGroups []*string `mapstructure:"multicastGroups"` - Interfaces []*string `mapstructure:"interfaces"` -} - type PeerConfig struct { Host string `mapstructure:"host"` Port *PeerPortConfig `mapstructure:"port"` @@ -100,34 +84,57 @@ type PeerPortConfig struct { } type MetamorphConfig struct { - ListenAddr string `mapstructure:"listenAddr"` - DialAddr string `mapstructure:"dialAddr"` - Db *DbConfig `mapstructure:"db"` - ProcessorCacheExpiryTime time.Duration `mapstructure:"processorCacheExpiryTime"` - UnseenTransactionRebroadcastingInterval time.Duration `mapstructure:"unseenTransactionRebroadcastingInterval"` - MaxRetries int `mapstructure:"maxRetries"` - ProcessStatusUpdateInterval time.Duration `mapstructure:"processStatusUpdateInterval"` - RecheckSeen RecheckSeen 
`mapstructure:"recheckSeen"` - MonitorPeers bool `mapstructure:"monitorPeers"` - CheckUtxos bool `mapstructure:"checkUtxos"` - Health *HealthConfig `mapstructure:"health"` - RejectCallbackContaining []string `mapstructure:"rejectCallbackContaining"` - Stats *StatsConfig `mapstructure:"stats"` + ListenAddr string `mapstructure:"listenAddr"` + DialAddr string `mapstructure:"dialAddr"` + Db *DbConfig `mapstructure:"db"` + ProcessorCacheExpiryTime time.Duration `mapstructure:"processorCacheExpiryTime"` + UnseenTransactionRebroadcastingInterval time.Duration `mapstructure:"unseenTransactionRebroadcastingInterval"` + MaxRetries int `mapstructure:"maxRetries"` + ProcessStatusUpdateInterval time.Duration `mapstructure:"processStatusUpdateInterval"` + RecheckSeen RecheckSeen `mapstructure:"recheckSeen"` + MonitorPeers bool `mapstructure:"monitorPeers"` + CheckUtxos bool `mapstructure:"checkUtxos"` + Health *HealthConfig `mapstructure:"health"` + RejectCallbackContaining []string `mapstructure:"rejectCallbackContaining"` + Stats *StatsConfig `mapstructure:"stats"` + BlockchainNetwork *BlockchainNetwork[*MetamorphGroups] `mapstructure:"bcnet"` } type BlocktxConfig struct { - ListenAddr string `mapstructure:"listenAddr"` - DialAddr string `mapstructure:"dialAddr"` - HealthServerDialAddr string `mapstructure:"healthServerDialAddr"` - Db *DbConfig `mapstructure:"db"` - RecordRetentionDays int `mapstructure:"recordRetentionDays"` - RegisterTxsInterval time.Duration `mapstructure:"registerTxsInterval"` - MaxBlockProcessingDuration time.Duration `mapstructure:"maxBlockProcessingDuration"` - MonitorPeers bool `mapstructure:"monitorPeers"` - FillGaps *FillGapsConfig `mapstructure:"fillGaps"` - MaxAllowedBlockHeightMismatch int `mapstructure:"maxAllowedBlockHeightMismatch"` - MessageQueue *MessageQueueConfig `mapstructure:"mq"` - P2pReadBufferSize int `mapstructure:"p2pReadBufferSize"` + ListenAddr string `mapstructure:"listenAddr"` + DialAddr string `mapstructure:"dialAddr"` + HealthServerDialAddr string `mapstructure:"healthServerDialAddr"` + Db *DbConfig `mapstructure:"db"` + RecordRetentionDays int `mapstructure:"recordRetentionDays"` + RegisterTxsInterval time.Duration `mapstructure:"registerTxsInterval"` + MaxBlockProcessingDuration time.Duration `mapstructure:"maxBlockProcessingDuration"` + MonitorPeers bool `mapstructure:"monitorPeers"` + FillGaps *FillGapsConfig `mapstructure:"fillGaps"` + MaxAllowedBlockHeightMismatch int `mapstructure:"maxAllowedBlockHeightMismatch"` + MessageQueue *MessageQueueConfig `mapstructure:"mq"` + P2pReadBufferSize int `mapstructure:"p2pReadBufferSize"` + BlockchainNetwork *BlockchainNetwork[*BlocktxGroups] `mapstructure:"bcnet"` +} + +type BlockchainNetwork[McastT any] struct { + Mode string `mapstructure:"mode"` + Network string `mapstructure:"network"` + Peers []*PeerConfig `mapstructure:"peers"` + Mcast McastT `mapstructure:"mcast"` +} + +type BlocktxGroups struct { + McastBlock McastGroup `mapstructure:"block"` +} + +type MetamorphGroups struct { + McastTx McastGroup `mapstructure:"tx"` + McastReject McastGroup `mapstructure:"reject"` +} + +type McastGroup struct { + Address string `mapstructure:"address"` + Interfaces []string `mapstructure:"interfaces"` } type DbConfig struct { @@ -151,10 +158,6 @@ type CacheConfig struct { Redis *RedisConfig `mapstructure:"redis"` } -type FreeCacheConfig struct { - Size int `mapstructure:"size"` -} - type RedisConfig struct { Addr string `mapstructure:"addr"` Password string `mapstructure:"password"` diff --git a/config/defaults.go 
b/config/defaults.go index 869d44492..b7490454f 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -17,7 +17,6 @@ func getDefaultArcConfig() *ArcConfig { MessageQueue: getDefaultMessageQueueConfig(), Tracing: getDefaultTracingConfig(), PeerRPC: getDefaultPeerRPCConfig(), - Broadcasting: getBroadcastingConfig(), Metamorph: getMetamorphConfig(), Blocktx: getBlocktxConfig(), API: getAPIConfig(), @@ -54,40 +53,6 @@ func getDefaultPeerRPCConfig() *PeerRPCConfig { } } -func getBroadcastingConfig() *BroadcastingConfig { - return &BroadcastingConfig{ - Mode: "unicast", - Unicast: &Unicast{ - Peers: []*PeerConfig{ - { - Host: "localhost", - Port: &PeerPortConfig{ - P2P: 18333, - ZMQ: 28332, - }, - }, - { - Host: "localhost", - Port: &PeerPortConfig{ - P2P: 18334, - }, - }, - { - Host: "localhost", - Port: &PeerPortConfig{ - P2P: 18335, - }, - }, - }, - }, - Multicast: &Mulsticast{ - Ipv6Enabled: false, - MulticastGroups: nil, - Interfaces: nil, - }, - } -} - func getMetamorphConfig() *MetamorphConfig { return &MetamorphConfig{ ListenAddr: "localhost:8001", @@ -112,6 +77,19 @@ func getMetamorphConfig() *MetamorphConfig { NotSeenTimeLimit: 10 * time.Minute, NotFinalTimeLimit: 20 * time.Minute, }, + BlockchainNetwork: &BlockchainNetwork[*MetamorphGroups]{ + Mode: "classic", + Network: "regtest", + Peers: []*PeerConfig{ + { + Host: "localhost", + Port: &PeerPortConfig{ + P2P: 18333, + ZMQ: 28332, + }, + }, + }, + }, } } @@ -129,6 +107,19 @@ func getBlocktxConfig() *BlocktxConfig { MaxBlockProcessingDuration: 5 * time.Minute, MessageQueue: &MessageQueueConfig{}, P2pReadBufferSize: 8 * 1024 * 1024, + BlockchainNetwork: &BlockchainNetwork[*BlocktxGroups]{ + Mode: "classic", + Network: "regtest", + Peers: []*PeerConfig{ + { + Host: "localhost", + Port: &PeerPortConfig{ + P2P: 18333, + ZMQ: 28332, + }, + }, + }, + }, } } diff --git a/config/example_config.yaml b/config/example_config.yaml index 4a23a08b4..b2ce2ec51 100644 --- a/config/example_config.yaml +++ b/config/example_config.yaml @@ -27,28 +27,6 @@ peerRpc: # rpc configuration for bitcoin node host: localhost port: 18332 -broadcasting: # settings for connection to nodes - mode: unicast # one of unicast | multicast - multicast: - ipv6Enabled: true # indicates whether ipv6 is enabled for multicasting - multicastGroups: # must be specified if mode = multicast - - "172.28.56.77" # address of multicast group, needs to be ipv6 address if ipv6 is enabled - interfaces: - - "eth0" - - "eth1" - unicast: - peers: # list of bitcoin node peers to connect to - - host: localhost - port: - p2p: 18333 # port for p2p connection - zmq: 28332 # port for zmq connection - - host: localhost - port: - p2p: 18334 - - host: localhost - port: - p2p: 18335 - cache: engine: redis # cache engine - one of in-memory | redis redis: # redis cache configuration in case that engine: redis @@ -86,6 +64,31 @@ metamorph: stats: notSeenTimeLimit: 10m # amount of time after storing at which a non-seen tx will be counted towards not seen stat notFinalTimeLimit: 20m # amount of time after storing at which a seen but not mined tx will be counted towards not mined stat + bcnet: + mode: classic # [classic|hybrid] + network: regtest # bitcoin network to connect to. 
Value can be one of mainnet | testnet | regtest + peers: # list of bitcoin node peers to connect to (required) + - host: localhost + port: + p2p: 18333 # port for p2p connection + zmq: 28332 # port for zmq connection + - host: localhost + port: + p2p: 18334 + - host: localhost + port: + p2p: 18335 + mcast: # multicast groups (required if mode: hybrid) + tx: + address: "[]" # multicast group IPv6 address + interfaces: + - "eth0" + - "eth1" + reject: + address: "[]" # multicast group IPv6 address + interfaces: + - "eth0" + - "eth1" blocktx: listenAddr: localhost:8011 # address space for blocktx to listen on. Can be for example localhost:8011 or :8011 for listening on all addresses @@ -111,6 +114,26 @@ blocktx: interval: 15m # time interval to check and fill gaps in processed blocks maxAllowedBlockHeightMismatch: 3 # maximum number of blocks that can be ahead of current highest block in blocktx, used for merkle roots verification p2pReadBufferSize: 8388608 # size of read buffer for p2p messeges + bcnet: + mode: classic # [classic|hybrid] + network: regtest # bitcoin network to connect to. Value can be one of mainnet | testnet | regtest + peers: # list of bitcoin node peers to connect to (required) + - host: localhost + port: + p2p: 18333 # port for p2p connection + zmq: 28332 # port for zmq connection + - host: localhost + port: + p2p: 18334 + - host: localhost + port: + p2p: 18335 + mcast: # multicast groups (required if mode: hybrid) + block: + address: "[]" # multicast group IPv6 address + interfaces: + - "eth0" + - "eth1" api: address: localhost:9090 # address to start api server on diff --git a/config/load_test.go b/config/load_test.go index 2da6cfd27..50f9a9cc8 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -36,12 +36,6 @@ func Test_Load(t *testing.T) { assert.Equal(t, "INFO", actualConfig.LogLevel) assert.Equal(t, "text", actualConfig.LogFormat) assert.Equal(t, "mainnet", actualConfig.Network) - assert.Equal(t, 18335, actualConfig.Broadcasting.Unicast.Peers[2].Port.P2P) - assert.Equal(t, "172.28.56.77", *actualConfig.Broadcasting.Multicast.MulticastGroups[0]) - assert.Equal(t, true, actualConfig.Broadcasting.Multicast.Ipv6Enabled) - assert.Equal(t, "unicast", actualConfig.Broadcasting.Mode) - assert.Equal(t, "eth1", *actualConfig.Broadcasting.Multicast.Interfaces[1]) - assert.Equal(t, 18335, actualConfig.Broadcasting.Unicast.Peers[2].Port.P2P) assert.NotNil(t, actualConfig.Tracing) assert.Equal(t, "http://tracing:1234", actualConfig.Tracing.DialAddr) }) diff --git a/config/test_files/config.yaml b/config/test_files/config.yaml index 2c05415c3..a9cd16ede 100644 --- a/config/test_files/config.yaml +++ b/config/test_files/config.yaml @@ -5,24 +5,3 @@ network: mainnet tracing: dialAddr: http://tracing:1234 sample: 10 # sampling in percentage -broadcasting: # settings for connection to nodes - mode: unicast # one of unicast | multicast - multicast: - ipv6Enabled: true # indicates whether ipv6 is enabled for multicasting - multicastGroups: # must be specified if mode = multicast - - "172.28.56.77" # address of multicast group, needs to be ipv6 address if ipv6 is enabled - interfaces: - - "eth0" - - "eth1" - unicast: - peers: # list of bitcoin node peers to connect to - - host: localhost - port: - p2p: 18333 # port for p2p connection - zmq: 28332 # port for zmq connection - - host: localhost - port: - p2p: 18334 - - host: localhost - port: - p2p: 18335 diff --git a/go.mod b/go.mod index 6f207991f..06e4af5fb 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( 
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 go.opentelemetry.io/otel/sdk v1.28.0 go.opentelemetry.io/otel/trace v1.31.0 + golang.org/x/net v0.30.0 google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4 google.golang.org/grpc v1.65.0 google.golang.org/protobuf v1.34.2 @@ -149,7 +150,6 @@ require ( go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.28.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect - golang.org/x/net v0.30.0 // indirect golang.org/x/oauth2 v0.22.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/sys v0.26.0 // indirect diff --git a/internal/blocktx/bcnet/blocktx_p2p/hybrid_message_handler.go b/internal/blocktx/bcnet/blocktx_p2p/hybrid_message_handler.go new file mode 100644 index 000000000..6e5c829ef --- /dev/null +++ b/internal/blocktx/bcnet/blocktx_p2p/hybrid_message_handler.go @@ -0,0 +1,50 @@ +package blocktx_p2p + +import ( + "log/slog" + + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/p2p" + "github.com/libsv/go-p2p/wire" +) + +var _ p2p.MessageHandlerI = (*HybridMsgHandler)(nil) + +type HybridMsgHandler struct { + logger *slog.Logger + blockProcessingCh chan<- *bcnet.BlockMessage +} + +func NewHybridMsgHandler(l *slog.Logger, blockProcessCh chan<- *bcnet.BlockMessage) *HybridMsgHandler { + return &HybridMsgHandler{ + logger: l.With( + slog.String("module", "peer-msg-handler"), + slog.String("mode", "hybrid"), + ), + blockProcessingCh: blockProcessCh, + } +} + +// OnReceive handles incoming messages depending on command type +func (h *HybridMsgHandler) OnReceive(msg wire.Message, _ p2p.PeerI) { + cmd := msg.Command() + + switch cmd { + case wire.CmdBlock: + blockMsg, ok := msg.(*bcnet.BlockMessage) + if !ok { + h.logger.Error("Block msg receive", slog.Any("err", ErrUnableToCastWireMessage)) + return + } + + h.blockProcessingCh <- blockMsg + + default: + // ignore other messages + } +} + +// OnSend handles outgoing messages depending on command type +func (h *HybridMsgHandler) OnSend(_ wire.Message, _ p2p.PeerI) { + // ignore +} diff --git a/internal/blocktx/bcnet/blocktx_p2p/message_handler.go b/internal/blocktx/bcnet/blocktx_p2p/message_handler.go index a4aafbe3d..fe6ab7bdc 100644 --- a/internal/blocktx/bcnet/blocktx_p2p/message_handler.go +++ b/internal/blocktx/bcnet/blocktx_p2p/message_handler.go @@ -25,9 +25,12 @@ type MsgHandler struct { blockProcessingCh chan<- *bcnet.BlockMessage } -func NewMsgHandler(logger *slog.Logger, blockRequestCh chan<- BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) *MsgHandler { +func NewMsgHandler(l *slog.Logger, blockRequestCh chan<- BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) *MsgHandler { return &MsgHandler{ - logger: logger.With(slog.String("module", "peer-msg-handler")), + logger: l.With( + slog.String("module", "peer-msg-handler"), + slog.String("mode", "classic"), + ), blockRequestingCh: blockRequestCh, blockProcessingCh: blockProcessCh, } diff --git a/internal/blocktx/bcnet/mcast/listener.go b/internal/blocktx/bcnet/mcast/listener.go new file mode 100644 index 000000000..3526203c8 --- /dev/null +++ b/internal/blocktx/bcnet/mcast/listener.go @@ -0,0 +1,110 @@ +package mcast + +import ( + "context" + "errors" + "log/slog" + "os" + + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/blocktx/store" + "github.com/bitcoin-sv/arc/internal/multicast" + "github.com/libsv/go-p2p/wire" +) + +var ErrUnableToCastWireMessage = 
errors.New("unable to cast wire.Message to blockchain.BlockMessage") + +var _ multicast.MessageHandlerI = (*Listener)(nil) + +// Listener is a multicast message listener specifically designed for processing blocks messages. +// +// Responsibilities: +// - Connects to a multicast group and listens for block messages (`CmdBlock`). +// - Processes incoming block messages by marking them for processing in the storage to ensure that only one instance processes a block at a time. +// - Ignores non-block messages to optimize processing and maintain focus on relevant data. +// +// Key Methods: +// - `NewMcastListener`: Initializes a new Listener instance, setting up the multicast group for reading block messages. +// - `Connect`: Establishes the connection to the multicast group. +// - `Disconnect`: Leaves the multicast group. +// - `OnReceive`: Handles received multicast messages, verifying their type and ensuring proper processing of block messages. +// - `OnSend`: Placeholder for handling messages sent to the multicast group, currently ignored. +// +// Behavior: +// - On receiving a block message (`CmdBlock`): +// 1. Verifies the message type. +// 2. Tries to mark the block as being processed by the current instance. +// 3. Forwards the block message to the `receiveCh` for further handling. +// - Logs and gracefully handles errors in block processing, ensuring robustness in a distributed system. + +type Listener struct { + hostname string + + logger *slog.Logger + store store.BlocktxStore + receiveCh chan<- *bcnet.BlockMessage + + blockGroup *multicast.Group[*bcnet.BlockMessage] +} + +func NewMcastListener(l *slog.Logger, addr string, network wire.BitcoinNet, store store.BlocktxStore, receiveCh chan<- *bcnet.BlockMessage) *Listener { + hostname, _ := os.Hostname() + + listner := Listener{ + logger: l.With("module", "mcast-listener"), + hostname: hostname, + store: store, + receiveCh: receiveCh, + } + + listner.blockGroup = multicast.NewGroup[*bcnet.BlockMessage](l, &listner, addr, multicast.Read, network) + return &listner +} + +func (l *Listener) Connect() bool { + return l.blockGroup.Connect() +} + +func (l *Listener) Disconnect() { + l.blockGroup.Disconnect() +} + +// OnReceive handles received messages from multicast group +func (l *Listener) OnReceive(msg wire.Message) { + if msg.Command() == wire.CmdBlock { + blockMsg, ok := msg.(*bcnet.BlockMessage) + if !ok { + l.logger.Error("Block msg receive", slog.Any("err", ErrUnableToCastWireMessage)) + return + } + + // TODO: move it to mediator or smth + // lock block for the current instance to process + hash := blockMsg.Hash + + l.logger.Info("Received BLOCK msg from multicast group", slog.String("hash", hash.String())) + + processedBy, err := l.store.SetBlockProcessing(context.Background(), hash, l.hostname) + if err != nil { + // block is already being processed by another blocktx instance + if errors.Is(err, store.ErrBlockProcessingDuplicateKey) { + l.logger.Debug("block processing already in progress", slog.String("hash", hash.String()), slog.String("processed_by", processedBy)) + return + } + + l.logger.Error("failed to set block processing", slog.String("hash", hash.String()), slog.String("err", err.Error())) + return + } + + // p.startBlockProcessGuard(p.ctx, hash) // handle it somehow + + l.receiveCh <- blockMsg + } + + // ignore other messages +} + +// OnSend handles sent messages to multicast group +func (l *Listener) OnSend(_ wire.Message) { + // ignore +} diff --git a/internal/metamorph/bcnet/mcast/multicaster.go 
b/internal/metamorph/bcnet/mcast/multicaster.go new file mode 100644 index 000000000..2ec1df586 --- /dev/null +++ b/internal/metamorph/bcnet/mcast/multicaster.go @@ -0,0 +1,128 @@ +package mcast + +import ( + "errors" + "log/slog" + + "github.com/bitcoin-sv/arc/internal/metamorph/bcnet/metamorph_p2p" + "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" + "github.com/bitcoin-sv/arc/internal/multicast" + "github.com/libsv/go-p2p/bsvutil" + "github.com/libsv/go-p2p/wire" +) + +var ErrTxRejectedByPeer = errors.New("transaction rejected by peer") + +var _ multicast.MessageHandlerI = (*Multicaster)(nil) + +// Multicaster facilitates the transmission and reception of transaction and reject messages over multicast groups. +// +// Fields: +// - `messageCh`: Channel used to send status messages for transactions, such as acceptance or rejection. +// - `txGroup`: Multicast group for transmitting transaction (`MsgTx`) messages. +// - `rejectGroup`: Multicast group for receiving rejection (`MsgReject`) messages. +// +// Responsibilities: +// - Establishes and manages connections to multicast groups for sending and receiving blockchain transaction messages. +// - Handles the transmission of transactions and processes rejections received from the network. +// +// Key Methods: +// - `NewMulticaster`: Initializes a new `Multicaster` instance with specified multicast group addresses, network, and message channel. +// - `Connect`: Connects to both `txGroup` (for sending) and `rejectGroup` (for receiving). +// - `Disconnect`: Disconnects from all multicast groups, cleaning up resources. +// - `SendTx`: Encodes and sends a raw transaction to the multicast `txGroup`. +// - `OnReceive`: Processes messages received via the `rejectGroup` and updates the `messageCh` with rejection status. +// - `OnSend`: Processes messages sent via the `txGroup` and updates the `messageCh` with sent-to-network status. +// +// Behavior: +// - On receiving a `MsgReject` (`CmdReject`): +// 1. Extracts the rejection reason and transaction hash. +// 2. Sends a `TxStatusMessage` to the `messageCh` indicating rejection status. +// +// - On sending a `MsgTx` (`CmdTx`): +// 1. Extracts the transaction hash. +// 2. Sends a `TxStatusMessage` to the `messageCh` indicating the transaction was sent to the network. +// +// - Ignores unsupported or irrelevant message types for both sending and receiving. 
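To make the lifecycle described above concrete, here is a minimal, illustrative wiring sketch (illustrative only, not part of the patch). The helper name, the example package, and the channel buffer size are assumptions made for the sketch; the constructor and method signatures are the ones introduced in this file.

package example

import (
	"errors"
	"log/slog"

	"github.com/bitcoin-sv/arc/internal/metamorph/bcnet/mcast"
	"github.com/bitcoin-sv/arc/internal/metamorph/bcnet/metamorph_p2p"
	"github.com/libsv/go-p2p/wire"
)

// broadcastViaMulticast is a hypothetical helper showing the Multicaster lifecycle:
// join the TX (write) and REJECT (read) groups, send one raw transaction, and let
// the caller observe the resulting status messages on the returned channel.
func broadcastViaMulticast(l *slog.Logger, network wire.BitcoinNet, txAddr, rejectAddr string, rawTx []byte) (<-chan *metamorph_p2p.TxStatusMessage, func(), error) {
	messageCh := make(chan *metamorph_p2p.TxStatusMessage, 100) // buffer size is arbitrary for this sketch

	m := mcast.NewMulticaster(l, mcast.GroupsAddresses{
		McastTx:     txAddr,     // multicast group address for outgoing MsgTx
		McastReject: rejectAddr, // multicast group address for incoming MsgReject
	}, network, messageCh)

	// Connect joins both groups; a false return means the multicast setup failed.
	if ok := m.Connect(); !ok {
		return nil, nil, errors.New("could not join multicast groups")
	}

	// Per the doc comment above, OnSend reports a SENT_TO_NETWORK status on messageCh.
	if err := m.SendTx(rawTx); err != nil {
		m.Disconnect()
		return nil, nil, err
	}

	// Rejections received on the REJECT group surface as REJECTED statuses on messageCh.
	return messageCh, m.Disconnect, nil
}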
+type Multicaster struct { + logger *slog.Logger + messageCh chan<- *metamorph_p2p.TxStatusMessage + + txGroup *multicast.Group[*wire.MsgTx] + rejectGroup *multicast.Group[*wire.MsgReject] +} + +type GroupsAddresses struct { + McastTx string + McastReject string +} + +func NewMulticaster(l *slog.Logger, addresses GroupsAddresses, network wire.BitcoinNet, messageCh chan<- *metamorph_p2p.TxStatusMessage) *Multicaster { + m := Multicaster{ + logger: l.With("module", "multicaster"), + messageCh: messageCh, + } + + m.txGroup = multicast.NewGroup[*wire.MsgTx](l, &m, addresses.McastTx, multicast.Write, network) + m.rejectGroup = multicast.NewGroup[*wire.MsgReject](l, &m, addresses.McastReject, multicast.Read, network) + + return &m +} + +func (m *Multicaster) Connect() bool { + return m.txGroup.Connect() && m.rejectGroup.Connect() +} + +func (m *Multicaster) Disconnect() { + m.txGroup.Disconnect() + m.rejectGroup.Disconnect() +} + +func (m *Multicaster) SendTx(rawTx []byte) error { + tx, err := bsvutil.NewTxFromBytes(rawTx) + if err != nil { + // m.logger.Error("failed to parse tx", slog.String("rawHex", hex.EncodeToString(rawTx)), slog.String("err", err.Error())) + return err + } + + m.txGroup.WriteMsg(tx.MsgTx()) + return nil +} + +// OnReceive should be fire & forget +func (m *Multicaster) OnReceive(msg wire.Message) { + if msg.Command() == wire.CmdReject { + rejectMsg, ok := msg.(*wire.MsgReject) + if !ok { + return + } + + m.messageCh <- &metamorph_p2p.TxStatusMessage{ + Hash: &rejectMsg.Hash, + Status: metamorph_api.Status_REJECTED, + Peer: "Mcast REJECT", + Err: errors.Join(ErrTxRejectedByPeer, errors.New(rejectMsg.Reason)), + } + } + + // ignore other messages +} + +// OnSend should be fire & forget +func (m *Multicaster) OnSend(msg wire.Message) { + if msg.Command() == wire.CmdTx { + txMsg, ok := msg.(*wire.MsgTx) + if !ok { + return + } + + hash := txMsg.TxHash() + m.messageCh <- &metamorph_p2p.TxStatusMessage{ + Hash: &hash, + Status: metamorph_api.Status_SENT_TO_NETWORK, + Peer: "Mcast TX", + } + } + + // ignore other messages +} diff --git a/internal/metamorph/bcnet/mediator.go b/internal/metamorph/bcnet/mediator.go new file mode 100644 index 000000000..3b32d3857 --- /dev/null +++ b/internal/metamorph/bcnet/mediator.go @@ -0,0 +1,107 @@ +package bcnet + +import ( + "context" + "log/slog" + "runtime" + + "github.com/bitcoin-sv/arc/internal/metamorph/bcnet/mcast" + "github.com/bitcoin-sv/arc/internal/metamorph/store" + "github.com/bitcoin-sv/arc/internal/p2p" + "github.com/bitcoin-sv/arc/internal/tracing" + "github.com/libsv/go-p2p/wire" + "go.opentelemetry.io/otel/attribute" +) + +// Mediator acts as the central communication hub between metamorph processor and blockchain network, +// coordinating the interactions between the peer-to-peer messenger (p2p) and the multicast system (mcast). +// It is responsible for handling transactions and peer interactions depending on the operating mode (classic or hybrid). +// +// Fields: +// - `classic`: A flag indicating if the system is operating in classic mode (`true`) or hybrid mode (`false`). +// - `p2pMessenger`: The component responsible for managing peer-to-peer communications, including requesting and announcing transactions. +// - `mcaster`: The component responsible for sending transactions over multicast networks in hybrid mode. +// +// Methods: +// - `NewMediator`: Initializes a new `Mediator` with the specified logging, mode (classic or hybrid), +// p2p messenger, multicast system, and optional tracing attributes. 
+// - `AskForTxAsync`: Asynchronously requests a transaction by its hash from the network via P2P. +// - `AnnounceTxAsync`: Asynchronously announces a transaction to the network. +// In classic mode, it uses `p2pMessenger` to announce the transaction. In hybrid mode, it uses `mcaster` to send the transaction via multicast. +// +// Usage: +// - The `Mediator` abstracts the differences between classic (peer-to-peer) and hybrid (peer-to-peer and multicast) modes. +// - In classic mode, transactions are communicated directly with peers, whereas in hybrid mode, multicast groups are used for broader network communication. +// - Tracing functionality allows monitoring the flow of transactions and network requests, providing insights into system performance and behavior. +type Mediator struct { + logger *slog.Logger + classic bool + + p2pMessenger *p2p.NetworkMessenger + mcaster *mcast.Multicaster + + tracingEnabled bool + tracingAttributes []attribute.KeyValue +} + +type Option func(*Mediator) + +func WithTracer(attr ...attribute.KeyValue) Option { + return func(p *Mediator) { + p.tracingEnabled = true + if len(attr) > 0 { + p.tracingAttributes = append(p.tracingAttributes, attr...) + } + _, file, _, ok := runtime.Caller(1) + if ok { + p.tracingAttributes = append(p.tracingAttributes, attribute.String("file", file)) + } + } +} + +func NewMediator(l *slog.Logger, classic bool, messenger *p2p.NetworkMessenger, mcaster *mcast.Multicaster, opts ...Option) *Mediator { + mode := "classic" + if !classic { + mode = "hybrid" + } + + m := &Mediator{ + logger: l.With("mode", mode), + classic: classic, + p2pMessenger: messenger, + mcaster: mcaster, + } + + for _, opt := range opts { + opt(m) + } + + return m +} + +func (m *Mediator) AskForTxAsync(ctx context.Context, tx *store.Data) { + _, span := tracing.StartTracing(ctx, "AskForTxAsync", m.tracingEnabled, m.tracingAttributes...) + + m.p2pMessenger.RequestWithAutoBatch(tx.Hash, wire.InvTypeTx) + tracing.EndTracing(span, nil) +} + +func (m *Mediator) AnnounceTxAsync(ctx context.Context, tx *store.Data) { + _, span := tracing.StartTracing(ctx, "AskForTxAsync", m.tracingEnabled, m.tracingAttributes...) 
+ + if m.classic { + m.p2pMessenger.AnnounceWithAutoBatch(tx.Hash, wire.InvTypeTx) + } else { + _ = m.mcaster.SendTx(tx.RawTx) + } + + tracing.EndTracing(span, nil) +} + +func (m *Mediator) GetPeers() []p2p.PeerI { + return m.p2pMessenger.GetPeers() +} + +func (m *Mediator) CountConnectedPeers() uint { + return m.p2pMessenger.CountConnectedPeers() +} diff --git a/internal/metamorph/bcnet/metamorph_p2p/hybrid_message_handler.go b/internal/metamorph/bcnet/metamorph_p2p/hybrid_message_handler.go new file mode 100644 index 000000000..5a52b083b --- /dev/null +++ b/internal/metamorph/bcnet/metamorph_p2p/hybrid_message_handler.go @@ -0,0 +1,84 @@ +package metamorph_p2p + +import ( + "log/slog" + + "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" + "github.com/bitcoin-sv/arc/internal/p2p" + "github.com/libsv/go-p2p/wire" +) + +var _ p2p.MessageHandlerI = (*HybridMsgHandler)(nil) + +type HybridMsgHandler struct { + logger *slog.Logger + messageCh chan<- *TxStatusMessage +} + +func NewHybridMsgHandler(l *slog.Logger, messageCh chan<- *TxStatusMessage) *HybridMsgHandler { + return &HybridMsgHandler{ + logger: l.With( + slog.String("module", "peer-msg-handler"), + slog.String("mode", "hybrid"), + ), + + messageCh: messageCh, + } +} + +// OnReceive handles incoming messages depending on command type +func (h *HybridMsgHandler) OnReceive(msg wire.Message, peer p2p.PeerI) { + cmd := msg.Command() + switch cmd { + case wire.CmdInv: + h.handleReceivedInv(msg, peer) + + case wire.CmdTx: + h.handleReceivedTx(msg, peer) + + default: + // ignore other + } +} + +// OnSend handles outgoing messages depending on command type +func (h *HybridMsgHandler) OnSend(_ wire.Message, _ p2p.PeerI) { + // ignore +} + +func (h *HybridMsgHandler) handleReceivedInv(wireMsg wire.Message, peer p2p.PeerI) { + msg, ok := wireMsg.(*wire.MsgInv) + if !ok { + return + } + + go func() { + for _, iv := range msg.InvList { + if iv.Type == wire.InvTypeTx { + select { + case h.messageCh <- &TxStatusMessage{ + Hash: &iv.Hash, + Status: metamorph_api.Status_SEEN_ON_NETWORK, + Peer: peer.String(), + }: + default: // Ensure that writing to channel is non-blocking -- probably we should give up on this + } + } + // ignore INV with block or error + } + }() +} + +func (h *HybridMsgHandler) handleReceivedTx(wireMsg wire.Message, peer p2p.PeerI) { + msg, ok := wireMsg.(*wire.MsgTx) + if !ok { + return + } + + hash := msg.TxHash() + h.messageCh <- &TxStatusMessage{ + Hash: &hash, + Status: metamorph_api.Status_SEEN_ON_NETWORK, + Peer: peer.String(), + } +} diff --git a/internal/metamorph/bcnet/metamorph_p2p/message_handler.go b/internal/metamorph/bcnet/metamorph_p2p/message_handler.go index 36e0164b9..18c6b5378 100644 --- a/internal/metamorph/bcnet/metamorph_p2p/message_handler.go +++ b/internal/metamorph/bcnet/metamorph_p2p/message_handler.go @@ -37,7 +37,10 @@ type MsgHandler struct { func NewMsgHandler(l *slog.Logger, s store.MetamorphStore, messageCh chan<- *TxStatusMessage) *MsgHandler { ph := &MsgHandler{ - logger: l, + logger: l.With( + slog.String("module", "peer-msg-handler"), + slog.String("mode", "classic"), + ), store: s, messageCh: messageCh, } diff --git a/internal/metamorph/integration_test/double_spend_integration_test.go b/internal/metamorph/integration_test/double_spend_integration_test.go index 5a57e73e3..12fcab68c 100644 --- a/internal/metamorph/integration_test/double_spend_integration_test.go +++ b/internal/metamorph/integration_test/double_spend_integration_test.go @@ -30,11 +30,11 @@ import ( 
"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph" + "github.com/bitcoin-sv/arc/internal/metamorph/bcnet" "github.com/bitcoin-sv/arc/internal/metamorph/bcnet/metamorph_p2p" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/mocks" "github.com/bitcoin-sv/arc/internal/metamorph/store/postgresql" - "github.com/bitcoin-sv/arc/internal/p2p" testutils "github.com/bitcoin-sv/arc/internal/test_utils" ) @@ -83,7 +83,7 @@ func TestDoubleSpendDetection(t *testing.T) { require.NoError(t, err) } - pm := &p2p.NetworkMessenger{} + pm := &bcnet.Mediator{} processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel, metamorph.WithMinedTxsChan(minedTxChannel), diff --git a/internal/metamorph/integration_test/processor_integration_test.go b/internal/metamorph/integration_test/processor_integration_test.go index 1afd3c3e1..258ebf03e 100644 --- a/internal/metamorph/integration_test/processor_integration_test.go +++ b/internal/metamorph/integration_test/processor_integration_test.go @@ -10,6 +10,7 @@ import ( "github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core" nats_mocks "github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_core/mocks" "github.com/bitcoin-sv/arc/internal/metamorph" + "github.com/bitcoin-sv/arc/internal/metamorph/bcnet" "github.com/bitcoin-sv/arc/internal/metamorph/bcnet/metamorph_p2p" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/store" @@ -46,6 +47,8 @@ func TestProcessor(t *testing.T) { require.NoError(t, err) messenger := p2p.NewNetworkMessenger(slog.Default(), pm) + defer messenger.Shutdown() + mediator := bcnet.NewMediator(slog.Default(), true, messenger, nil) natsMock := &nats_mocks.NatsConnectionMock{ DrainFunc: func() error { @@ -58,7 +61,7 @@ func TestProcessor(t *testing.T) { natsQueue := nats_core.New(natsMock) statusMessageChannel := make(chan *metamorph_p2p.TxStatusMessage, 10) - sut, err := metamorph.NewProcessor(mtmStore, cacheStore, messenger, statusMessageChannel, + sut, err := metamorph.NewProcessor(mtmStore, cacheStore, mediator, statusMessageChannel, metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), metamorph.WithMessageQueueClient(natsQueue), ) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 8a0a2c017..a15049450 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -10,12 +10,12 @@ import ( "time" "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/libsv/go-p2p/wire" "go.opentelemetry.io/otel/attribute" "google.golang.org/protobuf/proto" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/cache" + "github.com/bitcoin-sv/arc/internal/metamorph/bcnet" metamorph_p2p "github.com/bitcoin-sv/arc/internal/metamorph/bcnet/metamorph_p2p" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/store" @@ -65,7 +65,7 @@ type Processor struct { store store.MetamorphStore cacheStore cache.Store hostname string - messenger *p2p.NetworkMessenger + bcMediator *bcnet.Mediator mqClient MessageQueue logger *slog.Logger mapExpiryTime time.Duration @@ -115,12 +115,12 @@ type CallbackSender interface { SendCallback(ctx context.Context, data *store.Data) } -func NewProcessor(s store.MetamorphStore, c cache.Store, messenger *p2p.NetworkMessenger, 
statusMessageChannel chan *metamorph_p2p.TxStatusMessage, opts ...Option) (*Processor, error) { +func NewProcessor(s store.MetamorphStore, c cache.Store, bcMediator *bcnet.Mediator, statusMessageChannel chan *metamorph_p2p.TxStatusMessage, opts ...Option) (*Processor, error) { if s == nil { return nil, ErrStoreNil } - if messenger == nil { + if bcMediator == nil { return nil, ErrPeerMessengerNil } @@ -133,7 +133,7 @@ func NewProcessor(s store.MetamorphStore, c cache.Store, messenger *p2p.NetworkM store: s, cacheStore: c, hostname: hostname, - messenger: messenger, + bcMediator: bcMediator, mapExpiryTime: mapExpiryTimeDefault, recheckSeenFromAgo: recheckSeenFromAgo, recheckSeenUntilAgo: recheckSeenUntilAgoDefault, @@ -706,13 +706,13 @@ func (p *Processor) StartProcessExpiredTransactions() { if tx.Retries%2 != 0 { // Send GETDATA to peers to see if they have it p.logger.Debug("Re-getting expired tx", slog.String("hash", tx.Hash.String())) - p.messenger.RequestWithAutoBatch(tx.Hash, wire.InvTypeTx) + p.bcMediator.AskForTxAsync(ctx, tx) requested++ continue } p.logger.Debug("Re-announcing expired tx", slog.String("hash", tx.Hash.String())) - p.messenger.AnnounceWithAutoBatch(tx.Hash, wire.InvTypeTx) + p.bcMediator.AnnounceTxAsync(ctx, tx) announced++ } } @@ -729,7 +729,7 @@ func (p *Processor) StartProcessExpiredTransactions() { // GetPeers returns a list of connected and disconnected peers func (p *Processor) GetPeers() []p2p.PeerI { - return p.messenger.GetPeers() + return p.bcMediator.GetPeers() } func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorRequest) { @@ -791,18 +791,18 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques return } - // register transaction in blocktx using message queue - if err = p.mqClient.Publish(ctx, RegisterTxTopic, req.Data.Hash[:]); err != nil { - p.logger.Error("failed to register tx in blocktx", slog.String("hash", req.Data.Hash.String()), slog.String("err", err.Error())) - } - // broadcast that transaction is stored to client statusResponse.UpdateStatus(StatusAndError{ Status: metamorph_api.Status_STORED, }) + // register transaction in blocktx using message queue + if err = p.mqClient.Publish(ctx, RegisterTxTopic, req.Data.Hash[:]); err != nil { + p.logger.Error("failed to register tx in blocktx", slog.String("hash", req.Data.Hash.String()), slog.String("err", err.Error())) + } + // Add this transaction to the map of transactions that client is listening to with open connection - ctx, responseProcessorAddSpan := tracing.StartTracing(ctx, "responseProcessor.Add", p.tracingEnabled, p.tracingAttributes...) + _, responseProcessorAddSpan := tracing.StartTracing(ctx, "responseProcessor.Add", p.tracingEnabled, p.tracingAttributes...) p.responseProcessor.Add(statusResponse) tracing.EndTracing(responseProcessorAddSpan, nil) @@ -813,15 +813,9 @@ func (p *Processor) ProcessTransaction(ctx context.Context, req *ProcessorReques // don't return here, because the transaction will try to be added to cache again when re-broadcasting unmined txs } - // Send GETDATA to peers to see if they have it - _, requestTransactionSpan := tracing.StartTracing(ctx, "RequestWithAutoBatch", p.tracingEnabled, p.tracingAttributes...) - p.messenger.RequestWithAutoBatch(req.Data.Hash, wire.InvTypeTx) - tracing.EndTracing(requestTransactionSpan, nil) - - // Announce transaction to network peers - _, announceTransactionSpan := tracing.StartTracing(ctx, "AnnounceWithAutoBatch", p.tracingEnabled, p.tracingAttributes...) 
- p.messenger.AnnounceWithAutoBatch(req.Data.Hash, wire.InvTypeTx) - tracing.EndTracing(announceTransactionSpan, nil) + // ask network about the tx to see if they have it + p.bcMediator.AskForTxAsync(ctx, req.Data) + p.bcMediator.AnnounceTxAsync(ctx, req.Data) // update status in response statusResponse.UpdateStatus(StatusAndError{ @@ -863,8 +857,7 @@ func (p *Processor) ProcessTransactions(ctx context.Context, sReq []*store.Data) p.logger.Error("Failed to register tx in blocktx", slog.String("hash", data.Hash.String()), slog.String("err", err.Error())) } - // Announce transaction to network and save peers - p.messenger.AnnounceWithAutoBatch(data.Hash, wire.InvTypeTx) + p.bcMediator.AnnounceTxAsync(ctx, data) // update status in storage p.storageStatusUpdateCh <- store.UpdateStatus{ @@ -876,7 +869,7 @@ func (p *Processor) ProcessTransactions(ctx context.Context, sReq []*store.Data) } func (p *Processor) Health() error { - healthyConnections := int(p.messenger.CountConnectedPeers()) // #nosec G115 + healthyConnections := int(p.bcMediator.CountConnectedPeers()) // #nosec G115 if healthyConnections < p.minimumHealthyConnections { p.logger.Warn("Less than expected healthy peers", slog.Int("connections", healthyConnections)) diff --git a/internal/metamorph/processor_test.go b/internal/metamorph/processor_test.go index e863c285f..b5232ad4c 100644 --- a/internal/metamorph/processor_test.go +++ b/internal/metamorph/processor_test.go @@ -21,6 +21,7 @@ import ( "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph" + "github.com/bitcoin-sv/arc/internal/metamorph/bcnet" "github.com/bitcoin-sv/arc/internal/metamorph/bcnet/metamorph_p2p" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/mocks" @@ -39,13 +40,13 @@ func TestNewProcessor(t *testing.T) { SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, } - nMessenger := &p2p.NetworkMessenger{} + nMessenger := &bcnet.Mediator{} cStore := cache.NewMemoryStore() tt := []struct { name string store store.MetamorphStore - messenger *p2p.NetworkMessenger + messenger *bcnet.Mediator expectedError error expectedNonNilProcessor bool @@ -127,7 +128,7 @@ func TestStartLockTransactions(t *testing.T) { SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, } - messenger := &p2p.NetworkMessenger{} + messenger := &bcnet.Mediator{} cStore := cache.NewMemoryStore() // when @@ -252,6 +253,8 @@ func TestProcessTransaction(t *testing.T) { require.NoError(t, err) messenger := p2p.NewNetworkMessenger(slog.Default(), pm) + defer messenger.Shutdown() + mediator := bcnet.NewMediator(slog.Default(), true, messenger, nil) publisher := &mocks.MessageQueueMock{ PublishFunc: func(_ context.Context, _ string, _ []byte) error { @@ -259,7 +262,7 @@ func TestProcessTransaction(t *testing.T) { }, } - sut, err := metamorph.NewProcessor(s, cStore, messenger, nil, metamorph.WithMessageQueueClient(publisher)) + sut, err := metamorph.NewProcessor(s, cStore, mediator, nil, metamorph.WithMessageQueueClient(publisher)) require.NoError(t, err) require.Equal(t, 0, sut.GetProcessorMapSize()) @@ -531,7 +534,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { }, } - messenger := &p2p.NetworkMessenger{} + messenger := &bcnet.Mediator{} cStore := cache.NewMemoryStore() for _, i := range tc.inputs { if i.registered { @@ -712,7 +715,7 @@ func TestStartProcessSubmittedTxs(t 
*testing.T) { err := pm.AddPeer(peer) require.NoError(t, err) - messenger := p2p.NewNetworkMessenger(slog.Default(), pm) + messenger := bcnet.NewMediator(slog.Default(), true, p2p.NewNetworkMessenger(slog.Default(), pm), nil) publisher := &mocks.MessageQueueMock{ PublishFunc: func(_ context.Context, _ string, _ []byte) error { @@ -851,7 +854,7 @@ func TestProcessExpiredTransactions(t *testing.T) { err := pm.AddPeer(peer) require.NoError(t, err) - messenger := p2p.NewNetworkMessenger(slog.Default(), pm) + messenger := bcnet.NewMediator(slog.Default(), true, p2p.NewNetworkMessenger(slog.Default(), pm), nil) publisher := &mocks.MessageQueueMock{ PublishFunc: func(_ context.Context, _ string, _ []byte) error { @@ -935,7 +938,7 @@ func TestStartProcessMinedCallbacks(t *testing.T) { }, SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, } - pm := &p2p.NetworkMessenger{} + pm := &bcnet.Mediator{} minedTxsChan := make(chan *blocktx_api.TransactionBlock, 5) mqClient := &mocks.MessageQueueMock{ @@ -1032,7 +1035,7 @@ func TestProcessorHealth(t *testing.T) { } cStore := cache.NewMemoryStore() - messenger := p2p.NewNetworkMessenger(slog.Default(), pm) + messenger := bcnet.NewMediator(slog.Default(), true, p2p.NewNetworkMessenger(slog.Default(), pm), nil) sut, err := metamorph.NewProcessor(metamorphStore, cStore, messenger, nil, metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), @@ -1086,7 +1089,7 @@ func TestStart(t *testing.T) { return 0, nil }} - pm := &p2p.NetworkMessenger{} + pm := &bcnet.Mediator{} cStore := cache.NewMemoryStore() diff --git a/internal/metamorph/stats_collector_test.go b/internal/metamorph/stats_collector_test.go index bc09069a5..74e47073c 100644 --- a/internal/metamorph/stats_collector_test.go +++ b/internal/metamorph/stats_collector_test.go @@ -8,9 +8,8 @@ import ( "testing" "time" - "github.com/bitcoin-sv/arc/internal/p2p" - "github.com/bitcoin-sv/arc/internal/metamorph" + "github.com/bitcoin-sv/arc/internal/metamorph/bcnet" "github.com/bitcoin-sv/arc/internal/metamorph/store" storeMocks "github.com/bitcoin-sv/arc/internal/metamorph/store/mocks" "github.com/stretchr/testify/require" @@ -50,7 +49,7 @@ func TestStartCollectStats(t *testing.T) { SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil }, } - messenger := &p2p.NetworkMessenger{} + messenger := &bcnet.Mediator{} processor, err := metamorph.NewProcessor(mtmStore, nil, messenger, nil, metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))), diff --git a/internal/multicast/group.go b/internal/multicast/group.go new file mode 100644 index 000000000..6f0bc860a --- /dev/null +++ b/internal/multicast/group.go @@ -0,0 +1,281 @@ +package multicast + +import ( + "context" + "errors" + "log/slog" + "net" + "sync" + "sync/atomic" + + "github.com/bitcoin-sv/arc/internal/p2p" + "github.com/libsv/go-p2p/wire" + "golang.org/x/net/ipv6" +) + +// WORK IN PROGRESS +// +// +// The Group structure represents a multicast communication group for exchanging messages +// over a network. It is designed to support both reading and writing messages through the +// IPv6 multicast group. The Group abstracts the complexities of managing connections, handling +// messages, and interfacing with the multicast group. Key features include: +// +// - **Multicast Address Management:** Supports setting up multicast UDP connections, +// joining groups, and resolving addresses. 
+// - **Mode Configurability:** Allows configuring the group in read-only, write-only, or +// read-write mode through the `ModeFlag`. +// - **Transparent Communication:** Ensures seamless handling of protocol-specific +// messages via handlers and encapsulated logic. +// - **Error Handling and Logging:** Integrates structured logging for diagnostics and error +// tracking, aiding maintainability and debugging. + +type Group[T wire.Message] struct { + execWg sync.WaitGroup + execCtx context.Context + cancelExecCtx context.CancelFunc + + startMu sync.Mutex + connected atomic.Bool + + addr string + mode ModeFlag + + network wire.BitcoinNet + maxMsgSize int64 + readBuffSize int + + mcastConn *ipv6ConnAdapter + writeCh chan T + + logger *slog.Logger + mh MessageHandlerI +} + +// MessageHandlerI is an interface that defines the contract for handling messages +// in the multicast group communication. It provides two primary methods: +// +// - **OnReceive(msg wire.Message):** Triggered when a message is received from the +// multicast group. This method should be implemented to handle received messages in +// a fire-and-forget manner, ensuring that processing does not block the main communication flow. +// +// - **OnSend(msg wire.Message):** Triggered when a message is successfully sent to the +// multicast group. This method allows implementing custom actions or logging after +// message transmission, also in a fire-and-forget manner. +// +// By defining this interface, the Group structure decouples the message handling logic +// from the underlying communication mechanism, providing extensibility and modularity. +type MessageHandlerI interface { + OnReceive(msg wire.Message) + OnSend(msg wire.Message) +} + +type ModeFlag uint8 + +const ( + Read ModeFlag = 1 << iota + Write +) + +func (flag ModeFlag) Has(v ModeFlag) bool { + return v&flag != 0 +} + +func NewGroup[T wire.Message](l *slog.Logger, mh MessageHandlerI, addr string, mode ModeFlag, network wire.BitcoinNet /*TODO: add opts*/) *Group[T] { + var tmp T + l = l.With( + slog.String("module", "mcast-group"), + slog.Group("mcast", + slog.String("network", network.String()), + slog.String("cmd", tmp.Command()), + slog.String("address", addr), + ), + ) + + g := Group[T]{ + logger: l, + mh: mh, + + addr: addr, + mode: mode, + + network: network, + maxMsgSize: 32 * 1024 * 1024, + readBuffSize: 4096, + } + + if mode.Has(Write) && g.writeCh == nil { + g.writeCh = make(chan T, 256) + } + + return &g +} + +func (g *Group[T]) Connect() bool { + g.startMu.Lock() + defer g.startMu.Unlock() + + if g.connected.Load() { + g.logger.Warn("Unexpected Connect() call. 
Group is connected already.") + return true + } + + return g.connect() +} + +func (g *Group[T]) Disconnect() { + g.startMu.Lock() + defer g.startMu.Unlock() + + if !g.connected.Load() { + return + } + + g.disconnect() +} + +func (g *Group[T]) WriteMsg(msg T) { + if !g.mode.Has(Write) { + panic("Cannot write to group in read-only mode") + } + + g.writeCh <- msg +} + +func (g *Group[T]) connect() bool { + g.logger.Info("Connecting") + + ctx, cancelFn := context.WithCancel(context.Background()) + g.execCtx = ctx + g.cancelExecCtx = cancelFn + + udpAddr, err := net.ResolveUDPAddr("udp6", g.addr) + if err != nil { + g.logger.Error("Cannot resolve UDP address", slog.String("err", err.Error())) + return false + } + + conn, err := net.ListenPacket("udp6", udpAddr.String()) + if err != nil { + g.logger.Error("Failed to dial node", slog.String("err", err.Error())) + return false + } + + pConn := ipv6.NewPacketConn(conn) + g.mcastConn = &ipv6ConnAdapter{Conn: pConn} + + if g.mode.Has(Read) { + g.logger.Info("Join to multicast group") + err = pConn.JoinGroup(nil, udpAddr) // TODO: define net interface + if err != nil { + g.logger.Error("Failed to join mcast group", slog.String("err", err.Error())) + return false + } + + g.listenForMessages() + } + + if g.mode.Has(Write) { + g.mcastConn.dst = udpAddr + g.sendMessages() + } + + g.connected.Store(true) + g.logger.Info("Ready") + return true +} + +func (g *Group[T]) disconnect() { + g.logger.Info("Disconnecting") + + g.cancelExecCtx() + g.execWg.Wait() + + if g.mode.Has(Read) { + udpAddr, _ := net.ResolveUDPAddr("udp6", g.addr) + _ = g.mcastConn.Conn.LeaveGroup(nil, udpAddr) // TODO: define net interface + } + + _ = g.mcastConn.Conn.Close() + g.mcastConn = nil + g.execCtx = nil + g.cancelExecCtx = nil + + g.connected.Store(false) + g.logger.Info("Disconnected") +} + +func (g *Group[T]) listenForMessages() { + g.execWg.Add(1) + + go func() { + g.logger.Debug("Start listen handler") + defer g.logger.Debug("Stop listen handler") + defer g.execWg.Done() + + var tmp T + expectedCmd := tmp.Command() + + reader := p2p.NewWireReaderSize(g.mcastConn, g.maxMsgSize, g.readBuffSize) + for { + msg, err := reader.ReadNextMsg(g.execCtx, wire.ProtocolVersion, g.network) + if err != nil { + if errors.Is(err, context.Canceled) { + g.logger.Debug("Stop listen handler") + return + } + + // TODO: think how to handle read errors + + g.logger.Error("Read failed", slog.String("err", err.Error())) + // stop group + //p.unhealthyDisconnect() -- auto reconnect? + return + } + + cmd := msg.Command() + if cmd != expectedCmd { + g.logger.Warn("Unexpected message type from mcast group. 
Message ignored", slog.String("cmd", cmd)) + continue + } + + g.logger.Log(context.Background(), slogLvlTrace, "Received message") + g.mh.OnReceive(msg) + } + }() +} + +func (g *Group[T]) sendMessages() { + g.execWg.Add(1) + + go func() { + g.logger.Debug("Start send handler") + defer g.execWg.Done() + + for { + select { + case <-g.execCtx.Done(): + g.logger.Debug("Stop send handler") + return + + case msg := <-g.writeCh: + // do not retry + err := wire.WriteMessage(g.mcastConn, msg, wire.ProtocolVersion, g.network) + if err != nil { + // TODO: think how to handle send errors + g.logger.Error("Failed to send message", slog.String("err", err.Error())) + + // // stop group + // p.unhealthyDisconnect() + return + } + + g.logger.Log(context.Background(), slogLvlTrace, "Sent message") + // let client react on sending msg + g.mh.OnSend(msg) + } + } + }() +} + +const slogLvlTrace slog.Level = slog.LevelDebug - 4 diff --git a/internal/multicast/ipv6conn_adapter.go b/internal/multicast/ipv6conn_adapter.go new file mode 100644 index 000000000..b6ad07b7a --- /dev/null +++ b/internal/multicast/ipv6conn_adapter.go @@ -0,0 +1,27 @@ +package multicast + +import ( + "io" + "net" + + "golang.org/x/net/ipv6" +) + +var ( + _ io.Reader = (*ipv6ConnAdapter)(nil) + _ io.Writer = (*ipv6ConnAdapter)(nil) +) + +type ipv6ConnAdapter struct { + Conn *ipv6.PacketConn + dst *net.UDPAddr +} + +func (c *ipv6ConnAdapter) Read(b []byte) (n int, err error) { + n, _, _, err = c.Conn.ReadFrom(b) + return +} + +func (c *ipv6ConnAdapter) Write(b []byte) (n int, err error) { + return c.Conn.WriteTo(b, nil, c.dst) +} diff --git a/internal/multicast/mocks/message_handler_mock.go b/internal/multicast/mocks/message_handler_mock.go new file mode 100644 index 000000000..64da79f87 --- /dev/null +++ b/internal/multicast/mocks/message_handler_mock.go @@ -0,0 +1,120 @@ +// Code generated by moq; DO NOT EDIT. +// github.com/matryer/moq + +package mocks + +import ( + "github.com/bitcoin-sv/arc/internal/multicast" + "github.com/libsv/go-p2p/wire" + "sync" +) + +// Ensure, that MessageHandlerIMock does implement multicast.MessageHandlerI. +// If this is not the case, regenerate this file with moq. +var _ multicast.MessageHandlerI = &MessageHandlerIMock{} + +// MessageHandlerIMock is a mock implementation of multicast.MessageHandlerI. +// +// func TestSomethingThatUsesMessageHandlerI(t *testing.T) { +// +// // make and configure a mocked multicast.MessageHandlerI +// mockedMessageHandlerI := &MessageHandlerIMock{ +// OnReceiveFunc: func(msg wire.Message) { +// panic("mock out the OnReceive method") +// }, +// OnSendFunc: func(msg wire.Message) { +// panic("mock out the OnSend method") +// }, +// } +// +// // use mockedMessageHandlerI in code that requires multicast.MessageHandlerI +// // and then make assertions. +// +// } +type MessageHandlerIMock struct { + // OnReceiveFunc mocks the OnReceive method. + OnReceiveFunc func(msg wire.Message) + + // OnSendFunc mocks the OnSend method. + OnSendFunc func(msg wire.Message) + + // calls tracks calls to the methods. + calls struct { + // OnReceive holds details about calls to the OnReceive method. + OnReceive []struct { + // Msg is the msg argument value. + Msg wire.Message + } + // OnSend holds details about calls to the OnSend method. + OnSend []struct { + // Msg is the msg argument value. + Msg wire.Message + } + } + lockOnReceive sync.RWMutex + lockOnSend sync.RWMutex +} + +// OnReceive calls OnReceiveFunc. 
+func (mock *MessageHandlerIMock) OnReceive(msg wire.Message) { + if mock.OnReceiveFunc == nil { + panic("MessageHandlerIMock.OnReceiveFunc: method is nil but MessageHandlerI.OnReceive was just called") + } + callInfo := struct { + Msg wire.Message + }{ + Msg: msg, + } + mock.lockOnReceive.Lock() + mock.calls.OnReceive = append(mock.calls.OnReceive, callInfo) + mock.lockOnReceive.Unlock() + mock.OnReceiveFunc(msg) +} + +// OnReceiveCalls gets all the calls that were made to OnReceive. +// Check the length with: +// +// len(mockedMessageHandlerI.OnReceiveCalls()) +func (mock *MessageHandlerIMock) OnReceiveCalls() []struct { + Msg wire.Message +} { + var calls []struct { + Msg wire.Message + } + mock.lockOnReceive.RLock() + calls = mock.calls.OnReceive + mock.lockOnReceive.RUnlock() + return calls +} + +// OnSend calls OnSendFunc. +func (mock *MessageHandlerIMock) OnSend(msg wire.Message) { + if mock.OnSendFunc == nil { + panic("MessageHandlerIMock.OnSendFunc: method is nil but MessageHandlerI.OnSend was just called") + } + callInfo := struct { + Msg wire.Message + }{ + Msg: msg, + } + mock.lockOnSend.Lock() + mock.calls.OnSend = append(mock.calls.OnSend, callInfo) + mock.lockOnSend.Unlock() + mock.OnSendFunc(msg) +} + +// OnSendCalls gets all the calls that were made to OnSend. +// Check the length with: +// +// len(mockedMessageHandlerI.OnSendCalls()) +func (mock *MessageHandlerIMock) OnSendCalls() []struct { + Msg wire.Message +} { + var calls []struct { + Msg wire.Message + } + mock.lockOnSend.RLock() + calls = mock.calls.OnSend + mock.lockOnSend.RUnlock() + return calls +} diff --git a/internal/multicast/multicast_mocks.go b/internal/multicast/multicast_mocks.go new file mode 100644 index 000000000..f7a95fdb2 --- /dev/null +++ b/internal/multicast/multicast_mocks.go @@ -0,0 +1,3 @@ +package multicast + +//go:generate moq -pkg mocks -out ./mocks/message_handler_mock.go ./ MessageHandlerI diff --git a/internal/multicast/tests/group_test.go b/internal/multicast/tests/group_test.go new file mode 100644 index 000000000..be0e0f406 --- /dev/null +++ b/internal/multicast/tests/group_test.go @@ -0,0 +1,45 @@ +package multicast_test + +import ( + "log/slog" + "testing" + "time" + + "github.com/bitcoin-sv/arc/internal/multicast" + "github.com/bitcoin-sv/arc/internal/multicast/mocks" + "github.com/libsv/go-p2p/wire" + "github.com/stretchr/testify/require" +) + +var ( + addr = "[ff02::1]:1234" + bcNet = wire.TestNet +) + +func TestGroupCommunication(t *testing.T) { + // given + lMsgHandler := &mocks.MessageHandlerIMock{OnReceiveFunc: func(_ wire.Message) {}} + listener := multicast.NewGroup[*wire.MsgPing](slog.Default(), lMsgHandler, addr, multicast.Read, bcNet) + require.True(t, listener.Connect()) + defer listener.Disconnect() + + wMsgHandler := &mocks.MessageHandlerIMock{OnSendFunc: func(_ wire.Message) {}} + writer := multicast.NewGroup[*wire.MsgPing](slog.Default(), wMsgHandler, addr, multicast.Write, bcNet) + require.True(t, writer.Connect()) + defer writer.Disconnect() + + msg := wire.NewMsgPing(825906425) + + // when + writer.WriteMsg(msg) + time.Sleep(200 * time.Millisecond) + + // then + sentMsgs := wMsgHandler.OnSendCalls() + require.Len(t, sentMsgs, 1, "writer didn't send message") + require.Equal(t, msg, (sentMsgs[0].Msg).(*wire.MsgPing)) + + receivedMsgs := lMsgHandler.OnReceiveCalls() + require.Len(t, receivedMsgs, 1, "listener didn't receive message") + require.Equal(t, msg, (receivedMsgs[0].Msg).(*wire.MsgPing)) +} diff --git a/test/config/config.yaml b/test/config/config.yaml index 
5c97d5321..dbf4b0ae3 100644
--- a/test/config/config.yaml
+++ b/test/config/config.yaml
@@ -7,7 +7,6 @@ prometheus:
   endpoint: /metrics
   addr: :2112
 grpcMessageSize: 100000000
-network: regtest
 messageQueue:
   streaming:
     enabled: true
@@ -24,23 +23,6 @@ peerRpc:
   host: node1
   port: 18332
 
-broadcasting:
-  mode: unicast
-  unicast:
-    peers:
-      - host: node1
-        port:
-          p2p: 18333
-          zmq: 28332
-      - host: node2
-        port:
-          p2p: 18333
-          zmq: 28332
-      - host: node3
-        port:
-          p2p: 18333
-          zmq: 28332
-
 cache:
   engine: redis
   redis:
@@ -65,8 +47,6 @@ metamorph:
   processorCacheExpiryTime: 24h
   maxRetries: 1000
   processStatusUpdateInterval: 50ms
-  checkSeenOnNetworkOlderThan: 3h
-  checkSeenOnNetworkPeriod: 4h
   monitorPeers: true
   checkUtxos: false
   profilerAddr: localhost:9992
@@ -77,6 +57,22 @@ metamorph:
   stats:
     notSeenTimeLimit: 10m
     notFinalTimeLimit: 20m
+  bcnet:
+    mode: classic
+    network: regtest
+    peers:
+      - host: node1
+        port:
+          p2p: 18333
+          zmq: 28332
+      - host: node2
+        port:
+          p2p: 18333
+          zmq: 28332
+      - host: node3
+        port:
+          p2p: 18333
+          zmq: 28332
 
 blocktx:
   listenAddr: 0.0.0.0:8011
@@ -100,6 +96,22 @@ blocktx:
     enabled: true
     interval: 1s
   maxAllowedBlockHeightMismatch: 3
+  bcnet:
+    mode: classic
+    network: regtest
+    peers:
+      - host: node1
+        port:
+          p2p: 18333
+          zmq: 28332
+      - host: node2
+        port:
+          p2p: 18333
+          zmq: 28332
+      - host: node3
+        port:
+          p2p: 18333
+          zmq: 28332
 
 api:
   address: 0.0.0.0:9090
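
Editor's note: the MessageHandlerI contract added in internal/multicast/group.go decouples message handling from the group itself, so a caller only provides OnReceive/OnSend callbacks. Below is a minimal, illustrative sketch (not part of this patch) of a channel-backed handler wired to a read-only and a write-only Group. The handler type name `chanHandler` is an assumption; NewGroup, Connect, WriteMsg, Disconnect and the Read/Write flags are taken from the code added above, and the group address mirrors the package test.

// Illustrative sketch only; assumes link-local multicast is delivered locally,
// as in the package test.
package main

import (
	"log/slog"

	"github.com/bitcoin-sv/arc/internal/multicast"
	"github.com/libsv/go-p2p/wire"
)

// chanHandler is a hypothetical MessageHandlerI implementation. OnReceive
// forwards to a buffered channel so the handler stays fire-and-forget.
type chanHandler struct {
	received chan wire.Message
}

func (h *chanHandler) OnReceive(msg wire.Message) { h.received <- msg }
func (h *chanHandler) OnSend(_ wire.Message)      {}

func main() {
	handler := &chanHandler{received: make(chan wire.Message, 16)}

	// Read-only group: joins the multicast group and forwards pings to the handler.
	listener := multicast.NewGroup[*wire.MsgPing](slog.Default(), handler, "[ff02::1]:1234", multicast.Read, wire.TestNet)
	if !listener.Connect() {
		return
	}
	defer listener.Disconnect()

	// Write-only group: publishes to the same multicast address.
	writer := multicast.NewGroup[*wire.MsgPing](slog.Default(), handler, "[ff02::1]:1234", multicast.Write, wire.TestNet)
	if !writer.Connect() {
		return
	}
	defer writer.Disconnect()

	writer.WriteMsg(wire.NewMsgPing(1))

	msg := <-handler.received
	slog.Default().Info("received from multicast group", slog.String("cmd", msg.Command()))
}

Since ModeFlag is a bitmask checked via Has, a single group can also be opened in both directions by passing multicast.Read|multicast.Write.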
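
Editor's note: the processor.go changes above swap the *p2p.NetworkMessenger dependency for a *bcnet.Mediator, and the updated tests build it as bcnet.NewMediator(slog.Default(), true, messenger, nil). The helper below is a hypothetical wiring sketch that mirrors the tests; the package and function names are illustrative, and the meaning of the `true` flag (classic, P2P-only behaviour) and of the trailing nil (no multicast transmitter) is an assumption rather than something stated by this patch.

// Hypothetical wiring helper; not part of the patch.
package metamorphwiring

import (
	"log/slog"

	"github.com/bitcoin-sv/arc/internal/cache"
	"github.com/bitcoin-sv/arc/internal/metamorph"
	"github.com/bitcoin-sv/arc/internal/metamorph/bcnet"
	"github.com/bitcoin-sv/arc/internal/metamorph/bcnet/metamorph_p2p"
	"github.com/bitcoin-sv/arc/internal/metamorph/store"
	"github.com/bitcoin-sv/arc/internal/p2p"
)

// newClassicProcessor builds a Processor the way the updated tests do: a
// NetworkMessenger wrapped in a Mediator, then passed to NewProcessor.
// The caller is responsible for calling messenger.Shutdown(), as the tests do.
func newClassicProcessor(
	logger *slog.Logger,
	s store.MetamorphStore,
	c cache.Store,
	pm *p2p.PeerManager,
	statusCh chan *metamorph_p2p.TxStatusMessage,
) (*metamorph.Processor, *p2p.NetworkMessenger, error) {
	messenger := p2p.NewNetworkMessenger(logger, pm)

	// `true` and the trailing nil are copied verbatim from the updated tests.
	mediator := bcnet.NewMediator(logger, true, messenger, nil)

	processor, err := metamorph.NewProcessor(s, c, mediator, statusCh)
	if err != nil {
		return nil, nil, err
	}

	return processor, messenger, nil
}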