Skip to content

Commit

Permalink
feat: use internal p2p in blocktx
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Dec 18, 2024
1 parent 3235709 commit 8d2250f
Show file tree
Hide file tree
Showing 20 changed files with 230 additions and 1,048 deletions.
24 changes: 15 additions & 9 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@ import (
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/nats_connection"
"github.com/bitcoin-sv/arc/internal/tracing"
"github.com/libsv/go-p2p"

"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/bcnet"
"github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql"
"github.com/bitcoin-sv/arc/internal/p2p"

"github.com/bitcoin-sv/arc/internal/version"
)
Expand All @@ -38,7 +40,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
blockStore store.BlocktxStore
mqClient blocktx.MessageQueueClient
processor *blocktx.Processor
pm p2p.PeerManagerI
pm *p2p.PeerManager
server *blocktx.Server
healthServer *grpc_opts.GrpcServer
workers *blocktx.BackgroundWorkers
Expand Down Expand Up @@ -129,8 +131,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
blocktx.WithMaxBlockProcessingDuration(btxConfig.MaxBlockProcessingDuration),
)

blockRequestCh := make(chan blocktx.BlockRequest, blockProcessingBuffer)
blockProcessCh := make(chan *p2p.BlockMessage, blockProcessingBuffer)
blockRequestCh := make(chan blocktx_p2p.BlockRequest, blockProcessingBuffer)
blockProcessCh := make(chan *bcnet.BlockMessage, blockProcessingBuffer)

processor, err = blocktx.NewProcessor(logger, blockStore, blockRequestCh, blockProcessCh, processorOpts...)
if err != nil {
Expand All @@ -144,15 +146,18 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("failed to start peer handler: %v", err)
}

pmOpts := []p2p.PeerManagerOptions{p2p.WithExcessiveBlockSize(maximumBlockSize)}
// p2p global setting
p2p.SetExcessiveBlockSize(maximumBlockSize)

pmOpts := []p2p.PeerManagerOptions{}
if arcConfig.Blocktx.MonitorPeers {
pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers())
}

pm = p2p.NewPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...)
peers := make([]p2p.PeerI, len(arcConfig.Broadcasting.Unicast.Peers))

peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh)
peerHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh)

peerOpts := []p2p.PeerOptions{
p2p.WithMaximumMessageSize(maximumBlockSize),
Expand All @@ -171,8 +176,9 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("error getting peer url: %v", err)
}

peer, err := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerURL, peerHandler, network, peerOpts...)
if err != nil {
peer := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerHandler, peerURL, network, peerOpts...)
ok := peer.Connect()
if !ok {
stopFn()
return nil, fmt.Errorf("error creating peer %s: %v", peerURL, err)
}
Expand Down Expand Up @@ -245,7 +251,7 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf
}

func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor,
pm p2p.PeerManagerI, mqClient blocktx.MessageQueueClient,
pm *p2p.PeerManager, mqClient blocktx.MessageQueueClient,
store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers,
shutdownFns []func(),
) {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/trace v1.31.0
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56
google.golang.org/genproto/googleapis/rpc v0.0.0-20240805194559-2c9e96a0b5d4
google.golang.org/grpc v1.65.0
google.golang.org/protobuf v1.34.2
Expand Down Expand Up @@ -154,6 +153,7 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/oauth2 v0.22.0 // indirect
golang.org/x/sync v0.8.0 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,6 @@ github.com/libsv/go-bt v1.0.4 h1:2Css5lfomk/J97tM5Gk56Lp+tTK6xWYnmHNc/fGO6lE=
github.com/libsv/go-bt v1.0.4/go.mod h1:AfXoLFYEbY/TvCq/84xTce2xGjPUuC5imokHmcykF2k=
github.com/libsv/go-bt/v2 v2.2.5 h1:VoggBLMRW9NYoFujqe5bSYKqnw5y+fYfufgERSoubog=
github.com/libsv/go-bt/v2 v2.2.5/go.mod h1:cV45+jDlPOLfhJLfpLmpQoWzrIvVth9Ao2ZO1f6CcqU=
github.com/libsv/go-p2p v0.3.2 h1:O32CzkqM+jhSuleRHJln6JjL2pKH8aaRTx8lAfhIiic=
github.com/libsv/go-p2p v0.3.2/go.mod h1:TENFxbTT/bfSfuiirjU6l+PfAWxwZgF8GYUxs5tzc/M=
github.com/libsv/go-p2p v0.3.3 h1:5h+69MsGgFwQWyD8MEqyPeqbqKGRpKLzzOcI5cSLfgY=
github.com/libsv/go-p2p v0.3.3/go.mod h1:TENFxbTT/bfSfuiirjU6l+PfAWxwZgF8GYUxs5tzc/M=
github.com/lmittmann/tint v1.0.5 h1:NQclAutOfYsqs2F1Lenue6OoWCajs5wJcP3DfWVpePw=
Expand Down
9 changes: 5 additions & 4 deletions internal/blocktx/background_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"sync"
"time"

"github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/libsv/go-p2p"
"github.com/bitcoin-sv/arc/internal/p2p"
)

