Skip to content

Commit

Permalink
Merge pull request #25 from kuba-4chain/feat/get-txs-batching
Browse files Browse the repository at this point in the history
feat!: improve performance of HandleTransactionGet, by requesting all transactions from InvMsg at once
  • Loading branch information
boecklim authored Jul 5, 2024
2 parents f57e79b + 684dd18 commit 5d6f580
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 89 deletions.
9 changes: 4 additions & 5 deletions examples/simple/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"fmt"

"log/slog"

"github.com/libsv/go-p2p"
Expand All @@ -15,10 +14,10 @@ type SimplePeerHandler struct {
logger *slog.Logger
}

func (s *SimplePeerHandler) HandleTransactionGet(msg *wire.InvVect, peer p2p.PeerI) ([]byte, error) {
s.logger.Info("Peer requested transaction", slog.String("hash", msg.Hash.String()), slog.String("peer", peer.String()))
// You should implement a store and return the transaction bytes here.
return nil, fmt.Errorf("transaction not found")
func (s *SimplePeerHandler) HandleTransactionsGet(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) {
s.logger.Info("Peer requested transactions", slog.Int("count", len(msgs)), slog.String("peer", peer.String()))
// You should implement a store and return the transactions bytes here.
return nil, fmt.Errorf("transactions not found")
}

func (s *SimplePeerHandler) HandleTransactionSent(msg *wire.MsgTx, peer p2p.PeerI) error {
Expand Down
6 changes: 2 additions & 4 deletions interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import (
"github.com/libsv/go-p2p/wire"
)

var (
ErrPeerNetworkMismatch = fmt.Errorf("peer network mismatch")
)
var ErrPeerNetworkMismatch = fmt.Errorf("peer network mismatch")

type PeerManagerI interface {
AnnounceTransaction(txHash *chainhash.Hash, peers []PeerI) []PeerI
Expand All @@ -36,7 +34,7 @@ type PeerI interface {
}

type PeerHandlerI interface {
HandleTransactionGet(msg *wire.InvVect, peer PeerI) ([]byte, error)
HandleTransactionsGet(msgs []*wire.InvVect, peer PeerI) ([][]byte, error)
HandleTransactionSent(msg *wire.MsgTx, peer PeerI) error
HandleTransactionAnnouncement(msg *wire.InvVect, peer PeerI) error
HandleTransactionRejection(rejMsg *wire.MsgReject, peer PeerI) error
Expand Down
45 changes: 26 additions & 19 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,36 +550,43 @@ func (p *Peer) startReadHandler(ctx context.Context) {
}

func (p *Peer) handleGetDataMsg(dataMsg *wire.MsgGetData, logger *slog.Logger) {
txRequests := make([]*wire.InvVect, 0)

for _, invVect := range dataMsg.InvList {
switch invVect.Type {
case wire.InvTypeTx:
logger.Debug("Request for TX", slog.String(hashKey, invVect.Hash.String()))

txBytes, err := p.peerHandler.HandleTransactionGet(invVect, p)
if err != nil {
logger.Warn("Unable to fetch tx from store", slog.String(hashKey, invVect.Hash.String()), slog.String(typeKey, invVect.Type.String()), slog.String(errKey, err.Error()))
continue
}

if txBytes == nil {
logger.Warn("tx does not exist", slog.String(hashKey, invVect.Hash.String()), slog.String(typeKey, invVect.Type.String()))
continue
}

tx, err := bsvutil.NewTxFromBytes(txBytes)
if err != nil {
logger.Error("failed to parse tx", slog.String(hashKey, invVect.Hash.String()), slog.String(typeKey, invVect.Type.String()), slog.String("rawHex", hex.EncodeToString(txBytes)), slog.String(errKey, err.Error()))
continue
}

p.writeChan <- tx.MsgTx()
txRequests = append(txRequests, invVect)

case wire.InvTypeBlock:
logger.Info("Request for block", slog.String(hashKey, invVect.Hash.String()), slog.String(typeKey, invVect.Type.String()))
continue

default:
logger.Warn("Unknown type", slog.String(hashKey, invVect.Hash.String()), slog.String(typeKey, invVect.Type.String()))
continue
}
}

rawTxs, err := p.peerHandler.HandleTransactionsGet(txRequests, p)
if err != nil {
logger.Warn("Unable to fetch txs from store", slog.Int("count", len(txRequests)), slog.String(errKey, err.Error()))
return
}

for _, txBytes := range rawTxs {
if txBytes == nil {
logger.Warn("tx does not exist")
continue
}

tx, err := bsvutil.NewTxFromBytes(txBytes)
if err != nil {
logger.Error("failed to parse tx", slog.String("rawHex", hex.EncodeToString(txBytes)), slog.String(errKey, err.Error()))
continue
}

p.writeChan <- tx.MsgTx()
}
}

Expand Down
100 changes: 50 additions & 50 deletions peer_handler_gen_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 12 additions & 6 deletions peer_handler_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,23 @@ func NewMockPeerHandler() *MockPeerHandler {
}
}

func (m *MockPeerHandler) HandleTransactionGet(msg *wire.InvVect, _ PeerI) ([]byte, error) {
func (m *MockPeerHandler) HandleTransactionsGet(msgs []*wire.InvVect, _ PeerI) ([][]byte, error) {
m.mu.Lock()
defer m.mu.Unlock()

m.transactionGet = append(m.transactionGet, *msg)
rawTxs := make([][]byte, 0)

bytes, ok := m.transactionGetBytes[msg.Hash.String()]
if !ok {
return nil, fmt.Errorf("no bytes for transaction %s", msg.Hash.String())
for _, msg := range msgs {
m.transactionGet = append(m.transactionGet, *msg)
bytes, ok := m.transactionGetBytes[msg.Hash.String()]
if !ok {
return nil, fmt.Errorf("no bytes for transaction %s", msg.Hash.String())
}

rawTxs = append(rawTxs, bytes)
}
return bytes, nil

return rawTxs, nil
}

func (m *MockPeerHandler) GetTransactionGet() []wire.InvVect {
Expand Down
10 changes: 5 additions & 5 deletions test/peer_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package test

import (
"fmt"
"github.com/libsv/go-p2p"
"log"
"log/slog"
"os"
"testing"
"time"

"github.com/libsv/go-p2p"

"github.com/libsv/go-p2p/wire"
"github.com/ory/dockertest/v3"
"github.com/ory/dockertest/v3/docker"
Expand All @@ -19,7 +20,7 @@ const (
p2pPortBinding = "18335"
)

//go:generate moq -out ./peer_handler_gen_mock.go . PeerHandlerI
//go:generate moq -out ../peer_handler_gen_mock.go ../ PeerHandlerI

var (
pool *dockertest.Pool
Expand Down Expand Up @@ -148,15 +149,14 @@ func TestNewPeer(t *testing.T) {
})

t.Run("announce transaction", func(t *testing.T) {

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))

pm := p2p.NewPeerManager(logger, wire.TestNet)
require.NotNil(t, pm)

peerHandler := &p2p.PeerHandlerIMock{
HandleTransactionGetFunc: func(msg *wire.InvVect, peer p2p.PeerI) ([]byte, error) {
return TX1RawBytes, nil
HandleTransactionsGetFunc: func(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) {
return [][]byte{TX1RawBytes}, nil
},
}

Expand Down

0 comments on commit 5d6f580

Please sign in to comment.