Skip to content

Commit

Permalink
feat(ARCO-148): get txs data for peers in bulk (#499)
Browse files Browse the repository at this point in the history
* feat(ARCO-148): get txs data for peers in bulk

* feat(ARCO-148): refactor and perf-improve of HandleTransactionsGet method

* feat: improve psql method description

* feat(ARCO-148): add defer rows.Close() to postgres functions

* fix(postgres): fix sonar issue for unchecked error

* feat: udpate go-p2p version
  • Loading branch information
kuba-4chain authored Jul 16, 2024
1 parent 13a4276 commit 2187ff6
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 57 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/libsv/go-bc v0.1.29
github.com/libsv/go-bk v0.1.6
github.com/libsv/go-bt/v2 v2.2.5
github.com/libsv/go-p2p v0.2.6
github.com/libsv/go-p2p v0.3.0
github.com/lmittmann/tint v1.0.3
github.com/mitchellh/mapstructure v1.5.0
github.com/nats-io/nats.go v1.31.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,10 @@ github.com/libsv/go-p2p v0.2.5 h1:Kg7WQphZDZycNtihMOUr49qNX1oieXxLDeirSqklsnM=
github.com/libsv/go-p2p v0.2.5/go.mod h1:TENFxbTT/bfSfuiirjU6l+PfAWxwZgF8GYUxs5tzc/M=
github.com/libsv/go-p2p v0.2.6 h1:lquJf+Wrpk54ttD9mrX7gdHEqAfJq1u+HOv96+dmYU8=
github.com/libsv/go-p2p v0.2.6/go.mod h1:TENFxbTT/bfSfuiirjU6l+PfAWxwZgF8GYUxs5tzc/M=
github.com/libsv/go-p2p v0.2.7 h1:RSYPJzhJL3zURY0qwOhfXKqcR0Lb8rRna8GbTFuJaJg=
github.com/libsv/go-p2p v0.2.7/go.mod h1:TENFxbTT/bfSfuiirjU6l+PfAWxwZgF8GYUxs5tzc/M=
github.com/libsv/go-p2p v0.3.0 h1:/jtNveHFNbCaC7+FQzvGQb988MaTLupw0bC0yHGANVY=
github.com/libsv/go-p2p v0.3.0/go.mod h1:TENFxbTT/bfSfuiirjU6l+PfAWxwZgF8GYUxs5tzc/M=
github.com/lmittmann/tint v1.0.3 h1:W5PHeA2D8bBJVvabNfQD/XW9HPLZK1XoPZH0cq8NouQ=
github.com/lmittmann/tint v1.0.3/go.mod h1:HIS3gSy7qNwGCj+5oRjAutErFBl4BzdQP6cJZ0NfMwE=
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
Expand Down
13 changes: 1 addition & 12 deletions internal/blocktx/peer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ func WithTracer() func(handler *PeerHandler) {
}

func NewPeerHandler(logger *slog.Logger, storeI store.BlocktxStore, opts ...func(*PeerHandler)) (*PeerHandler, error) {

hostname, err := os.Hostname()
if err != nil {
return nil, err
Expand Down Expand Up @@ -421,33 +420,27 @@ func (ph *PeerHandler) registerTransactions(txHashes []*blocktx_api.TransactionA
}
}

func (ph *PeerHandler) HandleTransactionGet(_ *wire.InvVect, peer p2p.PeerI) ([]byte, error) {

func (ph *PeerHandler) HandleTransactionsGet(_ []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) {
return nil, nil
}

func (ph *PeerHandler) HandleTransactionSent(_ *wire.MsgTx, peer p2p.PeerI) error {

return nil
}

func (ph *PeerHandler) HandleTransactionAnnouncement(_ *wire.InvVect, peer p2p.PeerI) error {

return nil
}

func (ph *PeerHandler) HandleTransactionRejection(_ *wire.MsgReject, _ p2p.PeerI) error {

return nil
}

func (ph *PeerHandler) HandleTransaction(_ *wire.MsgTx, _ p2p.PeerI) error {

return nil
}

func (ph *PeerHandler) HandleBlockAnnouncement(msg *wire.InvVect, peer p2p.PeerI) error {

pair := hashPeer{
Hash: &msg.Hash,
Peer: peer,
Expand Down Expand Up @@ -546,7 +539,6 @@ const (
)

func (ph *PeerHandler) fillGaps(peer p2p.PeerI) error {

heightRange := ph.dataRetentionDays * hoursPerDay * blocksPerHour

blockHeightGaps, err := ph.store.GetBlockGaps(ph.ctx, heightRange)
Expand Down Expand Up @@ -576,7 +568,6 @@ func (ph *PeerHandler) fillGaps(peer p2p.PeerI) error {
}

func (ph *PeerHandler) insertBlock(ctx context.Context, blockHash *chainhash.Hash, merkleRoot *chainhash.Hash, previousBlockHash *chainhash.Hash, height uint64) (uint64, error) {

ph.logger.Info("Inserting block", slog.String("hash", blockHash.String()), slog.Int64("height", int64(height)))

block := &blocktx_api.Block{
Expand Down Expand Up @@ -736,12 +727,10 @@ func ExtractHeightFromCoinbaseTx(tx *bt.Tx) uint64 {
}

func (ph *PeerHandler) Shutdown() {

if ph.cancelAll != nil {
ph.cancelAll()
}
ph.waitGroup.Wait()

}

// for testing purposes
Expand Down
28 changes: 11 additions & 17 deletions internal/metamorph/peer_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func NewPeerHandler(s store.MetamorphStore, messageCh chan *PeerTxMessage) *Peer

// HandleTransactionSent is called when a transaction is sent to a peer.
func (m *PeerHandler) HandleTransactionSent(msg *wire.MsgTx, peer p2p.PeerI) error {

hash := msg.TxHash()
m.messageCh <- &PeerTxMessage{
Hash: &hash,
Expand All @@ -46,7 +45,6 @@ func (m *PeerHandler) HandleTransactionSent(msg *wire.MsgTx, peer p2p.PeerI) err

// HandleTransactionAnnouncement is a message sent to the PeerHandler when a transaction INV message is received from a peer.
func (m *PeerHandler) HandleTransactionAnnouncement(msg *wire.InvVect, peer p2p.PeerI) error {

m.messageCh <- &PeerTxMessage{
Hash: &msg.Hash,
Status: metamorph_api.Status_SEEN_ON_NETWORK,
Expand All @@ -58,7 +56,6 @@ func (m *PeerHandler) HandleTransactionAnnouncement(msg *wire.InvVect, peer p2p.

// HandleTransactionRejection is called when a transaction is rejected by a peer.
func (m *PeerHandler) HandleTransactionRejection(rejMsg *wire.MsgReject, peer p2p.PeerI) error {

m.messageCh <- &PeerTxMessage{
Hash: &rejMsg.Hash,
Status: metamorph_api.Status_REJECTED,
Expand All @@ -69,21 +66,21 @@ func (m *PeerHandler) HandleTransactionRejection(rejMsg *wire.MsgReject, peer p2
return nil
}

// HandleTransactionGet is called when a peer requests a transaction.
func (m *PeerHandler) HandleTransactionGet(msg *wire.InvVect, peer p2p.PeerI) ([]byte, error) {
// HandleTransactionsGet is called when a peer requests a transaction.
func (m *PeerHandler) HandleTransactionsGet(msgs []*wire.InvVect, peer p2p.PeerI) ([][]byte, error) {
hashes := make([][]byte, len(msgs))

m.messageCh <- &PeerTxMessage{
Hash: &msg.Hash,
Status: metamorph_api.Status_REQUESTED_BY_NETWORK,
Peer: peer.String(),
}
for i, msg := range msgs {
m.messageCh <- &PeerTxMessage{
Hash: &msg.Hash,
Status: metamorph_api.Status_REQUESTED_BY_NETWORK,
Peer: peer.String(),
}

sd, err := m.store.Get(m.ctx, msg.Hash.CloneBytes())
if err != nil {
return nil, err
hashes[i] = msg.Hash[:]
}

return sd.RawTx, nil
return m.store.GetRawTxs(m.ctx, hashes)
}

// HandleTransaction is called when a transaction is received from a peer.
Expand All @@ -101,18 +98,15 @@ func (m *PeerHandler) HandleTransaction(msg *wire.MsgTx, peer p2p.PeerI) error {

// HandleBlockAnnouncement is called when a block INV message is received from a peer.
func (m *PeerHandler) HandleBlockAnnouncement(_ *wire.InvVect, _ p2p.PeerI) error {

return nil
}

// HandleBlock is called when a block is received from a peer.
func (m *PeerHandler) HandleBlock(_ wire.Message, _ p2p.PeerI) error {

return nil
}

func (m *PeerHandler) Shutdown() {

if m.cancelAll != nil {
m.cancelAll()
}
Expand Down
50 changes: 30 additions & 20 deletions internal/metamorph/peer_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/bitcoin-sv/arc/internal/metamorph"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
storeMocks "github.com/bitcoin-sv/arc/internal/metamorph/store/mocks"
"github.com/bitcoin-sv/arc/pkg/metamorph/metamorph_api"
"github.com/libsv/go-p2p"
Expand All @@ -20,10 +19,9 @@ import (
func TestPeerHandler(t *testing.T) {
messageCh := make(chan *metamorph.PeerTxMessage)
mtmStore := &storeMocks.MetamorphStoreMock{
GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) {
return &store.StoreData{
RawTx: []byte("1234"),
}, nil
GetRawTxsFunc: func(ctx context.Context, hashes [][]byte) ([][]byte, error) {
rawTx := []byte("1234")
return [][]byte{rawTx, rawTx}, nil
},
}

Expand Down Expand Up @@ -102,28 +100,40 @@ func TestPeerHandler(t *testing.T) {
}
})

t.Run("HandleTransactionGet", func(t *testing.T) {
hash, err := chainhash.NewHashFromStr("1234")
require.NoError(t, err)
t.Run("HandleTransactionsGet", func(t *testing.T) {
txsCount := 2
invMsgs := make([]*wire.InvVect, txsCount)
expectedMsgs := make([]*metamorph.PeerTxMessage, txsCount)

msgInv := wire.NewInvVect(wire.InvTypeBlock, hash)
require.NoError(t, err)
for i := 0; i < txsCount; i++ {
hash, err := chainhash.NewHashFromStr("1234")
require.NoError(t, err)

expectedMsg := &metamorph.PeerTxMessage{
Hash: &msgInv.Hash,
Status: metamorph_api.Status_REQUESTED_BY_NETWORK,
Peer: "mock_peer",
msgInv := wire.NewInvVect(wire.InvTypeTx, hash)
require.NoError(t, err)

invMsgs[i] = msgInv

expectedMsgs[i] = &metamorph.PeerTxMessage{
Hash: hash,
Status: metamorph_api.Status_REQUESTED_BY_NETWORK,
Peer: "mock_peer",
}
}

go func() {
_, _ = peerHandler.HandleTransactionGet(msgInv, peer)
_, _ = peerHandler.HandleTransactionsGet(invMsgs, peer)
}()

select {
case msg := <-messageCh:
assert.Equal(t, expectedMsg, msg)
case <-time.After(time.Second):
t.Fatal("test timed out or error while executing goroutine")
counter := 0
for i := 0; i < txsCount; i++ {
select {
case msg := <-messageCh:
assert.Equal(t, expectedMsgs[counter], msg)
counter++
case <-time.After(5 * time.Second):
t.Fatal("test timed out or error while executing goroutine")
}
}
})

Expand Down
50 changes: 50 additions & 0 deletions internal/metamorph/store/mocks/store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
- hash: 0xcd3d2f97dfc0cdb6a07ec4b72df5e1794c9553ff2f62d90ed4add047e8088853
raw_tx: 0x010000000000000000ef016f8828b2d3f8085561d0b4ff6f5d17c269206fa3d32bcd3b22e26ce659ed12e7000000006b483045022100d3649d120249a09af44b4673eecec873109a3e120b9610b78858087fb225c9b9022037f16999b7a4fecdd9f47ebdc44abd74567a18940c37e1481ab0fe84d62152e4412102f87ce69f6ba5444aed49c34470041189c1e1060acd99341959c0594002c61bf0ffffffffe7030000000000001976a914c2b6fd4319122b9b5156a2a0060d19864c24f49a88ac01e7030000000000001976a914c2b6fd4319122b9b5156a2a0060d19864c24f49a88ac00000000
locked_by: metamorph-3
status: 4
stored_at: 2023-10-01 14:00:00
last_submitted_at: 2023-10-01 14:00:00
- hash: 0x21132d32cb5411c058bb4391f24f6a36ed9b810df851d0e36cac514fd03d6b4e
raw_tx: 0x020000000000000000ef016f8828b2d3f8085561d0b4ff6f5d17c269206fa3d32bcd3b22e26ce659ed12e7000000006b483045022100d3649d120249a09af44b4673eecec873109a3e120b9610b78858087fb225c9b9022037f16999b7a4fecdd9f47ebdc44abd74567a18940c37e1481ab0fe84d62152e4412102f87ce69f6ba5444aed49c34470041189c1e1060acd99341959c0594002c61bf0ffffffffe7030000000000001976a914c2b6fd4319122b9b5156a2a0060d19864c24f49a88ac01e7030000000000001976a914c2b6fd4319122b9b5156a2a0060d19864c24f49a88ac00000000
locked_by: metamorph-3
status: 4
stored_at: 2023-10-01 14:00:00
last_submitted_at: 2023-10-01 14:00:00
- hash: 0x3e0b5b218c344110f09bf485bc58de4ea5378e55744185edf9c1dafa40068ecd
raw_tx: 0x030000000000000000ef016f8828b2d3f8085561d0b4ff6f5d17c269206fa3d32bcd3b22e26ce659ed12e7000000006b483045022100d3649d120249a09af44b4673eecec873109a3e120b9610b78858087fb225c9b9022037f16999b7a4fecdd9f47ebdc44abd74567a18940c37e1481ab0fe84d62152e4412102f87ce69f6ba5444aed49c34470041189c1e1060acd99341959c0594002c61bf0ffffffffe7030000000000001976a914c2b6fd4319122b9b5156a2a0060d19864c24f49a88ac01e7030000000000001976a914c2b6fd4319122b9b5156a2a0060d19864c24f49a88ac00000000
locked_by: metamorph-1
status: 4
stored_at: 2023-10-01 14:00:00
last_submitted_at: 2023-10-01 14:00:00
Loading

0 comments on commit 2187ff6

Please sign in to comment.