diff --git a/examples/simple/handler.go b/examples/simple/handler.go index 0991738..b98c198 100644 --- a/examples/simple/handler.go +++ b/examples/simple/handler.go @@ -2,7 +2,6 @@ package main import ( "fmt" - "log/slog" "github.com/libsv/go-p2p" @@ -15,10 +14,10 @@ type SimplePeerHandler struct { logger *slog.Logger } -func (s *SimplePeerHandler) HandleTransactionGet(msg *wire.InvVect, peer p2p.PeerI) ([]byte, error) { - s.logger.Info("Peer requested transaction", slog.String("hash", msg.Hash.String()), slog.String("peer", peer.String())) - // You should implement a store and return the transaction bytes here. - return nil, fmt.Errorf("transaction not found") +func (s *SimplePeerHandler) HandleTransactionsGet(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) { + s.logger.Info("Peer requested transactions", slog.Int("count", len(msgs)), slog.String("peer", peer.String())) + // You should implement a store and return the transactions bytes here. + return nil, fmt.Errorf("transactions not found") } func (s *SimplePeerHandler) HandleTransactionSent(msg *wire.MsgTx, peer p2p.PeerI) error { diff --git a/interface.go b/interface.go index 4a66222..4221807 100644 --- a/interface.go +++ b/interface.go @@ -7,9 +7,7 @@ import ( "github.com/libsv/go-p2p/wire" ) -var ( - ErrPeerNetworkMismatch = fmt.Errorf("peer network mismatch") -) +var ErrPeerNetworkMismatch = fmt.Errorf("peer network mismatch") type PeerManagerI interface { AnnounceTransaction(txHash *chainhash.Hash, peers []PeerI) []PeerI @@ -36,7 +34,7 @@ type PeerI interface { } type PeerHandlerI interface { - HandleTransactionGet(msg *wire.InvVect, peer PeerI) ([]byte, error) + HandleTransactionsGet(msgs []*wire.InvVect, peer PeerI) ([][]byte, error) HandleTransactionSent(msg *wire.MsgTx, peer PeerI) error HandleTransactionAnnouncement(msg *wire.InvVect, peer PeerI) error HandleTransactionRejection(rejMsg *wire.MsgReject, peer PeerI) error diff --git a/peer.go b/peer.go index 7dbcf85..ef198fe 100644 --- a/peer.go +++ b/peer.go @@ -550,36 +550,43 @@ func (p *Peer) startReadHandler(ctx context.Context) { } func (p *Peer) handleGetDataMsg(dataMsg *wire.MsgGetData, logger *slog.Logger) { + txRequests := make([]*wire.InvVect, 0) + for _, invVect := range dataMsg.InvList { switch invVect.Type { case wire.InvTypeTx: logger.Debug("Request for TX", slog.String(hashKey, invVect.Hash.String())) - - txBytes, err := p.peerHandler.HandleTransactionGet(invVect, p) - if err != nil { - logger.Warn("Unable to fetch tx from store", slog.String(hashKey, invVect.Hash.String()), slog.String(typeKey, invVect.Type.String()), slog.String(errKey, err.Error())) - continue - } - - if txBytes == nil { - logger.Warn("tx does not exist", slog.String(hashKey, invVect.Hash.String()), slog.String(typeKey, invVect.Type.String())) - continue - } - - tx, err := bsvutil.NewTxFromBytes(txBytes) - if err != nil { - logger.Error("failed to parse tx", slog.String(hashKey, invVect.Hash.String()), slog.String(typeKey, invVect.Type.String()), slog.String("rawHex", hex.EncodeToString(txBytes)), slog.String(errKey, err.Error())) - continue - } - - p.writeChan <- tx.MsgTx() + txRequests = append(txRequests, invVect) case wire.InvTypeBlock: logger.Info("Request for block", slog.String(hashKey, invVect.Hash.String()), slog.String(typeKey, invVect.Type.String())) + continue default: logger.Warn("Unknown type", slog.String(hashKey, invVect.Hash.String()), slog.String(typeKey, invVect.Type.String())) + continue + } + } + + rawTxs, err := p.peerHandler.HandleTransactionsGet(txRequests, p) + if err != nil { + logger.Warn("Unable to fetch txs from store", slog.Int("count", len(txRequests)), slog.String(errKey, err.Error())) + return + } + + for _, txBytes := range rawTxs { + if txBytes == nil { + logger.Warn("tx does not exist") + continue + } + + tx, err := bsvutil.NewTxFromBytes(txBytes) + if err != nil { + logger.Error("failed to parse tx", slog.String("rawHex", hex.EncodeToString(txBytes)), slog.String(errKey, err.Error())) + continue } + + p.writeChan <- tx.MsgTx() } } diff --git a/peer_handler_gen_mock.go b/peer_handler_gen_mock.go index d4db662..1eaa06b 100644 --- a/peer_handler_gen_mock.go +++ b/peer_handler_gen_mock.go @@ -30,15 +30,15 @@ var _ PeerHandlerI = &PeerHandlerIMock{} // HandleTransactionAnnouncementFunc: func(msg *wire.InvVect, peer PeerI) error { // panic("mock out the HandleTransactionAnnouncement method") // }, -// HandleTransactionGetFunc: func(msg *wire.InvVect, peer PeerI) ([]byte, error) { -// panic("mock out the HandleTransactionGet method") -// }, // HandleTransactionRejectionFunc: func(rejMsg *wire.MsgReject, peer PeerI) error { // panic("mock out the HandleTransactionRejection method") // }, // HandleTransactionSentFunc: func(msg *wire.MsgTx, peer PeerI) error { // panic("mock out the HandleTransactionSent method") // }, +// HandleTransactionsGetFunc: func(msgs []*wire.InvVect, peer PeerI) ([][]byte, error) { +// panic("mock out the HandleTransactionsGet method") +// }, // } // // // use mockedPeerHandlerI in code that requires PeerHandlerI @@ -58,15 +58,15 @@ type PeerHandlerIMock struct { // HandleTransactionAnnouncementFunc mocks the HandleTransactionAnnouncement method. HandleTransactionAnnouncementFunc func(msg *wire.InvVect, peer PeerI) error - // HandleTransactionGetFunc mocks the HandleTransactionGet method. - HandleTransactionGetFunc func(msg *wire.InvVect, peer PeerI) ([]byte, error) - // HandleTransactionRejectionFunc mocks the HandleTransactionRejection method. HandleTransactionRejectionFunc func(rejMsg *wire.MsgReject, peer PeerI) error // HandleTransactionSentFunc mocks the HandleTransactionSent method. HandleTransactionSentFunc func(msg *wire.MsgTx, peer PeerI) error + // HandleTransactionsGetFunc mocks the HandleTransactionsGet method. + HandleTransactionsGetFunc func(msgs []*wire.InvVect, peer PeerI) ([][]byte, error) + // calls tracks calls to the methods. calls struct { // HandleBlock holds details about calls to the HandleBlock method. @@ -97,13 +97,6 @@ type PeerHandlerIMock struct { // Peer is the peer argument value. Peer PeerI } - // HandleTransactionGet holds details about calls to the HandleTransactionGet method. - HandleTransactionGet []struct { - // Msg is the msg argument value. - Msg *wire.InvVect - // Peer is the peer argument value. - Peer PeerI - } // HandleTransactionRejection holds details about calls to the HandleTransactionRejection method. HandleTransactionRejection []struct { // RejMsg is the rejMsg argument value. @@ -118,14 +111,21 @@ type PeerHandlerIMock struct { // Peer is the peer argument value. Peer PeerI } + // HandleTransactionsGet holds details about calls to the HandleTransactionsGet method. + HandleTransactionsGet []struct { + // Msgs is the msgs argument value. + Msgs []*wire.InvVect + // Peer is the peer argument value. + Peer PeerI + } } lockHandleBlock sync.RWMutex lockHandleBlockAnnouncement sync.RWMutex lockHandleTransaction sync.RWMutex lockHandleTransactionAnnouncement sync.RWMutex - lockHandleTransactionGet sync.RWMutex lockHandleTransactionRejection sync.RWMutex lockHandleTransactionSent sync.RWMutex + lockHandleTransactionsGet sync.RWMutex } // HandleBlock calls HandleBlockFunc. @@ -272,42 +272,6 @@ func (mock *PeerHandlerIMock) HandleTransactionAnnouncementCalls() []struct { return calls } -// HandleTransactionGet calls HandleTransactionGetFunc. -func (mock *PeerHandlerIMock) HandleTransactionGet(msg *wire.InvVect, peer PeerI) ([]byte, error) { - if mock.HandleTransactionGetFunc == nil { - panic("PeerHandlerIMock.HandleTransactionGetFunc: method is nil but PeerHandlerI.HandleTransactionGet was just called") - } - callInfo := struct { - Msg *wire.InvVect - Peer PeerI - }{ - Msg: msg, - Peer: peer, - } - mock.lockHandleTransactionGet.Lock() - mock.calls.HandleTransactionGet = append(mock.calls.HandleTransactionGet, callInfo) - mock.lockHandleTransactionGet.Unlock() - return mock.HandleTransactionGetFunc(msg, peer) -} - -// HandleTransactionGetCalls gets all the calls that were made to HandleTransactionGet. -// Check the length with: -// -// len(mockedPeerHandlerI.HandleTransactionGetCalls()) -func (mock *PeerHandlerIMock) HandleTransactionGetCalls() []struct { - Msg *wire.InvVect - Peer PeerI -} { - var calls []struct { - Msg *wire.InvVect - Peer PeerI - } - mock.lockHandleTransactionGet.RLock() - calls = mock.calls.HandleTransactionGet - mock.lockHandleTransactionGet.RUnlock() - return calls -} - // HandleTransactionRejection calls HandleTransactionRejectionFunc. func (mock *PeerHandlerIMock) HandleTransactionRejection(rejMsg *wire.MsgReject, peer PeerI) error { if mock.HandleTransactionRejectionFunc == nil { @@ -379,3 +343,39 @@ func (mock *PeerHandlerIMock) HandleTransactionSentCalls() []struct { mock.lockHandleTransactionSent.RUnlock() return calls } + +// HandleTransactionsGet calls HandleTransactionsGetFunc. +func (mock *PeerHandlerIMock) HandleTransactionsGet(msgs []*wire.InvVect, peer PeerI) ([][]byte, error) { + if mock.HandleTransactionsGetFunc == nil { + panic("PeerHandlerIMock.HandleTransactionsGetFunc: method is nil but PeerHandlerI.HandleTransactionsGet was just called") + } + callInfo := struct { + Msgs []*wire.InvVect + Peer PeerI + }{ + Msgs: msgs, + Peer: peer, + } + mock.lockHandleTransactionsGet.Lock() + mock.calls.HandleTransactionsGet = append(mock.calls.HandleTransactionsGet, callInfo) + mock.lockHandleTransactionsGet.Unlock() + return mock.HandleTransactionsGetFunc(msgs, peer) +} + +// HandleTransactionsGetCalls gets all the calls that were made to HandleTransactionsGet. +// Check the length with: +// +// len(mockedPeerHandlerI.HandleTransactionsGetCalls()) +func (mock *PeerHandlerIMock) HandleTransactionsGetCalls() []struct { + Msgs []*wire.InvVect + Peer PeerI +} { + var calls []struct { + Msgs []*wire.InvVect + Peer PeerI + } + mock.lockHandleTransactionsGet.RLock() + calls = mock.calls.HandleTransactionsGet + mock.lockHandleTransactionsGet.RUnlock() + return calls +} diff --git a/peer_handler_mock.go b/peer_handler_mock.go index fa8ef3a..014a650 100644 --- a/peer_handler_mock.go +++ b/peer_handler_mock.go @@ -27,17 +27,23 @@ func NewMockPeerHandler() *MockPeerHandler { } } -func (m *MockPeerHandler) HandleTransactionGet(msg *wire.InvVect, _ PeerI) ([]byte, error) { +func (m *MockPeerHandler) HandleTransactionsGet(msgs []*wire.InvVect, _ PeerI) ([][]byte, error) { m.mu.Lock() defer m.mu.Unlock() - m.transactionGet = append(m.transactionGet, *msg) + rawTxs := make([][]byte, 0) - bytes, ok := m.transactionGetBytes[msg.Hash.String()] - if !ok { - return nil, fmt.Errorf("no bytes for transaction %s", msg.Hash.String()) + for _, msg := range msgs { + m.transactionGet = append(m.transactionGet, *msg) + bytes, ok := m.transactionGetBytes[msg.Hash.String()] + if !ok { + return nil, fmt.Errorf("no bytes for transaction %s", msg.Hash.String()) + } + + rawTxs = append(rawTxs, bytes) } - return bytes, nil + + return rawTxs, nil } func (m *MockPeerHandler) GetTransactionGet() []wire.InvVect { diff --git a/test/peer_integration_test.go b/test/peer_integration_test.go index b2fac44..d443af3 100644 --- a/test/peer_integration_test.go +++ b/test/peer_integration_test.go @@ -2,13 +2,14 @@ package test import ( "fmt" - "github.com/libsv/go-p2p" "log" "log/slog" "os" "testing" "time" + "github.com/libsv/go-p2p" + "github.com/libsv/go-p2p/wire" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -19,7 +20,7 @@ const ( p2pPortBinding = "18335" ) -//go:generate moq -out ./peer_handler_gen_mock.go . PeerHandlerI +//go:generate moq -out ../peer_handler_gen_mock.go ../ PeerHandlerI var ( pool *dockertest.Pool @@ -148,15 +149,14 @@ func TestNewPeer(t *testing.T) { }) t.Run("announce transaction", func(t *testing.T) { - logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) pm := p2p.NewPeerManager(logger, wire.TestNet) require.NotNil(t, pm) peerHandler := &p2p.PeerHandlerIMock{ - HandleTransactionGetFunc: func(msg *wire.InvVect, peer p2p.PeerI) ([]byte, error) { - return TX1RawBytes, nil + HandleTransactionsGetFunc: func(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) { + return [][]byte{TX1RawBytes}, nil }, }