Skip to content

Commit

Permalink
feat: replace go-p2p with internal implementation in blocktx (#679)
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain authored Nov 29, 2024
1 parent 631726a commit 22fd0dc
Show file tree
Hide file tree
Showing 20 changed files with 258 additions and 1,193 deletions.
42 changes: 24 additions & 18 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"os"
"time"

"github.com/libsv/go-p2p"
"go.opentelemetry.io/otel/attribute"

"github.com/bitcoin-sv/arc/internal/grpc_opts"
Expand All @@ -17,8 +16,11 @@ import (

"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/blocktx"
blockchain "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication"
blocktx_p2p "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication/p2p"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql"
"github.com/bitcoin-sv/arc/internal/p2p"
"github.com/bitcoin-sv/arc/internal/version"
)

Expand All @@ -37,7 +39,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
blockStore store.BlocktxStore
mqClient blocktx.MessageQueueClient
processor *blocktx.Processor
pm p2p.PeerManagerI
pm *p2p.PeerManager
server *blocktx.Server
healthServer *grpc_opts.GrpcServer
workers *blocktx.BackgroundWorkers
Expand Down Expand Up @@ -125,8 +127,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
blocktx.WithMessageQueueClient(mqClient),
)

blockRequestCh := make(chan blocktx.BlockRequest, blockProcessingBuffer)
blockProcessCh := make(chan *p2p.BlockMessage, blockProcessingBuffer)
blockRequestCh := make(chan blocktx_p2p.BlockRequest, blockProcessingBuffer)
blockProcessCh := make(chan *blockchain.BlockMessage, blockProcessingBuffer)

processor, err = blocktx.NewProcessor(logger, blockStore, blockRequestCh, blockProcessCh, processorOpts...)
if err != nil {
Expand All @@ -140,25 +142,27 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("failed to start peer handler: %v", err)
}

peerOpts := []p2p.PeerOptions{
p2p.WithMaximumMessageSize(maximumBlockSize),
p2p.WithRetryReadWriteMessageInterval(5 * time.Second),
p2p.WithPingInterval(30*time.Second, 1*time.Minute),
}

if version.Version != "" {
peerOpts = append(peerOpts, p2p.WithUserAgent("ARC", version.Version))
}
// p2p global setting
p2p.SetExcessiveBlockSize(maximumBlockSize)

pmOpts := []p2p.PeerManagerOptions{p2p.WithExcessiveBlockSize(maximumBlockSize)}
pmOpts := []p2p.PeerManagerOptions{}
if arcConfig.Metamorph.MonitorPeers {
pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers())
}

pm = p2p.NewPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...)
peers := make([]p2p.PeerI, len(arcConfig.Broadcasting.Unicast.Peers))

peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh)
peerHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh)

peerOpts := []p2p.PeerOptions{
p2p.WithMaximumMessageSize(maximumBlockSize),
p2p.WithPingInterval(30*time.Second, 1*time.Minute),
}

if version.Version != "" {
peerOpts = append(peerOpts, p2p.WithUserAgent("ARC", version.Version))
}

for i, peerSetting := range arcConfig.Broadcasting.Unicast.Peers {
peerURL, err := peerSetting.GetP2PUrl()
Expand All @@ -167,11 +171,13 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("error getting peer url: %v", err)
}

peer, err := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerURL, peerHandler, network, peerOpts...)
if err != nil {
peer := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerHandler, peerURL, network, peerOpts...)
ok := peer.Connect()
if !ok {
stopFn()
return nil, fmt.Errorf("error creating peer %s: %v", peerURL, err)
}

err = pm.AddPeer(peer)
if err != nil {
stopFn()
Expand Down Expand Up @@ -238,7 +244,7 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf
}

func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor,
pm p2p.PeerManagerI, mqClient blocktx.MessageQueueClient,
pm *p2p.PeerManager, mqClient blocktx.MessageQueueClient,
store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers,
shutdownFns []func(),
) {
Expand Down
11 changes: 6 additions & 5 deletions internal/blocktx/background_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"sync"
"time"

blocktx_p2p "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication/p2p"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/libsv/go-p2p"
"github.com/bitcoin-sv/arc/internal/p2p"
)

