diff --git a/cmd/arc/services/blocktx.go b/cmd/arc/services/blocktx.go index 43276c919..1c9f6a9da 100644 --- a/cmd/arc/services/blocktx.go +++ b/cmd/arc/services/blocktx.go @@ -6,7 +6,6 @@ import ( "os" "time" - "github.com/libsv/go-p2p" "go.opentelemetry.io/otel/attribute" "github.com/bitcoin-sv/arc/internal/grpc_opts" @@ -17,8 +16,11 @@ import ( "github.com/bitcoin-sv/arc/config" "github.com/bitcoin-sv/arc/internal/blocktx" + blockchain "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication" + blocktx_p2p "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication/p2p" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql" + "github.com/bitcoin-sv/arc/internal/p2p" "github.com/bitcoin-sv/arc/internal/version" ) @@ -37,7 +39,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err blockStore store.BlocktxStore mqClient blocktx.MessageQueueClient processor *blocktx.Processor - pm p2p.PeerManagerI + pm *p2p.PeerManager server *blocktx.Server healthServer *grpc_opts.GrpcServer workers *blocktx.BackgroundWorkers @@ -125,8 +127,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err blocktx.WithMessageQueueClient(mqClient), ) - blockRequestCh := make(chan blocktx.BlockRequest, blockProcessingBuffer) - blockProcessCh := make(chan *p2p.BlockMessage, blockProcessingBuffer) + blockRequestCh := make(chan blocktx_p2p.BlockRequest, blockProcessingBuffer) + blockProcessCh := make(chan *blockchain.BlockMessage, blockProcessingBuffer) processor, err = blocktx.NewProcessor(logger, blockStore, blockRequestCh, blockProcessCh, processorOpts...) if err != nil { @@ -140,17 +142,10 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err return nil, fmt.Errorf("failed to start peer handler: %v", err) } - peerOpts := []p2p.PeerOptions{ - p2p.WithMaximumMessageSize(maximumBlockSize), - p2p.WithRetryReadWriteMessageInterval(5 * time.Second), - p2p.WithPingInterval(30*time.Second, 1*time.Minute), - } - - if version.Version != "" { - peerOpts = append(peerOpts, p2p.WithUserAgent("ARC", version.Version)) - } + // p2p global setting + p2p.SetExcessiveBlockSize(maximumBlockSize) - pmOpts := []p2p.PeerManagerOptions{p2p.WithExcessiveBlockSize(maximumBlockSize)} + pmOpts := []p2p.PeerManagerOptions{} if arcConfig.Metamorph.MonitorPeers { pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers()) } @@ -158,7 +153,16 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err pm = p2p.NewPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...) peers := make([]p2p.PeerI, len(arcConfig.Broadcasting.Unicast.Peers)) - peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh) + peerHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh) + + peerOpts := []p2p.PeerOptions{ + p2p.WithMaximumMessageSize(maximumBlockSize), + p2p.WithPingInterval(30*time.Second, 1*time.Minute), + } + + if version.Version != "" { + peerOpts = append(peerOpts, p2p.WithUserAgent("ARC", version.Version)) + } for i, peerSetting := range arcConfig.Broadcasting.Unicast.Peers { peerURL, err := peerSetting.GetP2PUrl() @@ -167,11 +171,13 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err return nil, fmt.Errorf("error getting peer url: %v", err) } - peer, err := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerURL, peerHandler, network, peerOpts...) - if err != nil { + peer := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerHandler, peerURL, network, peerOpts...) + ok := peer.Connect() + if !ok { stopFn() return nil, fmt.Errorf("error creating peer %s: %v", peerURL, err) } + err = pm.AddPeer(peer) if err != nil { stopFn() @@ -238,7 +244,7 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf } func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor, - pm p2p.PeerManagerI, mqClient blocktx.MessageQueueClient, + pm *p2p.PeerManager, mqClient blocktx.MessageQueueClient, store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers, shutdownFns []func(), ) { diff --git a/internal/blocktx/background_workers.go b/internal/blocktx/background_workers.go index 75aa884f7..e5bdf9bcb 100644 --- a/internal/blocktx/background_workers.go +++ b/internal/blocktx/background_workers.go @@ -6,8 +6,9 @@ import ( "sync" "time" + blocktx_p2p "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication/p2p" "github.com/bitcoin-sv/arc/internal/blocktx/store" - "github.com/libsv/go-p2p" + "github.com/bitcoin-sv/arc/internal/p2p" ) type BackgroundWorkers struct { @@ -40,7 +41,7 @@ func (w *BackgroundWorkers) GracefulStop() { w.l.Info("Shutdown complete") } -func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestCh chan<- BlockRequest) { +func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) { w.workersWg.Add(1) go func() { @@ -53,7 +54,7 @@ func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Durat select { case <-t.C: i = i % len(peers) - err := w.fillGaps(peers[i], retentionDays, blockRequestCh) + err := w.fillGaps(peers[i], retentionDays, blockRequestingCh) if err != nil { w.l.Error("failed to fill blocks gaps", slog.String("err", err.Error())) } @@ -68,7 +69,7 @@ func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Durat }() } -func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestCh chan<- BlockRequest) error { +func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) error { const ( hoursPerDay = 24 blocksPerHour = 6 @@ -91,7 +92,7 @@ func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockReq slog.String("peer", peer.String()), ) - blockRequestCh <- BlockRequest{ + blockRequestingCh <- blocktx_p2p.BlockRequest{ Hash: block.Hash, Peer: peer, } diff --git a/internal/blocktx/background_workers_test.go b/internal/blocktx/background_workers_test.go index b06279c4f..fbbf8f57a 100644 --- a/internal/blocktx/background_workers_test.go +++ b/internal/blocktx/background_workers_test.go @@ -9,11 +9,12 @@ import ( "time" "github.com/bitcoin-sv/arc/internal/blocktx" - "github.com/bitcoin-sv/arc/internal/blocktx/mocks" + blocktx_p2p "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication/p2p" "github.com/bitcoin-sv/arc/internal/blocktx/store" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" + "github.com/bitcoin-sv/arc/internal/p2p" + p2pMocks "github.com/bitcoin-sv/arc/internal/p2p/mocks" "github.com/bitcoin-sv/arc/internal/testdata" - "github.com/libsv/go-p2p" "github.com/stretchr/testify/require" ) @@ -64,6 +65,9 @@ func TestStartFillGaps(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { // given + const fillGapsInterval = 50 * time.Millisecond + + blockRequestingCh := make(chan blocktx_p2p.BlockRequest, 10) getBlockErrCh := make(chan error) getBlockGapTestErr := tc.getBlockGapsErr @@ -78,30 +82,21 @@ func TestStartFillGaps(t *testing.T) { }, } - peerMock := &mocks.PeerMock{ - StringFunc: func() string { - return "" - }, - } + peerMock := &p2pMocks.PeerIMock{StringFunc: func() string { return "peer" }} peers := []p2p.PeerI{peerMock} - logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) - blockRequestCh := make(chan blocktx.BlockRequest, 10) - - sut := blocktx.NewBackgroundWorkers(storeMock, logger) - - interval := 50 * time.Millisecond + sut := blocktx.NewBackgroundWorkers(storeMock, slog.Default()) // when - sut.StartFillGaps(peers, interval, 28, blockRequestCh) + sut.StartFillGaps(peers, fillGapsInterval, 28, blockRequestingCh) // then select { - case hashPeer := <-blockRequestCh: + case hashPeer := <-blockRequestingCh: require.True(t, testdata.Block1Hash.IsEqual(hashPeer.Hash)) case err = <-getBlockErrCh: require.ErrorIs(t, err, tc.getBlockGapsErr) - case <-time.After(time.Duration(3.5 * float64(interval))): + case <-time.After(time.Duration(3.5 * float64(fillGapsInterval))): } sut.GracefulStop() diff --git a/internal/blocktx/blockchain_communication/block_message.go b/internal/blocktx/blockchain_communication/block_message.go new file mode 100644 index 000000000..70902f4b8 --- /dev/null +++ b/internal/blocktx/blockchain_communication/block_message.go @@ -0,0 +1,29 @@ +package blockchaincommunication + +import ( + "io" + + "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/libsv/go-p2p/wire" +) + +// BlockMessage only stores the transaction IDs of the block, not the full transactions +type BlockMessage struct { + Header *wire.BlockHeader + Height uint64 + TransactionHashes []*chainhash.Hash + Size uint64 +} + +func (bm *BlockMessage) Bsvdecode(io.Reader, uint32, wire.MessageEncoding) error { + return nil +} +func (bm *BlockMessage) BsvEncode(io.Writer, uint32, wire.MessageEncoding) error { + return nil +} +func (bm *BlockMessage) Command() string { + return "block" +} +func (bm *BlockMessage) MaxPayloadLength(uint32) uint64 { + return wire.MaxExtMsgPayload +} diff --git a/internal/blocktx/peer_handler.go b/internal/blocktx/blockchain_communication/p2p/init.go similarity index 58% rename from internal/blocktx/peer_handler.go rename to internal/blocktx/blockchain_communication/p2p/init.go index 179adeb24..f7741b0fd 100644 --- a/internal/blocktx/peer_handler.go +++ b/internal/blocktx/blockchain_communication/p2p/init.go @@ -1,25 +1,22 @@ -package blocktx +package blocktx_p2p import ( "encoding/binary" - "errors" "io" - "log/slog" + + blockchain "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication" "github.com/bitcoin-sv/go-sdk/script" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "github.com/bitcoin-sv/go-sdk/util" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" ) -var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to p2p.BlockMessage") - func init() { // override the default wire block handler with our own that streams and stores only the transaction ids wire.SetExternalHandler(wire.CmdBlock, func(reader io.Reader, _ uint64, bytesRead int) (int, wire.Message, []byte, error) { - blockMessage := &p2p.BlockMessage{ + blockMessage := &blockchain.BlockMessage{ Header: &wire.BlockHeader{}, } @@ -68,60 +65,6 @@ func init() { }) } -type PeerHandler struct { - logger *slog.Logger - blockRequestCh chan BlockRequest - blockProcessCh chan *p2p.BlockMessage -} - -func NewPeerHandler(logger *slog.Logger, blockRequestCh chan BlockRequest, blockProcessCh chan *p2p.BlockMessage) *PeerHandler { - return &PeerHandler{ - logger: logger.With(slog.String("module", "peer-handler")), - blockRequestCh: blockRequestCh, - blockProcessCh: blockProcessCh, - } -} - -func (ph *PeerHandler) HandleTransactionsGet(_ []*wire.InvVect, _ p2p.PeerI) ([][]byte, error) { - return nil, nil -} - -func (ph *PeerHandler) HandleTransactionSent(_ *wire.MsgTx, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleTransactionAnnouncement(_ *wire.InvVect, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleTransactionRejection(_ *wire.MsgReject, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleTransaction(_ *wire.MsgTx, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleBlockAnnouncement(msg *wire.InvVect, peer p2p.PeerI) error { - req := BlockRequest{ - Hash: &msg.Hash, - Peer: peer, - } - - ph.blockRequestCh <- req - return nil -} - -func (ph *PeerHandler) HandleBlock(wireMsg wire.Message, _ p2p.PeerI) error { - msg, ok := wireMsg.(*p2p.BlockMessage) - if !ok { - return ErrUnableToCastWireMessage - } - - ph.blockProcessCh <- msg - return nil -} - func extractHeightFromCoinbaseTx(tx *sdkTx.Transaction) uint64 { // Coinbase tx has a special format, the height is encoded in the first 4 bytes of the scriptSig (BIP-34) diff --git a/internal/blocktx/peer_handler_test.go b/internal/blocktx/blockchain_communication/p2p/init_test.go similarity index 98% rename from internal/blocktx/peer_handler_test.go rename to internal/blocktx/blockchain_communication/p2p/init_test.go index 86cb2c2ee..43513dcc0 100644 --- a/internal/blocktx/peer_handler_test.go +++ b/internal/blocktx/blockchain_communication/p2p/init_test.go @@ -1,4 +1,4 @@ -package blocktx +package blocktx_p2p import ( "testing" diff --git a/internal/blocktx/blockchain_communication/p2p/message_handler.go b/internal/blocktx/blockchain_communication/p2p/message_handler.go new file mode 100644 index 000000000..0132fea03 --- /dev/null +++ b/internal/blocktx/blockchain_communication/p2p/message_handler.go @@ -0,0 +1,75 @@ +package blocktx_p2p + +import ( + "errors" + "log/slog" + + blockchain "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication" + "github.com/bitcoin-sv/arc/internal/p2p" + "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/libsv/go-p2p/wire" +) + +var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to blockchain.BlockMessage") + +type BlockRequest struct { + Hash *chainhash.Hash + Peer p2p.PeerI +} + +var _ p2p.MessageHandlerI = (*MsgHandler)(nil) + +type MsgHandler struct { + logger *slog.Logger + blockRequestingCh chan<- BlockRequest + blockProcessingCh chan<- *blockchain.BlockMessage +} + +func NewMsgHandler(logger *slog.Logger, blockRequestCh chan<- BlockRequest, blockProcessCh chan<- *blockchain.BlockMessage) *MsgHandler { + return &MsgHandler{ + logger: logger.With(slog.String("module", "peer-msg-handler")), + blockRequestingCh: blockRequestCh, + blockProcessingCh: blockProcessCh, + } +} + +// should be fire & forget +func (h *MsgHandler) OnReceive(msg wire.Message, peer p2p.PeerI) { + cmd := msg.Command() + + switch cmd { + case wire.CmdInv: + invMsg, ok := msg.(*wire.MsgInv) + if !ok { + return + } + + for _, iv := range invMsg.InvList { + if iv.Type == wire.InvTypeBlock { + req := BlockRequest{ + Hash: &iv.Hash, + Peer: peer, + } + + h.blockRequestingCh <- req + } + // ignore INV with transaction or error + } + + case wire.CmdBlock: + blockMsg, ok := msg.(*blockchain.BlockMessage) + if !ok { + h.logger.Error("Block msg receive", slog.Any("err", ErrUnableToCastWireMessage)) + return + } + + h.blockProcessingCh <- blockMsg + + default: + // ignore other messages + } +} + +func (h *MsgHandler) OnSend(_ wire.Message, _ p2p.PeerI) { + // ignore +} diff --git a/internal/blocktx/blocktx_mocks.go b/internal/blocktx/blocktx_mocks.go index 48b204410..7d72e8709 100644 --- a/internal/blocktx/blocktx_mocks.go +++ b/internal/blocktx/blocktx_mocks.go @@ -8,7 +8,3 @@ package blocktx // from nats_core_client.go //go:generate moq -pkg mocks -out ./mocks/mq_client_mock.go . MessageQueueClient - -// from interface.go -//go:generate moq -pkg mocks -out ./mocks/peer_mock.go . Peer -//go:generate moq -pkg mocks -out ./mocks/peer_manager_mock.go . PeerManager diff --git a/internal/blocktx/health_check.go b/internal/blocktx/health_check.go index 72180cab8..f6c556fe8 100644 --- a/internal/blocktx/health_check.go +++ b/internal/blocktx/health_check.go @@ -31,13 +31,7 @@ func (s *Server) Check(ctx context.Context, req *grpc_health_v1.HealthCheckReque } // verify we have at least 1 node connected to blocktx - healthy := false - for _, peer := range s.pm.GetPeers() { - if peer.IsHealthy() && peer.Connected() { - healthy = true - break - } - } + healthy := s.pm.CountConnectedPeers() > 0 if !healthy { s.logger.Error("healthy peer not found") @@ -66,13 +60,7 @@ func (s *Server) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_healt } // verify we have at least 1 node connected to blocktx - healthy := false - for _, peer := range s.pm.GetPeers() { - if peer.IsHealthy() && peer.Connected() { - healthy = true - break - } - } + healthy := s.pm.CountConnectedPeers() > 0 if !healthy { s.logger.Error("healthy peer not found") diff --git a/internal/blocktx/health_check_test.go b/internal/blocktx/health_check_test.go index 159bfedb6..03d92a304 100644 --- a/internal/blocktx/health_check_test.go +++ b/internal/blocktx/health_check_test.go @@ -7,8 +7,6 @@ import ( "os" "testing" - "github.com/libsv/go-p2p" - "github.com/stretchr/testify/require" "google.golang.org/grpc/health/grpc_health_v1" @@ -16,6 +14,7 @@ import ( "github.com/bitcoin-sv/arc/internal/blocktx/mocks" "github.com/bitcoin-sv/arc/internal/blocktx/store" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" + "github.com/bitcoin-sv/arc/internal/p2p" ) func TestCheck(t *testing.T) { @@ -27,23 +26,23 @@ func TestCheck(t *testing.T) { expectedStatus grpc_health_v1.HealthCheckResponse_ServingStatus }{ { - name: "liveness - peer not found", - service: "readiness", + name: "liveness - serving", + service: "liveness", pingErr: nil, - expectedStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING, + expectedStatus: grpc_health_v1.HealthCheckResponse_SERVING, }, { - name: "db error - not connected", + name: "readiness - peer not found", service: "readiness", - pingErr: errors.New("not connected"), + pingErr: nil, expectedStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING, }, { - name: "db error - not connected", + name: "readiness - db error - not connected", service: "readiness", - pingErr: nil, + pingErr: errors.New("not connected"), expectedStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING, }, @@ -66,16 +65,7 @@ func TestCheck(t *testing.T) { } logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) - pm := &mocks.PeerManagerMock{GetPeersFunc: func() []p2p.PeerI { - return []p2p.PeerI{&mocks.PeerMock{ - IsHealthyFunc: func() bool { - return false - }, - ConnectedFunc: func() bool { - return false - }, - }} - }} + pm := &p2p.PeerManager{} sut, err := blocktx.NewServer("", 0, logger, storeMock, pm, 0, nil) require.NoError(t, err) @@ -107,14 +97,14 @@ func TestWatch(t *testing.T) { expectedStatus: grpc_health_v1.HealthCheckResponse_SERVING, }, { - name: "not ready - healthy", + name: "readiness - peer not found", service: "readiness", pingErr: nil, expectedStatus: grpc_health_v1.HealthCheckResponse_NOT_SERVING, }, { - name: "not ready - healthy", + name: "readiness - db error - not connected", service: "readiness", pingErr: errors.New("not connected"), @@ -140,16 +130,7 @@ func TestWatch(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) - pm := &mocks.PeerManagerMock{ - GetPeersFunc: func() []p2p.PeerI { - return []p2p.PeerI{ - &mocks.PeerMock{ - IsHealthyFunc: func() bool { return false }, - ConnectedFunc: func() bool { return false }, - }, - } - }, - } + pm := &p2p.PeerManager{} sut, err := blocktx.NewServer("", 0, logger, storeMock, pm, 0, nil) require.NoError(t, err) diff --git a/internal/blocktx/integration_test/reorg_integration_test.go b/internal/blocktx/integration_test/reorg_integration_test.go index b0cb7e51f..073014ed7 100644 --- a/internal/blocktx/integration_test/reorg_integration_test.go +++ b/internal/blocktx/integration_test/reorg_integration_test.go @@ -27,13 +27,14 @@ import ( _ "github.com/golang-migrate/migrate/v4/source/file" _ "github.com/lib/pq" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" "github.com/ory/dockertest/v3" "github.com/stretchr/testify/require" "github.com/bitcoin-sv/arc/internal/blocktx" + blockchain "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication" + blocktx_p2p "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication/p2p" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql" testutils "github.com/bitcoin-sv/arc/internal/test_utils" @@ -96,13 +97,13 @@ func TestBlockStatus(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) - var blockRequestCh chan blocktx.BlockRequest = nil // nolint: revive - blockProcessCh := make(chan *p2p.BlockMessage, 10) + var blockRequestCh chan blocktx_p2p.BlockRequest = nil // nolint: revive + blockProcessCh := make(chan *blockchain.BlockMessage, 10) blocktxStore, err := postgresql.New(dbInfo, 10, 80) require.NoError(t, err) - peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh) + p2pMsgHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh) processor, err := blocktx.NewProcessor(logger, blocktxStore, blockRequestCh, blockProcessCh) require.NoError(t, err) @@ -116,7 +117,7 @@ func TestBlockStatus(t *testing.T) { require.NoError(t, err) // should become LONGEST - blockMessage := &p2p.BlockMessage{ + blockMessage := &blockchain.BlockMessage{ Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevBlockHash, // NON-existent in the db @@ -127,8 +128,8 @@ func TestBlockStatus(t *testing.T) { TransactionHashes: []*chainhash.Hash{txHash}, } - err = peerHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) + // Allow DB to process the block time.Sleep(200 * time.Millisecond) @@ -149,7 +150,7 @@ func TestBlockStatus(t *testing.T) { require.NoError(t, err) // should become STALE - blockMessage = &p2p.BlockMessage{ + blockMessage = &blockchain.BlockMessage{ Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevBlockHash, // block with status LONGEST at height 822014 @@ -160,7 +161,7 @@ func TestBlockStatus(t *testing.T) { TransactionHashes: []*chainhash.Hash{txHash}, } - err = peerHandler.HandleBlock(blockMessage, nil) + p2pMsgHandler.OnReceive(blockMessage, nil) require.NoError(t, err) // Allow DB to process the block time.Sleep(200 * time.Millisecond) @@ -174,7 +175,7 @@ func TestBlockStatus(t *testing.T) { // should become LONGEST // reorg should happen - blockMessage = &p2p.BlockMessage{ + blockMessage = &blockchain.BlockMessage{ Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: blockHashStale, // block with status STALE at height 822015 @@ -185,7 +186,7 @@ func TestBlockStatus(t *testing.T) { TransactionHashes: []*chainhash.Hash{txHash}, } - err = peerHandler.HandleBlock(blockMessage, nil) + p2pMsgHandler.OnReceive(blockMessage, nil) require.NoError(t, err) // Allow DB to process the block and perform reorg time.Sleep(1 * time.Second) diff --git a/internal/blocktx/interface.go b/internal/blocktx/interface.go deleted file mode 100644 index 9fa6d5264..000000000 --- a/internal/blocktx/interface.go +++ /dev/null @@ -1,41 +0,0 @@ -package blocktx - -import ( - "errors" - - "github.com/libsv/go-p2p" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/libsv/go-p2p/wire" -) - -var ErrMerklePathNotFoundForTransaction = errors.New("merkle path not found for transaction") - -type BlockRequest struct { - Hash *chainhash.Hash - Peer p2p.PeerI -} - -type Peer interface { - Connected() bool - WriteMsg(msg wire.Message) error - String() string - AnnounceTransaction(txHash *chainhash.Hash) - RequestTransaction(txHash *chainhash.Hash) - AnnounceBlock(blockHash *chainhash.Hash) - RequestBlock(blockHash *chainhash.Hash) - Network() wire.BitcoinNet - IsHealthy() bool - IsUnhealthyCh() <-chan struct{} - Shutdown() - Restart() -} - -type PeerManager interface { - AnnounceTransaction(txHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI - RequestTransaction(txHash *chainhash.Hash) p2p.PeerI - AnnounceBlock(blockHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI - RequestBlock(blockHash *chainhash.Hash) p2p.PeerI - AddPeer(peer p2p.PeerI) error - GetPeers() []p2p.PeerI - Shutdown() -} diff --git a/internal/blocktx/mocks/peer_manager_mock.go b/internal/blocktx/mocks/peer_manager_mock.go deleted file mode 100644 index 646f80919..000000000 --- a/internal/blocktx/mocks/peer_manager_mock.go +++ /dev/null @@ -1,339 +0,0 @@ -// Code generated by moq; DO NOT EDIT. -// github.com/matryer/moq - -package mocks - -import ( - "github.com/bitcoin-sv/arc/internal/blocktx" - "github.com/libsv/go-p2p" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "sync" -) - -// Ensure, that PeerManagerMock does implement blocktx.PeerManager. -// If this is not the case, regenerate this file with moq. -var _ blocktx.PeerManager = &PeerManagerMock{} - -// PeerManagerMock is a mock implementation of blocktx.PeerManager. -// -// func TestSomethingThatUsesPeerManager(t *testing.T) { -// -// // make and configure a mocked blocktx.PeerManager -// mockedPeerManager := &PeerManagerMock{ -// AddPeerFunc: func(peer p2p.PeerI) error { -// panic("mock out the AddPeer method") -// }, -// AnnounceBlockFunc: func(blockHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI { -// panic("mock out the AnnounceBlock method") -// }, -// AnnounceTransactionFunc: func(txHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI { -// panic("mock out the AnnounceTransaction method") -// }, -// GetPeersFunc: func() []p2p.PeerI { -// panic("mock out the GetPeers method") -// }, -// RequestBlockFunc: func(blockHash *chainhash.Hash) p2p.PeerI { -// panic("mock out the RequestBlock method") -// }, -// RequestTransactionFunc: func(txHash *chainhash.Hash) p2p.PeerI { -// panic("mock out the RequestTransaction method") -// }, -// ShutdownFunc: func() { -// panic("mock out the Shutdown method") -// }, -// } -// -// // use mockedPeerManager in code that requires blocktx.PeerManager -// // and then make assertions. -// -// } -type PeerManagerMock struct { - // AddPeerFunc mocks the AddPeer method. - AddPeerFunc func(peer p2p.PeerI) error - - // AnnounceBlockFunc mocks the AnnounceBlock method. - AnnounceBlockFunc func(blockHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI - - // AnnounceTransactionFunc mocks the AnnounceTransaction method. - AnnounceTransactionFunc func(txHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI - - // GetPeersFunc mocks the GetPeers method. - GetPeersFunc func() []p2p.PeerI - - // RequestBlockFunc mocks the RequestBlock method. - RequestBlockFunc func(blockHash *chainhash.Hash) p2p.PeerI - - // RequestTransactionFunc mocks the RequestTransaction method. - RequestTransactionFunc func(txHash *chainhash.Hash) p2p.PeerI - - // ShutdownFunc mocks the Shutdown method. - ShutdownFunc func() - - // calls tracks calls to the methods. - calls struct { - // AddPeer holds details about calls to the AddPeer method. - AddPeer []struct { - // Peer is the peer argument value. - Peer p2p.PeerI - } - // AnnounceBlock holds details about calls to the AnnounceBlock method. - AnnounceBlock []struct { - // BlockHash is the blockHash argument value. - BlockHash *chainhash.Hash - // Peers is the peers argument value. - Peers []p2p.PeerI - } - // AnnounceTransaction holds details about calls to the AnnounceTransaction method. - AnnounceTransaction []struct { - // TxHash is the txHash argument value. - TxHash *chainhash.Hash - // Peers is the peers argument value. - Peers []p2p.PeerI - } - // GetPeers holds details about calls to the GetPeers method. - GetPeers []struct { - } - // RequestBlock holds details about calls to the RequestBlock method. - RequestBlock []struct { - // BlockHash is the blockHash argument value. - BlockHash *chainhash.Hash - } - // RequestTransaction holds details about calls to the RequestTransaction method. - RequestTransaction []struct { - // TxHash is the txHash argument value. - TxHash *chainhash.Hash - } - // Shutdown holds details about calls to the Shutdown method. - Shutdown []struct { - } - } - lockAddPeer sync.RWMutex - lockAnnounceBlock sync.RWMutex - lockAnnounceTransaction sync.RWMutex - lockGetPeers sync.RWMutex - lockRequestBlock sync.RWMutex - lockRequestTransaction sync.RWMutex - lockShutdown sync.RWMutex -} - -// AddPeer calls AddPeerFunc. -func (mock *PeerManagerMock) AddPeer(peer p2p.PeerI) error { - if mock.AddPeerFunc == nil { - panic("PeerManagerMock.AddPeerFunc: method is nil but PeerManager.AddPeer was just called") - } - callInfo := struct { - Peer p2p.PeerI - }{ - Peer: peer, - } - mock.lockAddPeer.Lock() - mock.calls.AddPeer = append(mock.calls.AddPeer, callInfo) - mock.lockAddPeer.Unlock() - return mock.AddPeerFunc(peer) -} - -// AddPeerCalls gets all the calls that were made to AddPeer. -// Check the length with: -// -// len(mockedPeerManager.AddPeerCalls()) -func (mock *PeerManagerMock) AddPeerCalls() []struct { - Peer p2p.PeerI -} { - var calls []struct { - Peer p2p.PeerI - } - mock.lockAddPeer.RLock() - calls = mock.calls.AddPeer - mock.lockAddPeer.RUnlock() - return calls -} - -// AnnounceBlock calls AnnounceBlockFunc. -func (mock *PeerManagerMock) AnnounceBlock(blockHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI { - if mock.AnnounceBlockFunc == nil { - panic("PeerManagerMock.AnnounceBlockFunc: method is nil but PeerManager.AnnounceBlock was just called") - } - callInfo := struct { - BlockHash *chainhash.Hash - Peers []p2p.PeerI - }{ - BlockHash: blockHash, - Peers: peers, - } - mock.lockAnnounceBlock.Lock() - mock.calls.AnnounceBlock = append(mock.calls.AnnounceBlock, callInfo) - mock.lockAnnounceBlock.Unlock() - return mock.AnnounceBlockFunc(blockHash, peers) -} - -// AnnounceBlockCalls gets all the calls that were made to AnnounceBlock. -// Check the length with: -// -// len(mockedPeerManager.AnnounceBlockCalls()) -func (mock *PeerManagerMock) AnnounceBlockCalls() []struct { - BlockHash *chainhash.Hash - Peers []p2p.PeerI -} { - var calls []struct { - BlockHash *chainhash.Hash - Peers []p2p.PeerI - } - mock.lockAnnounceBlock.RLock() - calls = mock.calls.AnnounceBlock - mock.lockAnnounceBlock.RUnlock() - return calls -} - -// AnnounceTransaction calls AnnounceTransactionFunc. -func (mock *PeerManagerMock) AnnounceTransaction(txHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI { - if mock.AnnounceTransactionFunc == nil { - panic("PeerManagerMock.AnnounceTransactionFunc: method is nil but PeerManager.AnnounceTransaction was just called") - } - callInfo := struct { - TxHash *chainhash.Hash - Peers []p2p.PeerI - }{ - TxHash: txHash, - Peers: peers, - } - mock.lockAnnounceTransaction.Lock() - mock.calls.AnnounceTransaction = append(mock.calls.AnnounceTransaction, callInfo) - mock.lockAnnounceTransaction.Unlock() - return mock.AnnounceTransactionFunc(txHash, peers) -} - -// AnnounceTransactionCalls gets all the calls that were made to AnnounceTransaction. -// Check the length with: -// -// len(mockedPeerManager.AnnounceTransactionCalls()) -func (mock *PeerManagerMock) AnnounceTransactionCalls() []struct { - TxHash *chainhash.Hash - Peers []p2p.PeerI -} { - var calls []struct { - TxHash *chainhash.Hash - Peers []p2p.PeerI - } - mock.lockAnnounceTransaction.RLock() - calls = mock.calls.AnnounceTransaction - mock.lockAnnounceTransaction.RUnlock() - return calls -} - -// GetPeers calls GetPeersFunc. -func (mock *PeerManagerMock) GetPeers() []p2p.PeerI { - if mock.GetPeersFunc == nil { - panic("PeerManagerMock.GetPeersFunc: method is nil but PeerManager.GetPeers was just called") - } - callInfo := struct { - }{} - mock.lockGetPeers.Lock() - mock.calls.GetPeers = append(mock.calls.GetPeers, callInfo) - mock.lockGetPeers.Unlock() - return mock.GetPeersFunc() -} - -// GetPeersCalls gets all the calls that were made to GetPeers. -// Check the length with: -// -// len(mockedPeerManager.GetPeersCalls()) -func (mock *PeerManagerMock) GetPeersCalls() []struct { -} { - var calls []struct { - } - mock.lockGetPeers.RLock() - calls = mock.calls.GetPeers - mock.lockGetPeers.RUnlock() - return calls -} - -// RequestBlock calls RequestBlockFunc. -func (mock *PeerManagerMock) RequestBlock(blockHash *chainhash.Hash) p2p.PeerI { - if mock.RequestBlockFunc == nil { - panic("PeerManagerMock.RequestBlockFunc: method is nil but PeerManager.RequestBlock was just called") - } - callInfo := struct { - BlockHash *chainhash.Hash - }{ - BlockHash: blockHash, - } - mock.lockRequestBlock.Lock() - mock.calls.RequestBlock = append(mock.calls.RequestBlock, callInfo) - mock.lockRequestBlock.Unlock() - return mock.RequestBlockFunc(blockHash) -} - -// RequestBlockCalls gets all the calls that were made to RequestBlock. -// Check the length with: -// -// len(mockedPeerManager.RequestBlockCalls()) -func (mock *PeerManagerMock) RequestBlockCalls() []struct { - BlockHash *chainhash.Hash -} { - var calls []struct { - BlockHash *chainhash.Hash - } - mock.lockRequestBlock.RLock() - calls = mock.calls.RequestBlock - mock.lockRequestBlock.RUnlock() - return calls -} - -// RequestTransaction calls RequestTransactionFunc. -func (mock *PeerManagerMock) RequestTransaction(txHash *chainhash.Hash) p2p.PeerI { - if mock.RequestTransactionFunc == nil { - panic("PeerManagerMock.RequestTransactionFunc: method is nil but PeerManager.RequestTransaction was just called") - } - callInfo := struct { - TxHash *chainhash.Hash - }{ - TxHash: txHash, - } - mock.lockRequestTransaction.Lock() - mock.calls.RequestTransaction = append(mock.calls.RequestTransaction, callInfo) - mock.lockRequestTransaction.Unlock() - return mock.RequestTransactionFunc(txHash) -} - -// RequestTransactionCalls gets all the calls that were made to RequestTransaction. -// Check the length with: -// -// len(mockedPeerManager.RequestTransactionCalls()) -func (mock *PeerManagerMock) RequestTransactionCalls() []struct { - TxHash *chainhash.Hash -} { - var calls []struct { - TxHash *chainhash.Hash - } - mock.lockRequestTransaction.RLock() - calls = mock.calls.RequestTransaction - mock.lockRequestTransaction.RUnlock() - return calls -} - -// Shutdown calls ShutdownFunc. -func (mock *PeerManagerMock) Shutdown() { - if mock.ShutdownFunc == nil { - panic("PeerManagerMock.ShutdownFunc: method is nil but PeerManager.Shutdown was just called") - } - callInfo := struct { - }{} - mock.lockShutdown.Lock() - mock.calls.Shutdown = append(mock.calls.Shutdown, callInfo) - mock.lockShutdown.Unlock() - mock.ShutdownFunc() -} - -// ShutdownCalls gets all the calls that were made to Shutdown. -// Check the length with: -// -// len(mockedPeerManager.ShutdownCalls()) -func (mock *PeerManagerMock) ShutdownCalls() []struct { -} { - var calls []struct { - } - mock.lockShutdown.RLock() - calls = mock.calls.Shutdown - mock.lockShutdown.RUnlock() - return calls -} diff --git a/internal/blocktx/mocks/peer_mock.go b/internal/blocktx/mocks/peer_mock.go deleted file mode 100644 index 6b3f52020..000000000 --- a/internal/blocktx/mocks/peer_mock.go +++ /dev/null @@ -1,512 +0,0 @@ -// Code generated by moq; DO NOT EDIT. -// github.com/matryer/moq - -package mocks - -import ( - "github.com/bitcoin-sv/arc/internal/blocktx" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/libsv/go-p2p/wire" - "sync" -) - -// Ensure, that PeerMock does implement blocktx.Peer. -// If this is not the case, regenerate this file with moq. -var _ blocktx.Peer = &PeerMock{} - -// PeerMock is a mock implementation of blocktx.Peer. -// -// func TestSomethingThatUsesPeer(t *testing.T) { -// -// // make and configure a mocked blocktx.Peer -// mockedPeer := &PeerMock{ -// AnnounceBlockFunc: func(blockHash *chainhash.Hash) { -// panic("mock out the AnnounceBlock method") -// }, -// AnnounceTransactionFunc: func(txHash *chainhash.Hash) { -// panic("mock out the AnnounceTransaction method") -// }, -// ConnectedFunc: func() bool { -// panic("mock out the Connected method") -// }, -// IsHealthyFunc: func() bool { -// panic("mock out the IsHealthy method") -// }, -// IsUnhealthyChFunc: func() <-chan struct{} { -// panic("mock out the IsUnhealthyCh method") -// }, -// NetworkFunc: func() wire.BitcoinNet { -// panic("mock out the Network method") -// }, -// RequestBlockFunc: func(blockHash *chainhash.Hash) { -// panic("mock out the RequestBlock method") -// }, -// RequestTransactionFunc: func(txHash *chainhash.Hash) { -// panic("mock out the RequestTransaction method") -// }, -// RestartFunc: func() { -// panic("mock out the Restart method") -// }, -// ShutdownFunc: func() { -// panic("mock out the Shutdown method") -// }, -// StringFunc: func() string { -// panic("mock out the String method") -// }, -// WriteMsgFunc: func(msg wire.Message) error { -// panic("mock out the WriteMsg method") -// }, -// } -// -// // use mockedPeer in code that requires blocktx.Peer -// // and then make assertions. -// -// } -type PeerMock struct { - // AnnounceBlockFunc mocks the AnnounceBlock method. - AnnounceBlockFunc func(blockHash *chainhash.Hash) - - // AnnounceTransactionFunc mocks the AnnounceTransaction method. - AnnounceTransactionFunc func(txHash *chainhash.Hash) - - // ConnectedFunc mocks the Connected method. - ConnectedFunc func() bool - - // IsHealthyFunc mocks the IsHealthy method. - IsHealthyFunc func() bool - - // IsUnhealthyChFunc mocks the IsUnhealthyCh method. - IsUnhealthyChFunc func() <-chan struct{} - - // NetworkFunc mocks the Network method. - NetworkFunc func() wire.BitcoinNet - - // RequestBlockFunc mocks the RequestBlock method. - RequestBlockFunc func(blockHash *chainhash.Hash) - - // RequestTransactionFunc mocks the RequestTransaction method. - RequestTransactionFunc func(txHash *chainhash.Hash) - - // RestartFunc mocks the Restart method. - RestartFunc func() - - // ShutdownFunc mocks the Shutdown method. - ShutdownFunc func() - - // StringFunc mocks the String method. - StringFunc func() string - - // WriteMsgFunc mocks the WriteMsg method. - WriteMsgFunc func(msg wire.Message) error - - // calls tracks calls to the methods. - calls struct { - // AnnounceBlock holds details about calls to the AnnounceBlock method. - AnnounceBlock []struct { - // BlockHash is the blockHash argument value. - BlockHash *chainhash.Hash - } - // AnnounceTransaction holds details about calls to the AnnounceTransaction method. - AnnounceTransaction []struct { - // TxHash is the txHash argument value. - TxHash *chainhash.Hash - } - // Connected holds details about calls to the Connected method. - Connected []struct { - } - // IsHealthy holds details about calls to the IsHealthy method. - IsHealthy []struct { - } - // IsUnhealthyCh holds details about calls to the IsUnhealthyCh method. - IsUnhealthyCh []struct { - } - // Network holds details about calls to the Network method. - Network []struct { - } - // RequestBlock holds details about calls to the RequestBlock method. - RequestBlock []struct { - // BlockHash is the blockHash argument value. - BlockHash *chainhash.Hash - } - // RequestTransaction holds details about calls to the RequestTransaction method. - RequestTransaction []struct { - // TxHash is the txHash argument value. - TxHash *chainhash.Hash - } - // Restart holds details about calls to the Restart method. - Restart []struct { - } - // Shutdown holds details about calls to the Shutdown method. - Shutdown []struct { - } - // String holds details about calls to the String method. - String []struct { - } - // WriteMsg holds details about calls to the WriteMsg method. - WriteMsg []struct { - // Msg is the msg argument value. - Msg wire.Message - } - } - lockAnnounceBlock sync.RWMutex - lockAnnounceTransaction sync.RWMutex - lockConnected sync.RWMutex - lockIsHealthy sync.RWMutex - lockIsUnhealthyCh sync.RWMutex - lockNetwork sync.RWMutex - lockRequestBlock sync.RWMutex - lockRequestTransaction sync.RWMutex - lockRestart sync.RWMutex - lockShutdown sync.RWMutex - lockString sync.RWMutex - lockWriteMsg sync.RWMutex -} - -// AnnounceBlock calls AnnounceBlockFunc. -func (mock *PeerMock) AnnounceBlock(blockHash *chainhash.Hash) { - if mock.AnnounceBlockFunc == nil { - panic("PeerMock.AnnounceBlockFunc: method is nil but Peer.AnnounceBlock was just called") - } - callInfo := struct { - BlockHash *chainhash.Hash - }{ - BlockHash: blockHash, - } - mock.lockAnnounceBlock.Lock() - mock.calls.AnnounceBlock = append(mock.calls.AnnounceBlock, callInfo) - mock.lockAnnounceBlock.Unlock() - mock.AnnounceBlockFunc(blockHash) -} - -// AnnounceBlockCalls gets all the calls that were made to AnnounceBlock. -// Check the length with: -// -// len(mockedPeer.AnnounceBlockCalls()) -func (mock *PeerMock) AnnounceBlockCalls() []struct { - BlockHash *chainhash.Hash -} { - var calls []struct { - BlockHash *chainhash.Hash - } - mock.lockAnnounceBlock.RLock() - calls = mock.calls.AnnounceBlock - mock.lockAnnounceBlock.RUnlock() - return calls -} - -// AnnounceTransaction calls AnnounceTransactionFunc. -func (mock *PeerMock) AnnounceTransaction(txHash *chainhash.Hash) { - if mock.AnnounceTransactionFunc == nil { - panic("PeerMock.AnnounceTransactionFunc: method is nil but Peer.AnnounceTransaction was just called") - } - callInfo := struct { - TxHash *chainhash.Hash - }{ - TxHash: txHash, - } - mock.lockAnnounceTransaction.Lock() - mock.calls.AnnounceTransaction = append(mock.calls.AnnounceTransaction, callInfo) - mock.lockAnnounceTransaction.Unlock() - mock.AnnounceTransactionFunc(txHash) -} - -// AnnounceTransactionCalls gets all the calls that were made to AnnounceTransaction. -// Check the length with: -// -// len(mockedPeer.AnnounceTransactionCalls()) -func (mock *PeerMock) AnnounceTransactionCalls() []struct { - TxHash *chainhash.Hash -} { - var calls []struct { - TxHash *chainhash.Hash - } - mock.lockAnnounceTransaction.RLock() - calls = mock.calls.AnnounceTransaction - mock.lockAnnounceTransaction.RUnlock() - return calls -} - -// Connected calls ConnectedFunc. -func (mock *PeerMock) Connected() bool { - if mock.ConnectedFunc == nil { - panic("PeerMock.ConnectedFunc: method is nil but Peer.Connected was just called") - } - callInfo := struct { - }{} - mock.lockConnected.Lock() - mock.calls.Connected = append(mock.calls.Connected, callInfo) - mock.lockConnected.Unlock() - return mock.ConnectedFunc() -} - -// ConnectedCalls gets all the calls that were made to Connected. -// Check the length with: -// -// len(mockedPeer.ConnectedCalls()) -func (mock *PeerMock) ConnectedCalls() []struct { -} { - var calls []struct { - } - mock.lockConnected.RLock() - calls = mock.calls.Connected - mock.lockConnected.RUnlock() - return calls -} - -// IsHealthy calls IsHealthyFunc. -func (mock *PeerMock) IsHealthy() bool { - if mock.IsHealthyFunc == nil { - panic("PeerMock.IsHealthyFunc: method is nil but Peer.IsHealthy was just called") - } - callInfo := struct { - }{} - mock.lockIsHealthy.Lock() - mock.calls.IsHealthy = append(mock.calls.IsHealthy, callInfo) - mock.lockIsHealthy.Unlock() - return mock.IsHealthyFunc() -} - -// IsHealthyCalls gets all the calls that were made to IsHealthy. -// Check the length with: -// -// len(mockedPeer.IsHealthyCalls()) -func (mock *PeerMock) IsHealthyCalls() []struct { -} { - var calls []struct { - } - mock.lockIsHealthy.RLock() - calls = mock.calls.IsHealthy - mock.lockIsHealthy.RUnlock() - return calls -} - -// IsUnhealthyCh calls IsUnhealthyChFunc. -func (mock *PeerMock) IsUnhealthyCh() <-chan struct{} { - if mock.IsUnhealthyChFunc == nil { - panic("PeerMock.IsUnhealthyChFunc: method is nil but Peer.IsUnhealthyCh was just called") - } - callInfo := struct { - }{} - mock.lockIsUnhealthyCh.Lock() - mock.calls.IsUnhealthyCh = append(mock.calls.IsUnhealthyCh, callInfo) - mock.lockIsUnhealthyCh.Unlock() - return mock.IsUnhealthyChFunc() -} - -// IsUnhealthyChCalls gets all the calls that were made to IsUnhealthyCh. -// Check the length with: -// -// len(mockedPeer.IsUnhealthyChCalls()) -func (mock *PeerMock) IsUnhealthyChCalls() []struct { -} { - var calls []struct { - } - mock.lockIsUnhealthyCh.RLock() - calls = mock.calls.IsUnhealthyCh - mock.lockIsUnhealthyCh.RUnlock() - return calls -} - -// Network calls NetworkFunc. -func (mock *PeerMock) Network() wire.BitcoinNet { - if mock.NetworkFunc == nil { - panic("PeerMock.NetworkFunc: method is nil but Peer.Network was just called") - } - callInfo := struct { - }{} - mock.lockNetwork.Lock() - mock.calls.Network = append(mock.calls.Network, callInfo) - mock.lockNetwork.Unlock() - return mock.NetworkFunc() -} - -// NetworkCalls gets all the calls that were made to Network. -// Check the length with: -// -// len(mockedPeer.NetworkCalls()) -func (mock *PeerMock) NetworkCalls() []struct { -} { - var calls []struct { - } - mock.lockNetwork.RLock() - calls = mock.calls.Network - mock.lockNetwork.RUnlock() - return calls -} - -// RequestBlock calls RequestBlockFunc. -func (mock *PeerMock) RequestBlock(blockHash *chainhash.Hash) { - if mock.RequestBlockFunc == nil { - panic("PeerMock.RequestBlockFunc: method is nil but Peer.RequestBlock was just called") - } - callInfo := struct { - BlockHash *chainhash.Hash - }{ - BlockHash: blockHash, - } - mock.lockRequestBlock.Lock() - mock.calls.RequestBlock = append(mock.calls.RequestBlock, callInfo) - mock.lockRequestBlock.Unlock() - mock.RequestBlockFunc(blockHash) -} - -// RequestBlockCalls gets all the calls that were made to RequestBlock. -// Check the length with: -// -// len(mockedPeer.RequestBlockCalls()) -func (mock *PeerMock) RequestBlockCalls() []struct { - BlockHash *chainhash.Hash -} { - var calls []struct { - BlockHash *chainhash.Hash - } - mock.lockRequestBlock.RLock() - calls = mock.calls.RequestBlock - mock.lockRequestBlock.RUnlock() - return calls -} - -// RequestTransaction calls RequestTransactionFunc. -func (mock *PeerMock) RequestTransaction(txHash *chainhash.Hash) { - if mock.RequestTransactionFunc == nil { - panic("PeerMock.RequestTransactionFunc: method is nil but Peer.RequestTransaction was just called") - } - callInfo := struct { - TxHash *chainhash.Hash - }{ - TxHash: txHash, - } - mock.lockRequestTransaction.Lock() - mock.calls.RequestTransaction = append(mock.calls.RequestTransaction, callInfo) - mock.lockRequestTransaction.Unlock() - mock.RequestTransactionFunc(txHash) -} - -// RequestTransactionCalls gets all the calls that were made to RequestTransaction. -// Check the length with: -// -// len(mockedPeer.RequestTransactionCalls()) -func (mock *PeerMock) RequestTransactionCalls() []struct { - TxHash *chainhash.Hash -} { - var calls []struct { - TxHash *chainhash.Hash - } - mock.lockRequestTransaction.RLock() - calls = mock.calls.RequestTransaction - mock.lockRequestTransaction.RUnlock() - return calls -} - -// Restart calls RestartFunc. -func (mock *PeerMock) Restart() { - if mock.RestartFunc == nil { - panic("PeerMock.RestartFunc: method is nil but Peer.Restart was just called") - } - callInfo := struct { - }{} - mock.lockRestart.Lock() - mock.calls.Restart = append(mock.calls.Restart, callInfo) - mock.lockRestart.Unlock() - mock.RestartFunc() -} - -// RestartCalls gets all the calls that were made to Restart. -// Check the length with: -// -// len(mockedPeer.RestartCalls()) -func (mock *PeerMock) RestartCalls() []struct { -} { - var calls []struct { - } - mock.lockRestart.RLock() - calls = mock.calls.Restart - mock.lockRestart.RUnlock() - return calls -} - -// Shutdown calls ShutdownFunc. -func (mock *PeerMock) Shutdown() { - if mock.ShutdownFunc == nil { - panic("PeerMock.ShutdownFunc: method is nil but Peer.Shutdown was just called") - } - callInfo := struct { - }{} - mock.lockShutdown.Lock() - mock.calls.Shutdown = append(mock.calls.Shutdown, callInfo) - mock.lockShutdown.Unlock() - mock.ShutdownFunc() -} - -// ShutdownCalls gets all the calls that were made to Shutdown. -// Check the length with: -// -// len(mockedPeer.ShutdownCalls()) -func (mock *PeerMock) ShutdownCalls() []struct { -} { - var calls []struct { - } - mock.lockShutdown.RLock() - calls = mock.calls.Shutdown - mock.lockShutdown.RUnlock() - return calls -} - -// String calls StringFunc. -func (mock *PeerMock) String() string { - if mock.StringFunc == nil { - panic("PeerMock.StringFunc: method is nil but Peer.String was just called") - } - callInfo := struct { - }{} - mock.lockString.Lock() - mock.calls.String = append(mock.calls.String, callInfo) - mock.lockString.Unlock() - return mock.StringFunc() -} - -// StringCalls gets all the calls that were made to String. -// Check the length with: -// -// len(mockedPeer.StringCalls()) -func (mock *PeerMock) StringCalls() []struct { -} { - var calls []struct { - } - mock.lockString.RLock() - calls = mock.calls.String - mock.lockString.RUnlock() - return calls -} - -// WriteMsg calls WriteMsgFunc. -func (mock *PeerMock) WriteMsg(msg wire.Message) error { - if mock.WriteMsgFunc == nil { - panic("PeerMock.WriteMsgFunc: method is nil but Peer.WriteMsg was just called") - } - callInfo := struct { - Msg wire.Message - }{ - Msg: msg, - } - mock.lockWriteMsg.Lock() - mock.calls.WriteMsg = append(mock.calls.WriteMsg, callInfo) - mock.lockWriteMsg.Unlock() - return mock.WriteMsgFunc(msg) -} - -// WriteMsgCalls gets all the calls that were made to WriteMsg. -// Check the length with: -// -// len(mockedPeer.WriteMsgCalls()) -func (mock *PeerMock) WriteMsgCalls() []struct { - Msg wire.Message -} { - var calls []struct { - Msg wire.Message - } - mock.lockWriteMsg.RLock() - calls = mock.calls.WriteMsg - mock.lockWriteMsg.RUnlock() - return calls -} diff --git a/internal/blocktx/processor.go b/internal/blocktx/processor.go index 0e0e1e983..217a825dd 100644 --- a/internal/blocktx/processor.go +++ b/internal/blocktx/processor.go @@ -13,12 +13,13 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/libsv/go-bc" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" + blockchain "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication" + blocktx_p2p "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication/p2p" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/internal/tracing" @@ -44,8 +45,8 @@ const ( type Processor struct { hostname string - blockRequestCh chan BlockRequest - blockProcessCh chan *p2p.BlockMessage + blockRequestCh chan blocktx_p2p.BlockRequest + blockProcessCh chan *blockchain.BlockMessage store store.BlocktxStore logger *slog.Logger transactionStorageBatchSize int @@ -73,8 +74,8 @@ type Processor struct { func NewProcessor( logger *slog.Logger, storeI store.BlocktxStore, - blockRequestCh chan BlockRequest, - blockProcessCh chan *p2p.BlockMessage, + blockRequestCh chan blocktx_p2p.BlockRequest, + blockProcessCh chan *blockchain.BlockMessage, opts ...func(*Processor), ) (*Processor, error) { hostname, err := os.Hostname() @@ -140,6 +141,7 @@ func (p *Processor) StartBlockRequesting() { waitUntilFree := func(ctx context.Context) bool { t := time.NewTicker(time.Second) + defer t.Stop() for { bhs, err := p.store.GetBlockHashesProcessingInProgress(p.ctx, p.hostname) @@ -187,16 +189,10 @@ func (p *Processor) StartBlockRequesting() { continue } - msg := wire.NewMsgGetData() - _ = msg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, hash)) // ignore error at this point - p.logger.Info("Sending block request", slog.String("hash", hash.String())) - if err = peer.WriteMsg(msg); err != nil { - p.logger.Error("failed to write block request message to peer", slog.String("hash", hash.String()), slog.String("err", err.Error())) - p.unlockBlock(p.ctx, hash) - - continue - } + msg := wire.NewMsgGetDataSizeHint(1) + _ = msg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, hash)) // ignore error at this point + peer.WriteMsg(msg) p.startBlockProcessGuard(p.ctx, hash) p.logger.Info("Block request message sent to peer", slog.String("hash", hash.String()), slog.String("peer", peer.String())) @@ -425,7 +421,7 @@ func (p *Processor) buildMerkleTreeStoreChainHash(ctx context.Context, txids []* return bc.BuildMerkleTreeStoreChainHash(txids) } -func (p *Processor) processBlock(msg *p2p.BlockMessage) (err error) { +func (p *Processor) processBlock(msg *blockchain.BlockMessage) (err error) { ctx := p.ctx var blockHash chainhash.Hash ctx, span := tracing.StartTracing(ctx, "processBlock", p.tracingEnabled, p.tracingAttributes...) @@ -743,8 +739,3 @@ func (p *Processor) Shutdown() { } } } - -// GetBlockRequestCh is for testing purposes only -func (p *Processor) GetBlockRequestCh() chan BlockRequest { - return p.blockRequestCh -} diff --git a/internal/blocktx/processor_helpers.go b/internal/blocktx/processor_helpers.go index 09cc69cc7..078972a04 100644 --- a/internal/blocktx/processor_helpers.go +++ b/internal/blocktx/processor_helpers.go @@ -4,11 +4,11 @@ import ( "math" "math/big" + blockchain "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" - "github.com/libsv/go-p2p" ) -func createBlock(msg *p2p.BlockMessage, prevBlock *blocktx_api.Block, longestTipExists bool) *blocktx_api.Block { +func createBlock(msg *blockchain.BlockMessage, prevBlock *blocktx_api.Block, longestTipExists bool) *blocktx_api.Block { hash := msg.Header.BlockHash() prevHash := msg.Header.PrevBlock merkleRoot := msg.Header.MerkleRoot diff --git a/internal/blocktx/processor_test.go b/internal/blocktx/processor_test.go index ffaab0ea8..9c931d683 100644 --- a/internal/blocktx/processor_test.go +++ b/internal/blocktx/processor_test.go @@ -9,24 +9,23 @@ import ( "testing" "time" - "github.com/libsv/go-bc" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" "github.com/stretchr/testify/require" "google.golang.org/protobuf/reflect/protoreflect" "github.com/bitcoin-sv/arc/internal/blocktx" + blockchain "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication" + blocktx_p2p "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication/p2p" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/mocks" "github.com/bitcoin-sv/arc/internal/blocktx/store" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" + p2p_mocks "github.com/bitcoin-sv/arc/internal/p2p/mocks" "github.com/bitcoin-sv/arc/internal/testdata" ) func TestHandleBlock(t *testing.T) { - // define HandleBlock function parameters (BlockMessage and p2p.PeerI) - prevBlockHash1573650, _ := chainhash.NewHashFromStr("00000000000007b1f872a8abe664223d65acd22a500b1b8eb5db3fe09a9837ff") merkleRootHash1573650, _ := chainhash.NewHashFromStr("3d64b2bb6bd4e85aacb6d1965a2407fa21846c08dd9a8616866ad2f5c80fda7f") @@ -144,7 +143,20 @@ func TestHandleBlock(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { // given - batchSize := 4 + const batchSize = 4 + + var expectedInsertedTransactions [][]byte + transactionHashes := make([]*chainhash.Hash, len(tc.txHashes)) + for i, hash := range tc.txHashes { + txHash, err := chainhash.NewHashFromStr(hash) + require.NoError(t, err) + transactionHashes[i] = txHash + + expectedInsertedTransactions = append(expectedInsertedTransactions, txHash[:]) + } + + var actualInsertedBlockTransactions [][]byte + storeMock := &storeMocks.BlocktxStoreMock{ GetBlockFunc: func(_ context.Context, _ *chainhash.Hash) (*blocktx_api.Block, error) { if tc.blockAlreadyProcessed { @@ -161,71 +173,35 @@ func TestHandleBlock(t *testing.T) { UpsertBlockFunc: func(_ context.Context, _ *blocktx_api.Block) (uint64, error) { return 0, nil }, - MarkBlockAsDoneFunc: func(_ context.Context, _ *chainhash.Hash, _ uint64, _ uint64) error { - return nil - }, - GetBlockHashesProcessingInProgressFunc: func(_ context.Context, _ string) ([]*chainhash.Hash, error) { - return nil, nil - }, + MarkBlockAsDoneFunc: func(_ context.Context, _ *chainhash.Hash, _ uint64, _ uint64) error { return nil }, + GetBlockHashesProcessingInProgressFunc: func(_ context.Context, _ string) ([]*chainhash.Hash, error) { return nil, nil }, } - mq := &mocks.MessageQueueClientMock{ - PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error { - return nil - }, - } - - // build peer manager and processor - logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) - - var blockRequestCh chan blocktx.BlockRequest = nil // nolint: revive - blockProcessCh := make(chan *p2p.BlockMessage, 10) - - // when - sut := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh) - processor, err := blocktx.NewProcessor(logger, storeMock, blockRequestCh, blockProcessCh, blocktx.WithTransactionBatchSize(batchSize), blocktx.WithMessageQueueClient(mq)) - require.NoError(t, err) - - processor.StartBlockProcessing() - - var expectedInsertedTransactions [][]byte - transactionHashes := make([]*chainhash.Hash, len(tc.txHashes)) - for i, hash := range tc.txHashes { - txHash, err := chainhash.NewHashFromStr(hash) - require.NoError(t, err) - transactionHashes[i] = txHash - - expectedInsertedTransactions = append(expectedInsertedTransactions, txHash[:]) - } - - var insertedBlockTransactions [][]byte - storeMock.UpsertBlockTransactionsFunc = func(_ context.Context, _ uint64, txsWithMerklePaths []store.TxWithMerklePath) ([]store.TxWithMerklePath, error) { - require.True(t, len(txsWithMerklePaths) <= batchSize) + require.LessOrEqual(t, len(txsWithMerklePaths), batchSize) - for _, tx := range txsWithMerklePaths { - bump, err := bc.NewBUMPFromStr(tx.MerklePath) - require.NoError(t, err) - tx, err := chainhash.NewHash(tx.Hash) + for _, txWithMr := range txsWithMerklePaths { + tx, err := chainhash.NewHash(txWithMr.Hash) require.NoError(t, err) - root, err := bump.CalculateRootGivenTxid(tx.String()) - require.NoError(t, err) - - require.Equal(t, root, tc.merkleRoot.String()) - insertedBlockTransactions = append(insertedBlockTransactions, tx[:]) + actualInsertedBlockTransactions = append(actualInsertedBlockTransactions, tx[:]) } return txsWithMerklePaths, nil } - peer := &mocks.PeerMock{ - StringFunc: func() string { - return "" - }, + mq := &mocks.MessageQueueClientMock{ + PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error { return nil }, } - blockMessage := &p2p.BlockMessage{ + logger := slog.Default() + blockProcessCh := make(chan *blockchain.BlockMessage, 1) + p2pMsgHandler := blocktx_p2p.NewMsgHandler(logger, nil, blockProcessCh) + + sut, err := blocktx.NewProcessor(logger, storeMock, nil, blockProcessCh, blocktx.WithTransactionBatchSize(batchSize), blocktx.WithMessageQueueClient(mq)) + require.NoError(t, err) + + blockMessage := &blockchain.BlockMessage{ Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: tc.prevBlockHash, @@ -238,14 +214,17 @@ func TestHandleBlock(t *testing.T) { Size: tc.size, } - // call tested function - err = sut.HandleBlock(blockMessage, peer) - require.NoError(t, err) + // when + sut.StartBlockProcessing() + + // simulate receiving block from node + p2pMsgHandler.OnReceive(blockMessage, &p2p_mocks.PeerIMock{StringFunc: func() string { return "peer" }}) + time.Sleep(20 * time.Millisecond) - processor.Shutdown() + sut.Shutdown() // then - require.ElementsMatch(t, expectedInsertedTransactions, insertedBlockTransactions) + require.ElementsMatch(t, expectedInsertedTransactions, actualInsertedBlockTransactions) }) } } @@ -373,13 +352,12 @@ func TestHandleBlockReorg(t *testing.T) { } // build peer manager and processor - logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) - var blockRequestCh chan blocktx.BlockRequest = nil // nolint: revive - blockProcessCh := make(chan *p2p.BlockMessage, 10) + logger := slog.Default() + blockProcessCh := make(chan *blockchain.BlockMessage, 10) + p2pMsgHandler := blocktx_p2p.NewMsgHandler(logger, nil, blockProcessCh) - peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh) - sut, err := blocktx.NewProcessor(logger, storeMock, blockRequestCh, blockProcessCh) + sut, err := blocktx.NewProcessor(logger, storeMock, nil, blockProcessCh) require.NoError(t, err) txHash, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b") @@ -387,7 +365,7 @@ func TestHandleBlockReorg(t *testing.T) { merkleRoot, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b") require.NoError(t, err) - blockMessage := &p2p.BlockMessage{ + blockMessage := &blockchain.BlockMessage{ Header: &wire.BlockHeader{ Version: 541065216, MerkleRoot: *merkleRoot, @@ -400,8 +378,8 @@ func TestHandleBlockReorg(t *testing.T) { // when sut.StartBlockProcessing() - err = peerHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + // simulate receiving block from node + p2pMsgHandler.OnReceive(blockMessage, nil) // then time.Sleep(20 * time.Millisecond) @@ -478,15 +456,12 @@ func TestStartProcessRegisterTxs(t *testing.T) { } func TestStartBlockRequesting(t *testing.T) { - // define HandleBlock function parameters (BlockMessage and p2p.PeerI) - blockHash, err := chainhash.NewHashFromStr("00000000000007b1f872a8abe664223d65acd22a500b1b8eb5db3fe09a9837ff") require.NoError(t, err) tt := []struct { name string setBlockProcessingErr error - writeMsgErr error bhsProcInProg []*chainhash.Hash expectedSetBlockProcessingCalls int @@ -532,25 +507,6 @@ func TestStartBlockRequesting(t *testing.T) { expectedGetBlockHashesProcessingInProgressCalls: 1, expectedPeerWriteMessageCalls: 0, }, - { - name: "write message error", - writeMsgErr: errors.New("failed to write message"), - - expectedSetBlockProcessingCalls: 1, - expectedDelBlockProcessingCalls: 1, - expectedGetBlockHashesProcessingInProgressCalls: 1, - expectedPeerWriteMessageCalls: 1, - }, - { - name: "write message error - delete block processing failed at first try", - writeMsgErr: errors.New("failed to write message"), - - expectedSetBlockProcessingCalls: 1, - expectedDelBlockProcessingErrors: 1, - expectedDelBlockProcessingCalls: 2, - expectedGetBlockHashesProcessingInProgressCalls: 1, - expectedPeerWriteMessageCalls: 1, - }, } for _, tc := range tt { @@ -574,44 +530,40 @@ func TestStartBlockRequesting(t *testing.T) { return 1, nil } - writeMsgErrTest := tc.writeMsgErr - peerMock := &mocks.PeerMock{ - WriteMsgFunc: func(_ wire.Message) error { - return writeMsgErrTest - }, - StringFunc: func() string { - return "" - }, + peerMock := &p2p_mocks.PeerIMock{ + WriteMsgFunc: func(_ wire.Message) {}, + StringFunc: func() string { return "peer" }, } // build peer manager logger := slog.Default() - blockRequestCh := make(chan blocktx.BlockRequest, 10) - blockProcessCh := make(chan *p2p.BlockMessage, 10) + blockRequestCh := make(chan blocktx_p2p.BlockRequest, 10) + blockProcessCh := make(chan *blockchain.BlockMessage, 10) - // when - peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh) - sut, err := blocktx.NewProcessor(logger, storeMock, blockRequestCh, blockProcessCh) - require.NoError(t, err) + peerHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh) - // send msg Inv to blockRequest channel - err = peerHandler.HandleBlockAnnouncement(wire.NewInvVect(wire.InvTypeBlock, blockHash), peerMock) + sut, err := blocktx.NewProcessor(logger, storeMock, blockRequestCh, blockProcessCh) require.NoError(t, err) + // when sut.StartBlockRequesting() - // call tested function + // simulate receiving INV BLOCK msg from node + invMsg := wire.NewMsgInvSizeHint(1) + err = invMsg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, blockHash)) require.NoError(t, err) + peerHandler.OnReceive(invMsg, peerMock) + time.Sleep(200 * time.Millisecond) // then + defer sut.Shutdown() + require.Equal(t, tc.expectedGetBlockHashesProcessingInProgressCalls, len(storeMock.GetBlockHashesProcessingInProgressCalls())) require.Equal(t, tc.expectedDelBlockProcessingCalls, len(storeMock.DelBlockProcessingCalls())) require.Equal(t, tc.expectedSetBlockProcessingCalls, len(storeMock.SetBlockProcessingCalls())) require.Equal(t, tc.expectedPeerWriteMessageCalls, len(peerMock.WriteMsgCalls())) - - sut.Shutdown() }) } } diff --git a/internal/blocktx/server.go b/internal/blocktx/server.go index 0c0afc903..9d1451fda 100644 --- a/internal/blocktx/server.go +++ b/internal/blocktx/server.go @@ -5,7 +5,6 @@ import ( "log/slog" "time" - "github.com/libsv/go-p2p" "google.golang.org/grpc/reflection" "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" @@ -14,6 +13,7 @@ import ( "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/internal/grpc_opts" + "github.com/bitcoin-sv/arc/internal/p2p" ) // Server type carries the logger within it. @@ -22,14 +22,14 @@ type Server struct { grpc_opts.GrpcServer logger *slog.Logger - pm p2p.PeerManagerI + pm *p2p.PeerManager store store.BlocktxStore maxAllowedBlockHeightMismatch int } // NewServer will return a server instance with the logger stored within it. func NewServer(prometheusEndpoint string, maxMsgSize int, logger *slog.Logger, - store store.BlocktxStore, pm p2p.PeerManagerI, maxAllowedBlockHeightMismatch int, tracingConfig *config.TracingConfig) (*Server, error) { + store store.BlocktxStore, pm *p2p.PeerManager, maxAllowedBlockHeightMismatch int, tracingConfig *config.TracingConfig) (*Server, error) { logger = logger.With(slog.String("module", "server")) grpcServer, err := grpc_opts.NewGrpcServer(logger, "blocktx", prometheusEndpoint, maxMsgSize, tracingConfig) diff --git a/internal/blocktx/server_test.go b/internal/blocktx/server_test.go index 51ba33679..7fa61838f 100644 --- a/internal/blocktx/server_test.go +++ b/internal/blocktx/server_test.go @@ -13,8 +13,8 @@ import ( "github.com/bitcoin-sv/arc/internal/blocktx" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" - "github.com/bitcoin-sv/arc/internal/blocktx/mocks" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" + "github.com/bitcoin-sv/arc/internal/p2p" "github.com/bitcoin-sv/arc/internal/testdata" ) @@ -32,7 +32,7 @@ func TestListenAndServe(t *testing.T) { // given logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) storeMock := &storeMocks.BlocktxStoreMock{} - pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} + pm := &p2p.PeerManager{} sut, err := blocktx.NewServer("", 0, logger, storeMock, pm, 0, nil) require.NoError(t, err) diff --git a/internal/metamorph/server_test.go b/internal/metamorph/server_test.go index 02c9293a0..a540c3249 100644 --- a/internal/metamorph/server_test.go +++ b/internal/metamorph/server_test.go @@ -17,7 +17,6 @@ import ( "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/bitcoin-sv/arc/internal/blocktx" "github.com/bitcoin-sv/arc/internal/metamorph" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/mocks" @@ -290,7 +289,7 @@ func TestServer_GetTransactionStatus(t *testing.T) { Txid: testdata.TX1Hash.String(), }, status: metamorph_api.Status_MINED, - getTxMerklePathErr: blocktx.ErrMerklePathNotFoundForTransaction, + getTxMerklePathErr: errors.New("merkle path not found for transaction"), want: &metamorph_api.TransactionStatus{ StoredAt: timestamppb.New(testdata.Time),