type BackgroundWorkers struct {
Expand Down Expand Up @@ -40,7 +41,7 @@ func (w *BackgroundWorkers) GracefulStop() {
w.l.Info("Shutdown complete")
}

func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- BlockRequest) {
func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) {
w.workersWg.Add(1)

go func() {
Expand Down Expand Up @@ -68,7 +69,7 @@ func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Durat
}()
}

func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestingCh chan<- BlockRequest) error {
func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) error {
const (
hoursPerDay = 24
blocksPerHour = 6
Expand All @@ -91,7 +92,7 @@ func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockReq
slog.String("peer", peer.String()),
)

blockRequestingCh <- BlockRequest{
blockRequestingCh <- blocktx_p2p.BlockRequest{
Hash: block.Hash,
Peer: peer,
}
Expand Down
9 changes: 5 additions & 4 deletions internal/blocktx/background_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
"time"

"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/mocks"
"github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks"
"github.com/libsv/go-p2p"

"github.com/bitcoin-sv/arc/internal/p2p"
p2pMocks "github.com/bitcoin-sv/arc/internal/p2p/mocks"
"github.com/bitcoin-sv/arc/internal/testdata"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -67,7 +68,7 @@ func TestStartFillGaps(t *testing.T) {
// given
const fillGapsInterval = 50 * time.Millisecond

blockRequestingCh := make(chan blocktx.BlockRequest, 10)
blockRequestingCh := make(chan blocktx_p2p.BlockRequest, 10)
getBlockErrCh := make(chan error)

getBlockGapTestErr := tc.getBlockGapsErr
Expand All @@ -82,7 +83,7 @@ func TestStartFillGaps(t *testing.T) {
},
}

peerMock := &mocks.PeerMock{
peerMock := &p2pMocks.PeerIMock{
StringFunc: func() string {
return ""
},
Expand Down
30 changes: 30 additions & 0 deletions internal/blocktx/bcnet/block_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package bcnet

import (
"io"

"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
)

// BlockMessage only stores the transaction IDs of the block, not the full transactions
type BlockMessage struct {
Hash *chainhash.Hash
Header *wire.BlockHeader
Height uint64
TransactionHashes []*chainhash.Hash
Size uint64
}

func (bm *BlockMessage) Bsvdecode(io.Reader, uint32, wire.MessageEncoding) error {
return nil
}
func (bm *BlockMessage) BsvEncode(io.Writer, uint32, wire.MessageEncoding) error {
return nil
}
func (bm *BlockMessage) Command() string {
return "block"
}
func (bm *BlockMessage) MaxPayloadLength(uint32) uint64 {
return wire.MaxExtMsgPayload
}
78 changes: 78 additions & 0 deletions internal/blocktx/bcnet/blocktx_p2p/message_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package blocktx_p2p

import (
"errors"
"log/slog"

"github.com/bitcoin-sv/arc/internal/blocktx/bcnet"
"github.com/bitcoin-sv/arc/internal/p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
)

var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to blockchain.BlockMessage")

type BlockRequest struct {
Hash *chainhash.Hash
Peer p2p.PeerI
}

var _ p2p.MessageHandlerI = (*MsgHandler)(nil)

type MsgHandler struct {
logger *slog.Logger
blockRequestingCh chan<- BlockRequest
blockProcessingCh chan<- *bcnet.BlockMessage
}

func NewMsgHandler(logger *slog.Logger, blockRequestCh chan<- BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) *MsgHandler {
return &MsgHandler{
logger: logger.With(slog.String("module", "peer-msg-handler")),
blockRequestingCh: blockRequestCh,
blockProcessingCh: blockProcessCh,
}
}

// OnReceive handles incoming messages depending on command type
func (h *MsgHandler) OnReceive(msg wire.Message, peer p2p.PeerI) {
cmd := msg.Command()

switch cmd {
case wire.CmdInv:
invMsg, ok := msg.(*wire.MsgInv)
if !ok {
return
}

go func() {
for _, iv := range invMsg.InvList {
if iv.Type == wire.InvTypeBlock {
req := BlockRequest{
Hash: &iv.Hash,
Peer: peer,
}

h.blockRequestingCh <- req
}
// ignore INV with transaction or error
}
}()

case wire.CmdBlock:
blockMsg, ok := msg.(*bcnet.BlockMessage)
if !ok {
h.logger.Error("Block msg receive", slog.Any("err", ErrUnableToCastWireMessage))
return
}

h.blockProcessingCh <- blockMsg

default:
// ignore other messages
}
}

// OnSend handles outgoing messages depending on command type
func (h *MsgHandler) OnSend(_ wire.Message, _ p2p.PeerI) {
// ignore
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
package blocktx
package bcnet

import (
"encoding/binary"
"errors"
"io"
"log/slog"

"github.com/bitcoin-sv/go-sdk/script"
sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
"github.com/bitcoin-sv/go-sdk/util"
"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
)

var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to p2p.BlockMessage")

func init() {
// override the default wire block handler with our own that streams and stores only the transaction ids
wire.SetExternalHandler(wire.CmdBlock, func(reader io.Reader, _ uint64, bytesRead int) (int, wire.Message, []byte, error) {
blockMessage := &p2p.BlockMessage{
blockMessage := &BlockMessage{
Header: &wire.BlockHeader{},
}

Expand All @@ -42,7 +37,7 @@ func init() {
var tx *sdkTx.Transaction
var hash *chainhash.Hash
var txBytes []byte
for i := 0; i < int(txCount); i++ {
for i := uint64(0); i < uint64(txCount); i++ {
tx = sdkTx.NewTransaction()
read, err = tx.ReadFrom(reader)
if err != nil {
Expand All @@ -62,66 +57,14 @@ func init() {
}
}

blockMessage.Size = uint64(bytesRead)
blockMessage.Size = uint64(bytesRead) // #nosec G115
blockHash := blockMessage.Header.BlockHash()
blockMessage.Hash = &blockHash

return bytesRead, blockMessage, nil, nil
})
}

type PeerHandler struct {
logger *slog.Logger
blockRequestCh chan BlockRequest
blockProcessCh chan *p2p.BlockMessage
}

func NewPeerHandler(logger *slog.Logger, blockRequestCh chan BlockRequest, blockProcessCh chan *p2p.BlockMessage) *PeerHandler {
return &PeerHandler{
logger: logger.With(slog.String("module", "peer-handler")),
blockRequestCh: blockRequestCh,
blockProcessCh: blockProcessCh,
}
}

func (ph *PeerHandler) HandleTransactionsGet(_ []*wire.InvVect, _ p2p.PeerI) ([][]byte, error) {
return nil, nil
}

func (ph *PeerHandler) HandleTransactionSent(_ *wire.MsgTx, _ p2p.PeerI) error {
return nil
}

func (ph *PeerHandler) HandleTransactionAnnouncement(_ *wire.InvVect, _ p2p.PeerI) error {
return nil
}

func (ph *PeerHandler) HandleTransactionRejection(_ *wire.MsgReject, _ p2p.PeerI) error {
return nil
}

func (ph *PeerHandler) HandleTransaction(_ *wire.MsgTx, _ p2p.PeerI) error {
return nil
}

func (ph *PeerHandler) HandleBlockAnnouncement(msg *wire.InvVect, peer p2p.PeerI) error {
req := BlockRequest{
Hash: &msg.Hash,
Peer: peer,
}

ph.blockRequestCh <- req
return nil
}

func (ph *PeerHandler) HandleBlock(wireMsg wire.Message, _ p2p.PeerI) error {
msg, ok := wireMsg.(*p2p.BlockMessage)
if !ok {
return ErrUnableToCastWireMessage
}

ph.blockProcessCh <- msg
return nil
}

func extractHeightFromCoinbaseTx(tx *sdkTx.Transaction) uint64 {
// Coinbase tx has a special format, the height is encoded in the first 4 bytes of the scriptSig (BIP-34)

Expand Down
Loading

0 comments on commit 8d2250f

Please sign in to comment.