type BackgroundWorkers struct {
Expand Down Expand Up @@ -40,7 +41,7 @@ func (w *BackgroundWorkers) GracefulStop() {
w.l.Info("Shutdown complete")
}

func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestCh chan<- BlockRequest) {
func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) {
w.workersWg.Add(1)

go func() {
Expand All @@ -53,7 +54,7 @@ func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Durat
select {
case <-t.C:
i = i % len(peers)
err := w.fillGaps(peers[i], retentionDays, blockRequestCh)
err := w.fillGaps(peers[i], retentionDays, blockRequestingCh)
if err != nil {
w.l.Error("failed to fill blocks gaps", slog.String("err", err.Error()))
}
Expand All @@ -68,7 +69,7 @@ func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Durat
}()
}

func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestCh chan<- BlockRequest) error {
func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) error {
const (
hoursPerDay = 24
blocksPerHour = 6
Expand All @@ -91,7 +92,7 @@ func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockReq
slog.String("peer", peer.String()),
)

blockRequestCh <- BlockRequest{
blockRequestingCh <- blocktx_p2p.BlockRequest{
Hash: block.Hash,
Peer: peer,
}
Expand Down
27 changes: 11 additions & 16 deletions internal/blocktx/background_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
"time"

"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/mocks"
blocktx_p2p "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication/p2p"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks"
"github.com/bitcoin-sv/arc/internal/p2p"
p2pMocks "github.com/bitcoin-sv/arc/internal/p2p/mocks"
"github.com/bitcoin-sv/arc/internal/testdata"
"github.com/libsv/go-p2p"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -64,6 +65,9 @@ func TestStartFillGaps(t *testing.T) {
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
// given
const fillGapsInterval = 50 * time.Millisecond

blockRequestingCh := make(chan blocktx_p2p.BlockRequest, 10)
getBlockErrCh := make(chan error)

getBlockGapTestErr := tc.getBlockGapsErr
Expand All @@ -78,30 +82,21 @@ func TestStartFillGaps(t *testing.T) {
},
}

peerMock := &mocks.PeerMock{
StringFunc: func() string {
return ""
},
}
peerMock := &p2pMocks.PeerIMock{StringFunc: func() string { return "peer" }}
peers := []p2p.PeerI{peerMock}

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
blockRequestCh := make(chan blocktx.BlockRequest, 10)

sut := blocktx.NewBackgroundWorkers(storeMock, logger)

interval := 50 * time.Millisecond
sut := blocktx.NewBackgroundWorkers(storeMock, slog.Default())

// when
sut.StartFillGaps(peers, interval, 28, blockRequestCh)
sut.StartFillGaps(peers, fillGapsInterval, 28, blockRequestingCh)

// then
select {
case hashPeer := <-blockRequestCh:
case hashPeer := <-blockRequestingCh:
require.True(t, testdata.Block1Hash.IsEqual(hashPeer.Hash))
case err = <-getBlockErrCh:
require.ErrorIs(t, err, tc.getBlockGapsErr)
case <-time.After(time.Duration(3.5 * float64(interval))):
case <-time.After(time.Duration(3.5 * float64(fillGapsInterval))):
}

sut.GracefulStop()
Expand Down
29 changes: 29 additions & 0 deletions internal/blocktx/blockchain_communication/block_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package blockchaincommunication

import (
"io"

"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
)

// BlockMessage only stores the transaction IDs of the block, not the full transactions
type BlockMessage struct {
Header *wire.BlockHeader
Height uint64
TransactionHashes []*chainhash.Hash
Size uint64
}

func (bm *BlockMessage) Bsvdecode(io.Reader, uint32, wire.MessageEncoding) error {
return nil
}
func (bm *BlockMessage) BsvEncode(io.Writer, uint32, wire.MessageEncoding) error {
return nil
}
func (bm *BlockMessage) Command() string {
return "block"
}
func (bm *BlockMessage) MaxPayloadLength(uint32) uint64 {
return wire.MaxExtMsgPayload
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
package blocktx
package blocktx_p2p

import (
"encoding/binary"
"errors"
"io"
"log/slog"

blockchain "github.com/bitcoin-sv/arc/internal/blocktx/blockchain_communication"

"github.com/bitcoin-sv/go-sdk/script"
sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
"github.com/bitcoin-sv/go-sdk/util"
"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
)

var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to p2p.BlockMessage")

func init() {
// override the default wire block handler with our own that streams and stores only the transaction ids
wire.SetExternalHandler(wire.CmdBlock, func(reader io.Reader, _ uint64, bytesRead int) (int, wire.Message, []byte, error) {
blockMessage := &p2p.BlockMessage{
blockMessage := &blockchain.BlockMessage{
Header: &wire.BlockHeader{},
}

Expand Down Expand Up @@ -68,60 +65,6 @@ func init() {
})
}

type PeerHandler struct {
logger *slog.Logger
blockRequestCh chan BlockRequest
blockProcessCh chan *p2p.BlockMessage
}

func NewPeerHandler(logger *slog.Logger, blockRequestCh chan BlockRequest, blockProcessCh chan *p2p.BlockMessage) *PeerHandler {
return &PeerHandler{
logger: logger.With(slog.String("module", "peer-handler")),
blockRequestCh: blockRequestCh,
blockProcessCh: blockProcessCh,
}
}

func (ph *PeerHandler) HandleTransactionsGet(_ []*wire.InvVect, _ p2p.PeerI) ([][]byte, error) {
return nil, nil
}

func (ph *PeerHandler) HandleTransactionSent(_ *wire.MsgTx, _ p2p.PeerI) error {
return nil
}

func (ph *PeerHandler) HandleTransactionAnnouncement(_ *wire.InvVect, _ p2p.PeerI) error {
return nil
}

func (ph *PeerHandler) HandleTransactionRejection(_ *wire.MsgReject, _ p2p.PeerI) error {
return nil
}

func (ph *PeerHandler) HandleTransaction(_ *wire.MsgTx, _ p2p.PeerI) error {
return nil
}

func (ph *PeerHandler) HandleBlockAnnouncement(msg *wire.InvVect, peer p2p.PeerI) error {
req := BlockRequest{
Hash: &msg.Hash,
Peer: peer,
}

ph.blockRequestCh <- req
return nil
}

func (ph *PeerHandler) HandleBlock(wireMsg wire.Message, _ p2p.PeerI) error {
msg, ok := wireMsg.(*p2p.BlockMessage)
if !ok {
return ErrUnableToCastWireMessage
}

ph.blockProcessCh <- msg
return nil
}

func extractHeightFromCoinbaseTx(tx *sdkTx.Transaction) uint64 {
// Coinbase tx has a special format, the height is encoded in the first 4 bytes of the scriptSig (BIP-34)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package blocktx
package blocktx_p2p

import (
"testing"
Expand Down
Loading

0 comments on commit 22fd0dc

Please sign in to comment.