From 59689ba9983f087546c90d0185b063200c554faa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20B=C3=B6ckli?= <42514703+boecklim@users.noreply.github.com> Date: Mon, 3 Feb 2025 09:03:21 +0100 Subject: [PATCH] feat: p2p blocktx (#751) Co-authored-by: Arkadiusz Osowski --- cmd/arc/services/blocktx.go | 25 +- go.mod | 1 - go.sum | 7 - internal/blocktx/background_workers.go | 10 +- internal/blocktx/background_workers_test.go | 13 +- internal/blocktx/bcnet/block_message.go | 30 + .../bcnet/blocktx_p2p/message_handler.go | 78 +++ .../{peer_handler.go => bcnet/init.go} | 69 +-- internal/blocktx/bcnet/init_test.go | 33 ++ internal/blocktx/health_check.go | 16 +- internal/blocktx/health_check_test.go | 2 +- internal/blocktx/integration_test/helpers.go | 9 +- .../reorg_integration_test.go | 45 +- internal/blocktx/interface.go | 37 -- internal/blocktx/mocks/peer_mock.go | 512 ------------------ internal/blocktx/processor.go | 27 +- internal/blocktx/processor_test.go | 38 +- internal/blocktx/server.go | 7 +- internal/blocktx/server_test.go | 2 +- internal/p2p/tests/wire_reader_test.go | 119 ++++ 20 files changed, 357 insertions(+), 723 deletions(-) create mode 100644 internal/blocktx/bcnet/block_message.go create mode 100644 internal/blocktx/bcnet/blocktx_p2p/message_handler.go rename internal/blocktx/{peer_handler.go => bcnet/init.go} (56%) create mode 100644 internal/blocktx/bcnet/init_test.go delete mode 100644 internal/blocktx/interface.go delete mode 100644 internal/blocktx/mocks/peer_mock.go create mode 100644 internal/p2p/tests/wire_reader_test.go diff --git a/cmd/arc/services/blocktx.go b/cmd/arc/services/blocktx.go index 611ae815c..5c328b0a9 100644 --- a/cmd/arc/services/blocktx.go +++ b/cmd/arc/services/blocktx.go @@ -8,13 +8,14 @@ import ( "go.opentelemetry.io/otel/attribute" - "github.com/libsv/go-p2p" - "github.com/bitcoin-sv/arc/config" "github.com/bitcoin-sv/arc/internal/blocktx" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql" "github.com/bitcoin-sv/arc/internal/grpc_opts" + "github.com/bitcoin-sv/arc/internal/p2p" "github.com/bitcoin-sv/arc/internal/version" "github.com/bitcoin-sv/arc/pkg/message_queue/nats/client/nats_core" "github.com/bitcoin-sv/arc/pkg/message_queue/nats/client/nats_jetstream" @@ -37,7 +38,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err blockStore store.BlocktxStore mqClient blocktx.MessageQueueClient processor *blocktx.Processor - pm p2p.PeerManagerI + pm *p2p.PeerManager server *blocktx.Server healthServer *grpc_opts.GrpcServer workers *blocktx.BackgroundWorkers @@ -127,8 +128,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err blocktx.WithIncomingIsLongest(btxConfig.IncomingIsLongest), ) - blockRequestCh := make(chan blocktx.BlockRequest, blockProcessingBuffer) - blockProcessCh := make(chan *p2p.BlockMessage, blockProcessingBuffer) + blockRequestCh := make(chan blocktx_p2p.BlockRequest, blockProcessingBuffer) + blockProcessCh := make(chan *bcnet.BlockMessage, blockProcessingBuffer) processor, err = blocktx.NewProcessor(logger, blockStore, blockRequestCh, blockProcessCh, processorOpts...) if err != nil { @@ -142,7 +143,10 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err return nil, fmt.Errorf("failed to start peer handler: %v", err) } - pmOpts := []p2p.PeerManagerOptions{p2p.WithExcessiveBlockSize(maximumBlockSize)} + // p2p global setting + p2p.SetExcessiveBlockSize(maximumBlockSize) + + pmOpts := []p2p.PeerManagerOptions{} if arcConfig.Blocktx.MonitorPeers { pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers()) } @@ -150,7 +154,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err pm = p2p.NewPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...) peers := make([]p2p.PeerI, len(arcConfig.Broadcasting.Unicast.Peers)) - peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh) + peerHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh) peerOpts := []p2p.PeerOptions{ p2p.WithMaximumMessageSize(maximumBlockSize), @@ -169,8 +173,9 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err return nil, fmt.Errorf("error getting peer url: %v", err) } - peer, err := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerURL, peerHandler, network, peerOpts...) - if err != nil { + peer := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerHandler, peerURL, network, peerOpts...) + ok := peer.Connect() + if !ok { stopFn() return nil, fmt.Errorf("error creating peer %s: %v", peerURL, err) } @@ -243,7 +248,7 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf } func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor, - pm p2p.PeerManagerI, mqClient blocktx.MessageQueueClient, + pm *p2p.PeerManager, mqClient blocktx.MessageQueueClient, store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers, shutdownFns []func(), ) { diff --git a/go.mod b/go.mod index 3a0785edf..452ff3e91 100644 --- a/go.mod +++ b/go.mod @@ -86,7 +86,6 @@ require ( github.com/go-openapi/swag v0.23.0 // indirect github.com/go-zeromq/goczmq/v4 v4.2.2 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/gnostic-models v0.6.8 // indirect github.com/google/gofuzz v1.2.0 // indirect diff --git a/go.sum b/go.sum index 65f19a8a4..a00b12448 100644 --- a/go.sum +++ b/go.sum @@ -106,8 +106,6 @@ github.com/go-zeromq/zmq4 v0.17.0 h1:r12/XdqPeRbuaF4C3QZJeWCt7a5vpJbslDH1rTXF+Kc github.com/go-zeromq/zmq4 v0.17.0/go.mod h1:EQxjJD92qKnrsVMzAnx62giD6uJIPi1dMGZ781iCDtY= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= -github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= -github.com/golang-jwt/jwt v3.2.2+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= github.com/golang-migrate/migrate/v4 v4.16.2 h1:8coYbMKUyInrFk1lfGfRovTLAW7PhWp8qQDT2iKfuoA= github.com/golang-migrate/migrate/v4 v4.16.2/go.mod h1:pfcJX4nPHaVdc5nmdCikFBWtm+UBpiZjRNNsyBbp0/o= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= @@ -200,8 +198,6 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/labstack/echo-contrib v0.17.1 h1:7I/he7ylVKsDUieaGRZ9XxxTYOjfQwVzHzUYrNykfCU= github.com/labstack/echo-contrib v0.17.1/go.mod h1:SnsCZtwHBAZm5uBSAtQtXQHI3wqEA73hvTn0bYMKnZA= -github.com/labstack/echo/v4 v4.12.0 h1:IKpw49IMryVB2p1a4dzwlhP1O2Tf2E0Ir/450lH+kI0= -github.com/labstack/echo/v4 v4.12.0/go.mod h1:UP9Cr2DJXbOK3Kr9ONYzNowSh7HP0aG0ShAyycHSJvM= github.com/labstack/echo/v4 v4.13.3 h1:pwhpCPrTl5qry5HRdM5FwdXnhXSLSY+WE+YQSeCaafY= github.com/labstack/echo/v4 v4.13.3/go.mod h1:o90YNEeQWjDozo584l7AwhJMHN0bOC4tAfg+Xox9q5g= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= @@ -349,7 +345,6 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= @@ -454,8 +449,6 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= -golang.org/x/time v0.7.0 h1:ntUhktv3OPE6TgYxXWv9vKvUSJyIFJlyohwbkEwPrKQ= -golang.org/x/time v0.7.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/internal/blocktx/background_workers.go b/internal/blocktx/background_workers.go index d2d4ee44d..da394e8d3 100644 --- a/internal/blocktx/background_workers.go +++ b/internal/blocktx/background_workers.go @@ -6,9 +6,9 @@ import ( "sync" "time" - "github.com/libsv/go-p2p" - + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/store" + "github.com/bitcoin-sv/arc/internal/p2p" ) type BackgroundWorkers struct { @@ -41,7 +41,7 @@ func (w *BackgroundWorkers) GracefulStop() { w.logger.Info("Shutdown complete") } -func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- BlockRequest) { +func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) { w.workersWg.Add(1) go func() { @@ -69,7 +69,7 @@ func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Durat }() } -func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestingCh chan<- BlockRequest) error { +func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) error { const ( hoursPerDay = 24 blocksPerHour = 6 @@ -92,7 +92,7 @@ func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockReq slog.String("peer", peer.String()), ) - blockRequestingCh <- BlockRequest{ + blockRequestingCh <- blocktx_p2p.BlockRequest{ Hash: block.Hash, Peer: peer, } diff --git a/internal/blocktx/background_workers_test.go b/internal/blocktx/background_workers_test.go index edcdd5c90..f39c5b2e1 100644 --- a/internal/blocktx/background_workers_test.go +++ b/internal/blocktx/background_workers_test.go @@ -8,15 +8,14 @@ import ( "testing" "time" - "github.com/libsv/go-p2p" + "github.com/stretchr/testify/require" "github.com/bitcoin-sv/arc/internal/blocktx" - "github.com/bitcoin-sv/arc/internal/blocktx/mocks" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/store" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" - - "github.com/stretchr/testify/require" - + "github.com/bitcoin-sv/arc/internal/p2p" + p2pMocks "github.com/bitcoin-sv/arc/internal/p2p/mocks" "github.com/bitcoin-sv/arc/internal/testdata" ) @@ -69,7 +68,7 @@ func TestStartFillGaps(t *testing.T) { // given const fillGapsInterval = 50 * time.Millisecond - blockRequestingCh := make(chan blocktx.BlockRequest, 10) + blockRequestingCh := make(chan blocktx_p2p.BlockRequest, 10) getBlockErrCh := make(chan error) getBlockGapTestErr := tc.getBlockGapsErr @@ -84,7 +83,7 @@ func TestStartFillGaps(t *testing.T) { }, } - peerMock := &mocks.PeerMock{ + peerMock := &p2pMocks.PeerIMock{ StringFunc: func() string { return "" }, diff --git a/internal/blocktx/bcnet/block_message.go b/internal/blocktx/bcnet/block_message.go new file mode 100644 index 000000000..2114465c5 --- /dev/null +++ b/internal/blocktx/bcnet/block_message.go @@ -0,0 +1,30 @@ +package bcnet + +import ( + "io" + + "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/libsv/go-p2p/wire" +) + +// BlockMessage only stores the transaction IDs of the block, not the full transactions +type BlockMessage struct { + Hash *chainhash.Hash + Header *wire.BlockHeader + Height uint64 + TransactionHashes []*chainhash.Hash + Size uint64 +} + +func (bm *BlockMessage) Bsvdecode(io.Reader, uint32, wire.MessageEncoding) error { + return nil +} +func (bm *BlockMessage) BsvEncode(io.Writer, uint32, wire.MessageEncoding) error { + return nil +} +func (bm *BlockMessage) Command() string { + return "block" +} +func (bm *BlockMessage) MaxPayloadLength(uint32) uint64 { + return wire.MaxExtMsgPayload +} diff --git a/internal/blocktx/bcnet/blocktx_p2p/message_handler.go b/internal/blocktx/bcnet/blocktx_p2p/message_handler.go new file mode 100644 index 000000000..a4aafbe3d --- /dev/null +++ b/internal/blocktx/bcnet/blocktx_p2p/message_handler.go @@ -0,0 +1,78 @@ +package blocktx_p2p + +import ( + "errors" + "log/slog" + + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/p2p" + "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/libsv/go-p2p/wire" +) + +var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to blockchain.BlockMessage") + +type BlockRequest struct { + Hash *chainhash.Hash + Peer p2p.PeerI +} + +var _ p2p.MessageHandlerI = (*MsgHandler)(nil) + +type MsgHandler struct { + logger *slog.Logger + blockRequestingCh chan<- BlockRequest + blockProcessingCh chan<- *bcnet.BlockMessage +} + +func NewMsgHandler(logger *slog.Logger, blockRequestCh chan<- BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) *MsgHandler { + return &MsgHandler{ + logger: logger.With(slog.String("module", "peer-msg-handler")), + blockRequestingCh: blockRequestCh, + blockProcessingCh: blockProcessCh, + } +} + +// OnReceive handles incoming messages depending on command type +func (h *MsgHandler) OnReceive(msg wire.Message, peer p2p.PeerI) { + cmd := msg.Command() + + switch cmd { + case wire.CmdInv: + invMsg, ok := msg.(*wire.MsgInv) + if !ok { + return + } + + go func() { + for _, iv := range invMsg.InvList { + if iv.Type == wire.InvTypeBlock { + req := BlockRequest{ + Hash: &iv.Hash, + Peer: peer, + } + + h.blockRequestingCh <- req + } + // ignore INV with transaction or error + } + }() + + case wire.CmdBlock: + blockMsg, ok := msg.(*bcnet.BlockMessage) + if !ok { + h.logger.Error("Block msg receive", slog.Any("err", ErrUnableToCastWireMessage)) + return + } + + h.blockProcessingCh <- blockMsg + + default: + // ignore other messages + } +} + +// OnSend handles outgoing messages depending on command type +func (h *MsgHandler) OnSend(_ wire.Message, _ p2p.PeerI) { + // ignore +} diff --git a/internal/blocktx/peer_handler.go b/internal/blocktx/bcnet/init.go similarity index 56% rename from internal/blocktx/peer_handler.go rename to internal/blocktx/bcnet/init.go index 179adeb24..88dc6255e 100644 --- a/internal/blocktx/peer_handler.go +++ b/internal/blocktx/bcnet/init.go @@ -1,25 +1,20 @@ -package blocktx +package bcnet import ( "encoding/binary" - "errors" "io" - "log/slog" "github.com/bitcoin-sv/go-sdk/script" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "github.com/bitcoin-sv/go-sdk/util" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" ) -var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to p2p.BlockMessage") - func init() { // override the default wire block handler with our own that streams and stores only the transaction ids wire.SetExternalHandler(wire.CmdBlock, func(reader io.Reader, _ uint64, bytesRead int) (int, wire.Message, []byte, error) { - blockMessage := &p2p.BlockMessage{ + blockMessage := &BlockMessage{ Header: &wire.BlockHeader{}, } @@ -42,7 +37,7 @@ func init() { var tx *sdkTx.Transaction var hash *chainhash.Hash var txBytes []byte - for i := 0; i < int(txCount); i++ { + for i := uint64(0); i < uint64(txCount); i++ { tx = sdkTx.NewTransaction() read, err = tx.ReadFrom(reader) if err != nil { @@ -62,66 +57,14 @@ func init() { } } - blockMessage.Size = uint64(bytesRead) + blockMessage.Size = uint64(bytesRead) // #nosec G115 + blockHash := blockMessage.Header.BlockHash() + blockMessage.Hash = &blockHash return bytesRead, blockMessage, nil, nil }) } -type PeerHandler struct { - logger *slog.Logger - blockRequestCh chan BlockRequest - blockProcessCh chan *p2p.BlockMessage -} - -func NewPeerHandler(logger *slog.Logger, blockRequestCh chan BlockRequest, blockProcessCh chan *p2p.BlockMessage) *PeerHandler { - return &PeerHandler{ - logger: logger.With(slog.String("module", "peer-handler")), - blockRequestCh: blockRequestCh, - blockProcessCh: blockProcessCh, - } -} - -func (ph *PeerHandler) HandleTransactionsGet(_ []*wire.InvVect, _ p2p.PeerI) ([][]byte, error) { - return nil, nil -} - -func (ph *PeerHandler) HandleTransactionSent(_ *wire.MsgTx, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleTransactionAnnouncement(_ *wire.InvVect, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleTransactionRejection(_ *wire.MsgReject, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleTransaction(_ *wire.MsgTx, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleBlockAnnouncement(msg *wire.InvVect, peer p2p.PeerI) error { - req := BlockRequest{ - Hash: &msg.Hash, - Peer: peer, - } - - ph.blockRequestCh <- req - return nil -} - -func (ph *PeerHandler) HandleBlock(wireMsg wire.Message, _ p2p.PeerI) error { - msg, ok := wireMsg.(*p2p.BlockMessage) - if !ok { - return ErrUnableToCastWireMessage - } - - ph.blockProcessCh <- msg - return nil -} - func extractHeightFromCoinbaseTx(tx *sdkTx.Transaction) uint64 { // Coinbase tx has a special format, the height is encoded in the first 4 bytes of the scriptSig (BIP-34) diff --git a/internal/blocktx/bcnet/init_test.go b/internal/blocktx/bcnet/init_test.go new file mode 100644 index 000000000..095c94f8f --- /dev/null +++ b/internal/blocktx/bcnet/init_test.go @@ -0,0 +1,33 @@ +package bcnet + +import ( + "testing" + + sdkTx "github.com/bitcoin-sv/go-sdk/transaction" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExtractHeight(t *testing.T) { + // given + tx, err := sdkTx.NewTransactionFromHex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff570350cc0b041547b5630cfabe6d6d0000000000000000000000000000000000000000000000000000000000000000010000000000000047ed20542096bd0000000000143362663865373833636662643732306431383436000000000140be4025000000001976a914c9b0abe09b7dd8e9d1e8c1e3502d32ab0d7119e488ac00000000") + require.NoError(t, err) + + // when + height := extractHeightFromCoinbaseTx(tx) + + // then + assert.Equalf(t, uint64(773200), height, "height should be 773200, got %d", height) +} + +func TestExtractHeightForRegtest(t *testing.T) { + // given + tx, err := sdkTx.NewTransactionFromHex("02000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0502dc070101ffffffff012f500900000000002321032efe256e14fd77eea05d0453374f8920e0a7a4a573bb3937ef3f567f3937129cac00000000") + require.NoError(t, err) + + // when + height := extractHeightFromCoinbaseTx(tx) + + // then + assert.Equalf(t, uint64(2012), height, "height should be 2012, got %d", height) +} diff --git a/internal/blocktx/health_check.go b/internal/blocktx/health_check.go index 72180cab8..f6c556fe8 100644 --- a/internal/blocktx/health_check.go +++ b/internal/blocktx/health_check.go @@ -31,13 +31,7 @@ func (s *Server) Check(ctx context.Context, req *grpc_health_v1.HealthCheckReque } // verify we have at least 1 node connected to blocktx - healthy := false - for _, peer := range s.pm.GetPeers() { - if peer.IsHealthy() && peer.Connected() { - healthy = true - break - } - } + healthy := s.pm.CountConnectedPeers() > 0 if !healthy { s.logger.Error("healthy peer not found") @@ -66,13 +60,7 @@ func (s *Server) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_healt } // verify we have at least 1 node connected to blocktx - healthy := false - for _, peer := range s.pm.GetPeers() { - if peer.IsHealthy() && peer.Connected() { - healthy = true - break - } - } + healthy := s.pm.CountConnectedPeers() > 0 if !healthy { s.logger.Error("healthy peer not found") diff --git a/internal/blocktx/health_check_test.go b/internal/blocktx/health_check_test.go index faffb0513..bf8abc5fa 100644 --- a/internal/blocktx/health_check_test.go +++ b/internal/blocktx/health_check_test.go @@ -7,7 +7,7 @@ import ( "os" "testing" - "github.com/libsv/go-p2p" + "github.com/bitcoin-sv/arc/internal/p2p" "github.com/stretchr/testify/require" "google.golang.org/grpc/health/grpc_health_v1" diff --git a/internal/blocktx/integration_test/helpers.go b/internal/blocktx/integration_test/helpers.go index 8e34b5b4b..b202cb3e4 100644 --- a/internal/blocktx/integration_test/helpers.go +++ b/internal/blocktx/integration_test/helpers.go @@ -7,11 +7,12 @@ import ( "os" "testing" - "github.com/libsv/go-p2p" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "github.com/bitcoin-sv/arc/internal/blocktx" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql" "github.com/bitcoin-sv/arc/pkg/message_queue/nats/client/nats_core" @@ -19,12 +20,12 @@ import ( "github.com/bitcoin-sv/arc/pkg/test_utils" ) -func setupSut(t *testing.T, dbInfo string) (*blocktx.Processor, *blocktx.PeerHandler, *postgresql.PostgreSQL, chan []byte, chan *blocktx_api.TransactionBlock) { +func setupSut(t *testing.T, dbInfo string) (*blocktx.Processor, *blocktx_p2p.MsgHandler, *postgresql.PostgreSQL, chan []byte, chan *blocktx_api.TransactionBlock) { t.Helper() logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) - blockProcessCh := make(chan *p2p.BlockMessage, 10) + blockProcessCh := make(chan *bcnet.BlockMessage, 10) registerTxChannel := make(chan []byte, 10) publishedTxsCh := make(chan *blocktx_api.TransactionBlock, 10) @@ -44,7 +45,7 @@ func setupSut(t *testing.T, dbInfo string) (*blocktx.Processor, *blocktx.PeerHan } mqClient := nats_core.New(mockNatsConn, nats_core.WithLogger(logger)) - p2pMsgHandler := blocktx.NewPeerHandler(logger, nil, blockProcessCh) + p2pMsgHandler := blocktx_p2p.NewMsgHandler(logger, nil, blockProcessCh) processor, err := blocktx.NewProcessor( logger, store, diff --git a/internal/blocktx/integration_test/reorg_integration_test.go b/internal/blocktx/integration_test/reorg_integration_test.go index 5336a2808..cec7372d5 100644 --- a/internal/blocktx/integration_test/reorg_integration_test.go +++ b/internal/blocktx/integration_test/reorg_integration_test.go @@ -32,13 +32,13 @@ import ( "testing" "time" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/pkg/test_utils" _ "github.com/golang-migrate/migrate/v4/source/file" _ "github.com/lib/pq" "github.com/libsv/go-bc" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" "github.com/stretchr/testify/require" @@ -56,7 +56,7 @@ func TestReorg(t *testing.T) { const blockHash822011 = "bf9be09b345cc2d904b59951cc8a2ed452d8d143e2e25cde64058270fb3a667a" - // blockHash := testutils.RevChainhash(t, blockHash822011) + blockHash := testutils.RevChainhash(t, blockHash822011) prevBlockHash := testutils.RevChainhash(t, "00000000000000000a00c377b260a3219b0c314763f486bc363df7aa7e22ad72") txHash, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b") require.NoError(t, err) @@ -64,8 +64,8 @@ func TestReorg(t *testing.T) { require.NoError(t, err) // should become LONGEST - blockMessage := &p2p.BlockMessage{ - // Hash: blockHash, + blockMessage := &bcnet.BlockMessage{ + Hash: blockHash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevBlockHash, // NON-existent in the db @@ -77,8 +77,7 @@ func TestReorg(t *testing.T) { } processor.StartBlockProcessing() - err = p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // Allow DB to process the block time.Sleep(200 * time.Millisecond) @@ -100,7 +99,7 @@ func TestReorg(t *testing.T) { txhash822015Competing = "b16cea53fc823e146fbb9ae4ad3124f7c273f30562585ad6e4831495d609f430" ) - // blockHash := testutils.RevChainhash(t, blockHash822015Fork) + blockHash := testutils.RevChainhash(t, blockHash822015Fork) prevBlockHash := testutils.RevChainhash(t, blockHash822014StartOfChain) txHash := testutils.RevChainhash(t, txhash822015) txHash2 := testutils.RevChainhash(t, txhash822015Competing) // should not be published - is already in the longest chain @@ -108,8 +107,8 @@ func TestReorg(t *testing.T) { merkleRoot := treeStore[len(treeStore)-1] // should become STALE - blockMessage := &p2p.BlockMessage{ - // Hash: blockHash, + blockMessage := &bcnet.BlockMessage{ + Hash: blockHash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevBlockHash, // block with status LONGEST at height 822014 @@ -121,8 +120,7 @@ func TestReorg(t *testing.T) { } processor.StartBlockProcessing() - err := p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // Allow DB to process the block time.Sleep(200 * time.Millisecond) @@ -167,8 +165,8 @@ func TestReorg(t *testing.T) { // should become LONGEST // reorg should happen - blockMessage := &p2p.BlockMessage{ - // Hash: blockHash, + blockMessage := &bcnet.BlockMessage{ + Hash: blockHash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevhash, // block with status STALE at height 822015 @@ -180,8 +178,7 @@ func TestReorg(t *testing.T) { } processor.StartBlockProcessing() - err := p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // Allow DB to process the block and perform reorg time.Sleep(1 * time.Second) @@ -248,14 +245,14 @@ func TestReorg(t *testing.T) { blockHash822023Orphan = "0000000000000000082131979a4e25a5101912a5f8461e18f306d23e158161cd" ) - // blockHash := testutils.RevChainhash(t, blockHash822021) + blockHash := testutils.RevChainhash(t, blockHash822021) txHash := testutils.RevChainhash(t, "de0753d9ce6f92e340843cbfdd11e58beff8c578956ecdec4c461b018a26b8a9") merkleRoot := testutils.RevChainhash(t, "de0753d9ce6f92e340843cbfdd11e58beff8c578956ecdec4c461b018a26b8a9") prevhash := testutils.RevChainhash(t, blockHash822020Orphan) // should become STALE - blockMessage := &p2p.BlockMessage{ - // Hash: blockHash, + blockMessage := &bcnet.BlockMessage{ + Hash: blockHash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevhash, // block with status ORPHANED at height 822020 - connected to STALE chain @@ -267,8 +264,7 @@ func TestReorg(t *testing.T) { } processor.StartBlockProcessing() - err := p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // Allow DB to process the block and find orphans time.Sleep(1 * time.Second) @@ -321,15 +317,15 @@ func TestReorg(t *testing.T) { txhash822017 = "ece2b7e40d98749c03c551b783420d6e3fdc3c958244bbf275437839585829a6" ) - // blockHash := testutils.RevChainhash(t, blockHash822021) + blockHash := testutils.RevChainhash(t, blockHash822021) prevhash := testutils.RevChainhash(t, blockHash822020Orphan) txHash := testutils.RevChainhash(t, "3e15f823a7de25c26ce9001d4814a6f0ebc915a1ca4f1ba9cfac720bd941c39c") merkleRoot := testutils.RevChainhash(t, "3e15f823a7de25c26ce9001d4814a6f0ebc915a1ca4f1ba9cfac720bd941c39c") // should become LONGEST // reorg should happen - blockMessage := &p2p.BlockMessage{ - // Hash: blockHash, + blockMessage := &bcnet.BlockMessage{ + Hash: blockHash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevhash, // block with status ORPHANED at height 822020 - connected to STALE chain @@ -342,8 +338,7 @@ func TestReorg(t *testing.T) { } processor.StartBlockProcessing() - err := p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // Allow DB to process the block, find orphans and perform reorg time.Sleep(3 * time.Second) diff --git a/internal/blocktx/interface.go b/internal/blocktx/interface.go deleted file mode 100644 index a17b15f6d..000000000 --- a/internal/blocktx/interface.go +++ /dev/null @@ -1,37 +0,0 @@ -package blocktx - -import ( - "github.com/libsv/go-p2p" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/libsv/go-p2p/wire" -) - -type BlockRequest struct { - Hash *chainhash.Hash - Peer p2p.PeerI -} - -type Peer interface { - Connected() bool - WriteMsg(msg wire.Message) error - String() string - AnnounceTransaction(txHash *chainhash.Hash) - RequestTransaction(txHash *chainhash.Hash) - AnnounceBlock(blockHash *chainhash.Hash) - RequestBlock(blockHash *chainhash.Hash) - Network() wire.BitcoinNet - IsHealthy() bool - IsUnhealthyCh() <-chan struct{} - Shutdown() - Restart() -} - -type PeerManager interface { - AnnounceTransaction(txHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI - RequestTransaction(txHash *chainhash.Hash) p2p.PeerI - AnnounceBlock(blockHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI - RequestBlock(blockHash *chainhash.Hash) p2p.PeerI - AddPeer(peer p2p.PeerI) error - GetPeers() []p2p.PeerI - Shutdown() -} diff --git a/internal/blocktx/mocks/peer_mock.go b/internal/blocktx/mocks/peer_mock.go deleted file mode 100644 index 6b3f52020..000000000 --- a/internal/blocktx/mocks/peer_mock.go +++ /dev/null @@ -1,512 +0,0 @@ -// Code generated by moq; DO NOT EDIT. -// github.com/matryer/moq - -package mocks - -import ( - "github.com/bitcoin-sv/arc/internal/blocktx" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/libsv/go-p2p/wire" - "sync" -) - -// Ensure, that PeerMock does implement blocktx.Peer. -// If this is not the case, regenerate this file with moq. -var _ blocktx.Peer = &PeerMock{} - -// PeerMock is a mock implementation of blocktx.Peer. -// -// func TestSomethingThatUsesPeer(t *testing.T) { -// -// // make and configure a mocked blocktx.Peer -// mockedPeer := &PeerMock{ -// AnnounceBlockFunc: func(blockHash *chainhash.Hash) { -// panic("mock out the AnnounceBlock method") -// }, -// AnnounceTransactionFunc: func(txHash *chainhash.Hash) { -// panic("mock out the AnnounceTransaction method") -// }, -// ConnectedFunc: func() bool { -// panic("mock out the Connected method") -// }, -// IsHealthyFunc: func() bool { -// panic("mock out the IsHealthy method") -// }, -// IsUnhealthyChFunc: func() <-chan struct{} { -// panic("mock out the IsUnhealthyCh method") -// }, -// NetworkFunc: func() wire.BitcoinNet { -// panic("mock out the Network method") -// }, -// RequestBlockFunc: func(blockHash *chainhash.Hash) { -// panic("mock out the RequestBlock method") -// }, -// RequestTransactionFunc: func(txHash *chainhash.Hash) { -// panic("mock out the RequestTransaction method") -// }, -// RestartFunc: func() { -// panic("mock out the Restart method") -// }, -// ShutdownFunc: func() { -// panic("mock out the Shutdown method") -// }, -// StringFunc: func() string { -// panic("mock out the String method") -// }, -// WriteMsgFunc: func(msg wire.Message) error { -// panic("mock out the WriteMsg method") -// }, -// } -// -// // use mockedPeer in code that requires blocktx.Peer -// // and then make assertions. -// -// } -type PeerMock struct { - // AnnounceBlockFunc mocks the AnnounceBlock method. - AnnounceBlockFunc func(blockHash *chainhash.Hash) - - // AnnounceTransactionFunc mocks the AnnounceTransaction method. - AnnounceTransactionFunc func(txHash *chainhash.Hash) - - // ConnectedFunc mocks the Connected method. - ConnectedFunc func() bool - - // IsHealthyFunc mocks the IsHealthy method. - IsHealthyFunc func() bool - - // IsUnhealthyChFunc mocks the IsUnhealthyCh method. - IsUnhealthyChFunc func() <-chan struct{} - - // NetworkFunc mocks the Network method. - NetworkFunc func() wire.BitcoinNet - - // RequestBlockFunc mocks the RequestBlock method. - RequestBlockFunc func(blockHash *chainhash.Hash) - - // RequestTransactionFunc mocks the RequestTransaction method. - RequestTransactionFunc func(txHash *chainhash.Hash) - - // RestartFunc mocks the Restart method. - RestartFunc func() - - // ShutdownFunc mocks the Shutdown method. - ShutdownFunc func() - - // StringFunc mocks the String method. - StringFunc func() string - - // WriteMsgFunc mocks the WriteMsg method. - WriteMsgFunc func(msg wire.Message) error - - // calls tracks calls to the methods. - calls struct { - // AnnounceBlock holds details about calls to the AnnounceBlock method. - AnnounceBlock []struct { - // BlockHash is the blockHash argument value. - BlockHash *chainhash.Hash - } - // AnnounceTransaction holds details about calls to the AnnounceTransaction method. - AnnounceTransaction []struct { - // TxHash is the txHash argument value. - TxHash *chainhash.Hash - } - // Connected holds details about calls to the Connected method. - Connected []struct { - } - // IsHealthy holds details about calls to the IsHealthy method. - IsHealthy []struct { - } - // IsUnhealthyCh holds details about calls to the IsUnhealthyCh method. - IsUnhealthyCh []struct { - } - // Network holds details about calls to the Network method. - Network []struct { - } - // RequestBlock holds details about calls to the RequestBlock method. - RequestBlock []struct { - // BlockHash is the blockHash argument value. - BlockHash *chainhash.Hash - } - // RequestTransaction holds details about calls to the RequestTransaction method. - RequestTransaction []struct { - // TxHash is the txHash argument value. - TxHash *chainhash.Hash - } - // Restart holds details about calls to the Restart method. - Restart []struct { - } - // Shutdown holds details about calls to the Shutdown method. - Shutdown []struct { - } - // String holds details about calls to the String method. - String []struct { - } - // WriteMsg holds details about calls to the WriteMsg method. - WriteMsg []struct { - // Msg is the msg argument value. - Msg wire.Message - } - } - lockAnnounceBlock sync.RWMutex - lockAnnounceTransaction sync.RWMutex - lockConnected sync.RWMutex - lockIsHealthy sync.RWMutex - lockIsUnhealthyCh sync.RWMutex - lockNetwork sync.RWMutex - lockRequestBlock sync.RWMutex - lockRequestTransaction sync.RWMutex - lockRestart sync.RWMutex - lockShutdown sync.RWMutex - lockString sync.RWMutex - lockWriteMsg sync.RWMutex -} - -// AnnounceBlock calls AnnounceBlockFunc. -func (mock *PeerMock) AnnounceBlock(blockHash *chainhash.Hash) { - if mock.AnnounceBlockFunc == nil { - panic("PeerMock.AnnounceBlockFunc: method is nil but Peer.AnnounceBlock was just called") - } - callInfo := struct { - BlockHash *chainhash.Hash - }{ - BlockHash: blockHash, - } - mock.lockAnnounceBlock.Lock() - mock.calls.AnnounceBlock = append(mock.calls.AnnounceBlock, callInfo) - mock.lockAnnounceBlock.Unlock() - mock.AnnounceBlockFunc(blockHash) -} - -// AnnounceBlockCalls gets all the calls that were made to AnnounceBlock. -// Check the length with: -// -// len(mockedPeer.AnnounceBlockCalls()) -func (mock *PeerMock) AnnounceBlockCalls() []struct { - BlockHash *chainhash.Hash -} { - var calls []struct { - BlockHash *chainhash.Hash - } - mock.lockAnnounceBlock.RLock() - calls = mock.calls.AnnounceBlock - mock.lockAnnounceBlock.RUnlock() - return calls -} - -// AnnounceTransaction calls AnnounceTransactionFunc. -func (mock *PeerMock) AnnounceTransaction(txHash *chainhash.Hash) { - if mock.AnnounceTransactionFunc == nil { - panic("PeerMock.AnnounceTransactionFunc: method is nil but Peer.AnnounceTransaction was just called") - } - callInfo := struct { - TxHash *chainhash.Hash - }{ - TxHash: txHash, - } - mock.lockAnnounceTransaction.Lock() - mock.calls.AnnounceTransaction = append(mock.calls.AnnounceTransaction, callInfo) - mock.lockAnnounceTransaction.Unlock() - mock.AnnounceTransactionFunc(txHash) -} - -// AnnounceTransactionCalls gets all the calls that were made to AnnounceTransaction. -// Check the length with: -// -// len(mockedPeer.AnnounceTransactionCalls()) -func (mock *PeerMock) AnnounceTransactionCalls() []struct { - TxHash *chainhash.Hash -} { - var calls []struct { - TxHash *chainhash.Hash - } - mock.lockAnnounceTransaction.RLock() - calls = mock.calls.AnnounceTransaction - mock.lockAnnounceTransaction.RUnlock() - return calls -} - -// Connected calls ConnectedFunc. -func (mock *PeerMock) Connected() bool { - if mock.ConnectedFunc == nil { - panic("PeerMock.ConnectedFunc: method is nil but Peer.Connected was just called") - } - callInfo := struct { - }{} - mock.lockConnected.Lock() - mock.calls.Connected = append(mock.calls.Connected, callInfo) - mock.lockConnected.Unlock() - return mock.ConnectedFunc() -} - -// ConnectedCalls gets all the calls that were made to Connected. -// Check the length with: -// -// len(mockedPeer.ConnectedCalls()) -func (mock *PeerMock) ConnectedCalls() []struct { -} { - var calls []struct { - } - mock.lockConnected.RLock() - calls = mock.calls.Connected - mock.lockConnected.RUnlock() - return calls -} - -// IsHealthy calls IsHealthyFunc. -func (mock *PeerMock) IsHealthy() bool { - if mock.IsHealthyFunc == nil { - panic("PeerMock.IsHealthyFunc: method is nil but Peer.IsHealthy was just called") - } - callInfo := struct { - }{} - mock.lockIsHealthy.Lock() - mock.calls.IsHealthy = append(mock.calls.IsHealthy, callInfo) - mock.lockIsHealthy.Unlock() - return mock.IsHealthyFunc() -} - -// IsHealthyCalls gets all the calls that were made to IsHealthy. -// Check the length with: -// -// len(mockedPeer.IsHealthyCalls()) -func (mock *PeerMock) IsHealthyCalls() []struct { -} { - var calls []struct { - } - mock.lockIsHealthy.RLock() - calls = mock.calls.IsHealthy - mock.lockIsHealthy.RUnlock() - return calls -} - -// IsUnhealthyCh calls IsUnhealthyChFunc. -func (mock *PeerMock) IsUnhealthyCh() <-chan struct{} { - if mock.IsUnhealthyChFunc == nil { - panic("PeerMock.IsUnhealthyChFunc: method is nil but Peer.IsUnhealthyCh was just called") - } - callInfo := struct { - }{} - mock.lockIsUnhealthyCh.Lock() - mock.calls.IsUnhealthyCh = append(mock.calls.IsUnhealthyCh, callInfo) - mock.lockIsUnhealthyCh.Unlock() - return mock.IsUnhealthyChFunc() -} - -// IsUnhealthyChCalls gets all the calls that were made to IsUnhealthyCh. -// Check the length with: -// -// len(mockedPeer.IsUnhealthyChCalls()) -func (mock *PeerMock) IsUnhealthyChCalls() []struct { -} { - var calls []struct { - } - mock.lockIsUnhealthyCh.RLock() - calls = mock.calls.IsUnhealthyCh - mock.lockIsUnhealthyCh.RUnlock() - return calls -} - -// Network calls NetworkFunc. -func (mock *PeerMock) Network() wire.BitcoinNet { - if mock.NetworkFunc == nil { - panic("PeerMock.NetworkFunc: method is nil but Peer.Network was just called") - } - callInfo := struct { - }{} - mock.lockNetwork.Lock() - mock.calls.Network = append(mock.calls.Network, callInfo) - mock.lockNetwork.Unlock() - return mock.NetworkFunc() -} - -// NetworkCalls gets all the calls that were made to Network. -// Check the length with: -// -// len(mockedPeer.NetworkCalls()) -func (mock *PeerMock) NetworkCalls() []struct { -} { - var calls []struct { - } - mock.lockNetwork.RLock() - calls = mock.calls.Network - mock.lockNetwork.RUnlock() - return calls -} - -// RequestBlock calls RequestBlockFunc. -func (mock *PeerMock) RequestBlock(blockHash *chainhash.Hash) { - if mock.RequestBlockFunc == nil { - panic("PeerMock.RequestBlockFunc: method is nil but Peer.RequestBlock was just called") - } - callInfo := struct { - BlockHash *chainhash.Hash - }{ - BlockHash: blockHash, - } - mock.lockRequestBlock.Lock() - mock.calls.RequestBlock = append(mock.calls.RequestBlock, callInfo) - mock.lockRequestBlock.Unlock() - mock.RequestBlockFunc(blockHash) -} - -// RequestBlockCalls gets all the calls that were made to RequestBlock. -// Check the length with: -// -// len(mockedPeer.RequestBlockCalls()) -func (mock *PeerMock) RequestBlockCalls() []struct { - BlockHash *chainhash.Hash -} { - var calls []struct { - BlockHash *chainhash.Hash - } - mock.lockRequestBlock.RLock() - calls = mock.calls.RequestBlock - mock.lockRequestBlock.RUnlock() - return calls -} - -// RequestTransaction calls RequestTransactionFunc. -func (mock *PeerMock) RequestTransaction(txHash *chainhash.Hash) { - if mock.RequestTransactionFunc == nil { - panic("PeerMock.RequestTransactionFunc: method is nil but Peer.RequestTransaction was just called") - } - callInfo := struct { - TxHash *chainhash.Hash - }{ - TxHash: txHash, - } - mock.lockRequestTransaction.Lock() - mock.calls.RequestTransaction = append(mock.calls.RequestTransaction, callInfo) - mock.lockRequestTransaction.Unlock() - mock.RequestTransactionFunc(txHash) -} - -// RequestTransactionCalls gets all the calls that were made to RequestTransaction. -// Check the length with: -// -// len(mockedPeer.RequestTransactionCalls()) -func (mock *PeerMock) RequestTransactionCalls() []struct { - TxHash *chainhash.Hash -} { - var calls []struct { - TxHash *chainhash.Hash - } - mock.lockRequestTransaction.RLock() - calls = mock.calls.RequestTransaction - mock.lockRequestTransaction.RUnlock() - return calls -} - -// Restart calls RestartFunc. -func (mock *PeerMock) Restart() { - if mock.RestartFunc == nil { - panic("PeerMock.RestartFunc: method is nil but Peer.Restart was just called") - } - callInfo := struct { - }{} - mock.lockRestart.Lock() - mock.calls.Restart = append(mock.calls.Restart, callInfo) - mock.lockRestart.Unlock() - mock.RestartFunc() -} - -// RestartCalls gets all the calls that were made to Restart. -// Check the length with: -// -// len(mockedPeer.RestartCalls()) -func (mock *PeerMock) RestartCalls() []struct { -} { - var calls []struct { - } - mock.lockRestart.RLock() - calls = mock.calls.Restart - mock.lockRestart.RUnlock() - return calls -} - -// Shutdown calls ShutdownFunc. -func (mock *PeerMock) Shutdown() { - if mock.ShutdownFunc == nil { - panic("PeerMock.ShutdownFunc: method is nil but Peer.Shutdown was just called") - } - callInfo := struct { - }{} - mock.lockShutdown.Lock() - mock.calls.Shutdown = append(mock.calls.Shutdown, callInfo) - mock.lockShutdown.Unlock() - mock.ShutdownFunc() -} - -// ShutdownCalls gets all the calls that were made to Shutdown. -// Check the length with: -// -// len(mockedPeer.ShutdownCalls()) -func (mock *PeerMock) ShutdownCalls() []struct { -} { - var calls []struct { - } - mock.lockShutdown.RLock() - calls = mock.calls.Shutdown - mock.lockShutdown.RUnlock() - return calls -} - -// String calls StringFunc. -func (mock *PeerMock) String() string { - if mock.StringFunc == nil { - panic("PeerMock.StringFunc: method is nil but Peer.String was just called") - } - callInfo := struct { - }{} - mock.lockString.Lock() - mock.calls.String = append(mock.calls.String, callInfo) - mock.lockString.Unlock() - return mock.StringFunc() -} - -// StringCalls gets all the calls that were made to String. -// Check the length with: -// -// len(mockedPeer.StringCalls()) -func (mock *PeerMock) StringCalls() []struct { -} { - var calls []struct { - } - mock.lockString.RLock() - calls = mock.calls.String - mock.lockString.RUnlock() - return calls -} - -// WriteMsg calls WriteMsgFunc. -func (mock *PeerMock) WriteMsg(msg wire.Message) error { - if mock.WriteMsgFunc == nil { - panic("PeerMock.WriteMsgFunc: method is nil but Peer.WriteMsg was just called") - } - callInfo := struct { - Msg wire.Message - }{ - Msg: msg, - } - mock.lockWriteMsg.Lock() - mock.calls.WriteMsg = append(mock.calls.WriteMsg, callInfo) - mock.lockWriteMsg.Unlock() - return mock.WriteMsgFunc(msg) -} - -// WriteMsgCalls gets all the calls that were made to WriteMsg. -// Check the length with: -// -// len(mockedPeer.WriteMsgCalls()) -func (mock *PeerMock) WriteMsgCalls() []struct { - Msg wire.Message -} { - var calls []struct { - Msg wire.Message - } - mock.lockWriteMsg.RLock() - calls = mock.calls.WriteMsg - mock.lockWriteMsg.RUnlock() - return calls -} diff --git a/internal/blocktx/processor.go b/internal/blocktx/processor.go index 3630c086a..10877f7f8 100644 --- a/internal/blocktx/processor.go +++ b/internal/blocktx/processor.go @@ -14,12 +14,13 @@ import ( "time" "github.com/libsv/go-bc" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/pkg/tracing" @@ -52,8 +53,8 @@ const ( type Processor struct { hostname string - blockRequestCh chan BlockRequest - blockProcessCh chan *p2p.BlockMessage + blockRequestCh chan blocktx_p2p.BlockRequest + blockProcessCh chan *bcnet.BlockMessage store store.BlocktxStore logger *slog.Logger transactionStorageBatchSize int @@ -80,8 +81,8 @@ type Processor struct { func NewProcessor( logger *slog.Logger, storeI store.BlocktxStore, - blockRequestCh chan BlockRequest, - blockProcessCh chan *p2p.BlockMessage, + blockRequestCh chan blocktx_p2p.BlockRequest, + blockProcessCh chan *bcnet.BlockMessage, opts ...func(*Processor), ) (*Processor, error) { hostname, err := os.Hostname() @@ -170,7 +171,7 @@ func (p *Processor) StartBlockRequesting() { p.logger.Info("Sending block request", slog.String("hash", hash.String())) msg := wire.NewMsgGetDataSizeHint(1) _ = msg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, hash)) // ignore error at this point - _ = peer.WriteMsg(msg) + peer.WriteMsg(msg) p.logger.Info("Block request message sent to peer", slog.String("hash", hash.String()), slog.String("peer", peer.String())) } @@ -192,7 +193,7 @@ func (p *Processor) StartBlockProcessing() { var err error timeStart := time.Now() - hash := blockMsg.Header.BlockHash() + hash := blockMsg.Hash p.logger.Info("received block", slog.String("hash", hash.String())) @@ -202,7 +203,7 @@ func (p *Processor) StartBlockProcessing() { continue } - storeErr := p.store.MarkBlockAsDone(p.ctx, &hash, blockMsg.Size, uint64(len(blockMsg.TransactionHashes))) + storeErr := p.store.MarkBlockAsDone(p.ctx, hash, blockMsg.Size, uint64(len(blockMsg.TransactionHashes))) if storeErr != nil { p.logger.Error("unable to mark block as processed", slog.String("hash", hash.String()), slog.String("err", storeErr.Error())) continue @@ -307,11 +308,11 @@ func (p *Processor) registerTransactions(txHashes [][]byte) { } } -func (p *Processor) processBlock(blockMsg *p2p.BlockMessage) (err error) { +func (p *Processor) processBlock(blockMsg *bcnet.BlockMessage) (err error) { ctx := p.ctx var block *blocktx_api.Block - blockHash := blockMsg.Header.BlockHash() + blockHash := blockMsg.Hash ctx, span := tracing.StartTracing(ctx, "processBlock", p.tracingEnabled, p.tracingAttributes...) defer func() { @@ -328,7 +329,7 @@ func (p *Processor) processBlock(blockMsg *p2p.BlockMessage) (err error) { p.logger.Info("processing incoming block", slog.String("hash", blockHash.String()), slog.Uint64("height", blockMsg.Height)) // check if we've already processed that block - existingBlock, _ := p.store.GetBlock(ctx, &blockHash) + existingBlock, _ := p.store.GetBlock(ctx, blockHash) if existingBlock != nil { p.logger.Warn("ignoring already existing block", slog.String("hash", blockHash.String()), slog.Uint64("height", blockMsg.Height)) @@ -371,13 +372,13 @@ func (p *Processor) processBlock(blockMsg *p2p.BlockMessage) (err error) { return nil } -func (p *Processor) verifyAndInsertBlock(ctx context.Context, blockMsg *p2p.BlockMessage) (incomingBlock *blocktx_api.Block, err error) { +func (p *Processor) verifyAndInsertBlock(ctx context.Context, blockMsg *bcnet.BlockMessage) (incomingBlock *blocktx_api.Block, err error) { ctx, span := tracing.StartTracing(ctx, "verifyAndInsertBlock", p.tracingEnabled, p.tracingAttributes...) defer func() { tracing.EndTracing(span, err) }() - blockHash := blockMsg.Header.BlockHash() + blockHash := blockMsg.Hash previousBlockHash := blockMsg.Header.PrevBlock merkleRoot := blockMsg.Header.MerkleRoot diff --git a/internal/blocktx/processor_test.go b/internal/blocktx/processor_test.go index 5f81465c9..dbef6fde6 100644 --- a/internal/blocktx/processor_test.go +++ b/internal/blocktx/processor_test.go @@ -9,17 +9,19 @@ import ( "testing" "time" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" "github.com/stretchr/testify/require" "google.golang.org/protobuf/reflect/protoreflect" "github.com/bitcoin-sv/arc/internal/blocktx" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/mocks" "github.com/bitcoin-sv/arc/internal/blocktx/store" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" + p2p_mocks "github.com/bitcoin-sv/arc/internal/p2p/mocks" "github.com/bitcoin-sv/arc/internal/testdata" ) @@ -198,14 +200,15 @@ func TestHandleBlock(t *testing.T) { } logger := slog.Default() - blockProcessCh := make(chan *p2p.BlockMessage, 1) - p2pMsgHandler := blocktx.NewPeerHandler(logger, nil, blockProcessCh) + blockProcessCh := make(chan *bcnet.BlockMessage, 1) + p2pMsgHandler := blocktx_p2p.NewMsgHandler(logger, nil, blockProcessCh) sut, err := blocktx.NewProcessor(logger, storeMock, nil, blockProcessCh, blocktx.WithTransactionBatchSize(batchSize), blocktx.WithMessageQueueClient(mq)) require.NoError(t, err) - blockMessage := &p2p.BlockMessage{ + blockMessage := &bcnet.BlockMessage{ // Hash: testdata.Block1Hash, + Hash: testdata.Block1Hash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: tc.prevBlockHash, @@ -222,8 +225,7 @@ func TestHandleBlock(t *testing.T) { sut.StartBlockProcessing() // simulate receiving block from node - err = p2pMsgHandler.HandleBlock(blockMessage, &mocks.PeerMock{StringFunc: func() string { return "peer" }}) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, &p2p_mocks.PeerIMock{StringFunc: func() string { return "peer" }}) var actualInsertedBlockTransactions []string time.Sleep(20 * time.Millisecond) @@ -439,8 +441,8 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { // build peer manager and processor logger := slog.Default() - blockProcessCh := make(chan *p2p.BlockMessage, 10) - p2pMsgHandler := blocktx.NewPeerHandler(logger, nil, blockProcessCh) + blockProcessCh := make(chan *bcnet.BlockMessage, 10) + p2pMsgHandler := blocktx_p2p.NewMsgHandler(logger, nil, blockProcessCh) sut, err := blocktx.NewProcessor(logger, storeMock, nil, blockProcessCh) require.NoError(t, err) @@ -450,8 +452,8 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { merkleRoot, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b") require.NoError(t, err) - blockMessage := &p2p.BlockMessage{ - // Hash: testdata.Block1Hash, + blockMessage := &bcnet.BlockMessage{ + Hash: testdata.Block1Hash, Header: &wire.BlockHeader{ Version: 541065216, MerkleRoot: *merkleRoot, @@ -465,8 +467,7 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { sut.StartBlockProcessing() // simulate receiving block from node - err = p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // then time.Sleep(20 * time.Millisecond) @@ -596,18 +597,18 @@ func TestStartBlockRequesting(t *testing.T) { }, } - peerMock := &mocks.PeerMock{ - WriteMsgFunc: func(_ wire.Message) error { return nil }, + peerMock := &p2p_mocks.PeerIMock{ + WriteMsgFunc: func(_ wire.Message) {}, StringFunc: func() string { return "peer" }, } // build peer manager logger := slog.Default() - blockRequestCh := make(chan blocktx.BlockRequest, 10) - blockProcessCh := make(chan *p2p.BlockMessage, 10) + blockRequestCh := make(chan blocktx_p2p.BlockRequest, 10) + blockProcessCh := make(chan *bcnet.BlockMessage, 10) - peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh) + peerHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh) sut, err := blocktx.NewProcessor(logger, storeMock, blockRequestCh, blockProcessCh) require.NoError(t, err) @@ -619,8 +620,7 @@ func TestStartBlockRequesting(t *testing.T) { invMsg := wire.NewMsgInvSizeHint(1) err = invMsg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, blockHash)) require.NoError(t, err) - err = peerHandler.HandleBlockAnnouncement(invMsg.InvList[0], peerMock) - require.NoError(t, err) + peerHandler.OnReceive(invMsg, peerMock) time.Sleep(200 * time.Millisecond) diff --git a/internal/blocktx/server.go b/internal/blocktx/server.go index 6a5b99193..9e58a26db 100644 --- a/internal/blocktx/server.go +++ b/internal/blocktx/server.go @@ -9,12 +9,11 @@ import ( "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/libsv/go-p2p" - "github.com/bitcoin-sv/arc/config" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/internal/grpc_opts" + "github.com/bitcoin-sv/arc/internal/p2p" ) // Server type carries the logger within it. @@ -23,14 +22,14 @@ type Server struct { grpc_opts.GrpcServer logger *slog.Logger - pm p2p.PeerManagerI + pm *p2p.PeerManager store store.BlocktxStore maxAllowedBlockHeightMismatch int } // NewServer will return a server instance with the logger stored within it. func NewServer(prometheusEndpoint string, maxMsgSize int, logger *slog.Logger, - store store.BlocktxStore, pm p2p.PeerManagerI, maxAllowedBlockHeightMismatch int, tracingConfig *config.TracingConfig) (*Server, error) { + store store.BlocktxStore, pm *p2p.PeerManager, maxAllowedBlockHeightMismatch int, tracingConfig *config.TracingConfig) (*Server, error) { logger = logger.With(slog.String("module", "server")) grpcServer, err := grpc_opts.NewGrpcServer(logger, "blocktx", prometheusEndpoint, maxMsgSize, tracingConfig) diff --git a/internal/blocktx/server_test.go b/internal/blocktx/server_test.go index 9c479f20e..7010b7435 100644 --- a/internal/blocktx/server_test.go +++ b/internal/blocktx/server_test.go @@ -6,11 +6,11 @@ import ( "testing" "time" - "github.com/libsv/go-p2p" "github.com/stretchr/testify/require" "github.com/bitcoin-sv/arc/internal/blocktx" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" + "github.com/bitcoin-sv/arc/internal/p2p" ) func TestListenAndServe(t *testing.T) { diff --git a/internal/p2p/tests/wire_reader_test.go b/internal/p2p/tests/wire_reader_test.go new file mode 100644 index 000000000..8b397a1f5 --- /dev/null +++ b/internal/p2p/tests/wire_reader_test.go @@ -0,0 +1,119 @@ +package p2p_tests + +import ( + "bytes" + "context" + "io" + "testing" + "time" + + "github.com/bitcoin-sv/arc/internal/p2p" + "github.com/libsv/go-p2p/wire" + "github.com/stretchr/testify/require" +) + +func TestWireReader_ReadNextMsg(t *testing.T) { + t.Run("Success", func(t *testing.T) { + // given + expectedMsg := wire.NewMsgGetBlocks(blockHash) + + var buff bytes.Buffer + err := wire.WriteMessage(&buff, expectedMsg, wire.ProtocolVersion, bitcoinNet) + require.NoError(t, err) + + sut := p2p.NewWireReader(&buff, 4096) + + // when + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + res, err := sut.ReadNextMsg(ctx, wire.ProtocolVersion, bitcoinNet) + + // then + require.NoError(t, err) + require.Equal(t, expectedMsg, res) + }) + + t.Run("Unknown msg", func(t *testing.T) { + // given + unknownMsg := unknownMsg{} + + expectedMsg := wire.NewMsgGetBlocks(blockHash) + + var buff bytes.Buffer + // first write unknown msg + err := wire.WriteMessage(&buff, &unknownMsg, wire.ProtocolVersion, bitcoinNet) + require.NoError(t, err) + + // next write regular msg + err = wire.WriteMessage(&buff, expectedMsg, wire.ProtocolVersion, bitcoinNet) + require.NoError(t, err) + + sut := p2p.NewWireReader(&buff, 4096) + + // when + ctx, cancel := context.WithTimeout(context.Background(), 35*time.Second) + defer cancel() + + res, err := sut.ReadNextMsg(ctx, wire.ProtocolVersion, bitcoinNet) + + // then + require.NoError(t, err) + require.Equal(t, expectedMsg, res) + }) + + t.Run("Context cancelled", func(t *testing.T) { + // given + expectedMsg := wire.NewMsgGetBlocks(blockHash) + + var buff bytes.Buffer + err := wire.WriteMessage(&buff, expectedMsg, wire.ProtocolVersion, bitcoinNet) + require.NoError(t, err) + + sut := p2p.NewWireReader(&buff, 4096) + + // when + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel the context immediately + + res, err := sut.ReadNextMsg(ctx, wire.ProtocolVersion, bitcoinNet) + + // then + require.ErrorIs(t, err, context.Canceled) + require.Nil(t, res) + }) + + t.Run("Read error", func(t *testing.T) { + var buff bytes.Buffer + sut := p2p.NewWireReader(&buff, 4096) + + // when + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + res, err := sut.ReadNextMsg(ctx, wire.ProtocolVersion, bitcoinNet) + + // then + require.Error(t, err) + require.Nil(t, res) + }) +} + +type unknownMsg struct { +} + +func (m *unknownMsg) Bsvdecode(_ io.Reader, _ uint32, _ wire.MessageEncoding) error { + return nil +} + +func (m *unknownMsg) BsvEncode(_ io.Writer, _ uint32, _ wire.MessageEncoding) error { + return nil +} + +func (m *unknownMsg) Command() string { + return "test-cmd" +} + +func (m *unknownMsg) MaxPayloadLength(_ uint32) uint64 { + return 0 +}