Skip to content

Commit

Permalink
Merge branch 'main' of github.com:bitcoin-sv/arc into update-sdk
Browse files Browse the repository at this point in the history
  • Loading branch information
shotasilagadzetaal committed Feb 4, 2025
2 parents 563656e + 59689ba commit a5cf9d9
Show file tree
Hide file tree
Showing 50 changed files with 529 additions and 838 deletions.
25 changes: 15 additions & 10 deletions cmd/arc/services/blocktx.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ import (

"go.opentelemetry.io/otel/attribute"

"github.com/libsv/go-p2p"

"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/bcnet"
"github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql"
"github.com/bitcoin-sv/arc/internal/grpc_opts"
"github.com/bitcoin-sv/arc/internal/p2p"
"github.com/bitcoin-sv/arc/internal/version"
"github.com/bitcoin-sv/arc/pkg/message_queue/nats/client/nats_core"
"github.com/bitcoin-sv/arc/pkg/message_queue/nats/client/nats_jetstream"
Expand All @@ -37,7 +38,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
blockStore store.BlocktxStore
mqClient blocktx.MessageQueueClient
processor *blocktx.Processor
pm p2p.PeerManagerI
pm *p2p.PeerManager
server *blocktx.Server
healthServer *grpc_opts.GrpcServer
workers *blocktx.BackgroundWorkers
Expand Down Expand Up @@ -127,8 +128,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
blocktx.WithIncomingIsLongest(btxConfig.IncomingIsLongest),
)

blockRequestCh := make(chan blocktx.BlockRequest, blockProcessingBuffer)
blockProcessCh := make(chan *p2p.BlockMessage, blockProcessingBuffer)
blockRequestCh := make(chan blocktx_p2p.BlockRequest, blockProcessingBuffer)
blockProcessCh := make(chan *bcnet.BlockMessage, blockProcessingBuffer)

processor, err = blocktx.NewProcessor(logger, blockStore, blockRequestCh, blockProcessCh, processorOpts...)
if err != nil {
Expand All @@ -142,15 +143,18 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("failed to start peer handler: %v", err)
}

pmOpts := []p2p.PeerManagerOptions{p2p.WithExcessiveBlockSize(maximumBlockSize)}
// p2p global setting
p2p.SetExcessiveBlockSize(maximumBlockSize)

pmOpts := []p2p.PeerManagerOptions{}
if arcConfig.Blocktx.MonitorPeers {
pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers())
}

pm = p2p.NewPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...)
peers := make([]p2p.PeerI, len(arcConfig.Broadcasting.Unicast.Peers))

peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh)
peerHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh)

peerOpts := []p2p.PeerOptions{
p2p.WithMaximumMessageSize(maximumBlockSize),
Expand All @@ -169,8 +173,9 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err
return nil, fmt.Errorf("error getting peer url: %v", err)
}

peer, err := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerURL, peerHandler, network, peerOpts...)
if err != nil {
peer := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerHandler, peerURL, network, peerOpts...)
ok := peer.Connect()
if !ok {
stopFn()
return nil, fmt.Errorf("error creating peer %s: %v", peerURL, err)
}
Expand Down Expand Up @@ -243,7 +248,7 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf
}

func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor,
pm p2p.PeerManagerI, mqClient blocktx.MessageQueueClient,
pm *p2p.PeerManager, mqClient blocktx.MessageQueueClient,
store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers,
shutdownFns []func(),
) {
Expand Down
18 changes: 9 additions & 9 deletions internal/api/handler/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,14 +613,14 @@ func (m ArcDefaultHandler) getTxIDs(txsHex []byte) ([]string, *api.ErrorFields)
return nil, api.NewErrorFields(api.ErrStatusMalformed, errStr)
}
txsHex = remainingBytes
txIDs = append(txIDs, beefTx.GetLatestTx().TxID())
txIDs = append(txIDs, beefTx.GetLatestTx().TxID().String())
} else {
transaction, bytesUsed, err := sdkTx.NewTransactionFromStream(txsHex)
if err != nil {
return nil, api.NewErrorFields(api.ErrStatusBadRequest, err.Error())
}
txsHex = txsHex[bytesUsed:]
txIDs = append(txIDs, transaction.TxID())
txIDs = append(txIDs, transaction.TxID().String())
}
}

Expand Down Expand Up @@ -663,7 +663,7 @@ func (m ArcDefaultHandler) processTransactions(ctx context.Context, txsHex []byt
}
}

txIDs = append(txIDs, beefTx.GetLatestTx().TxID())
txIDs = append(txIDs, beefTx.GetLatestTx().TxID().String())
} else {
transaction, bytesUsed, err := sdkTx.NewTransactionFromStream(txsHex)
if err != nil {
Expand All @@ -679,7 +679,7 @@ func (m ArcDefaultHandler) processTransactions(ctx context.Context, txsHex []byt
}

submittedTxs = append(submittedTxs, transaction)
txIDs = append(txIDs, transaction.TxID())
txIDs = append(txIDs, transaction.TxID().String())
}
}

Expand All @@ -702,7 +702,7 @@ func (m ArcDefaultHandler) processTransactions(ctx context.Context, txsHex []byt
for idx, tx := range txStatuses {
txID := tx.TxID
if txID == "" {
txID = submittedTxs[idx].TxID()
txID = submittedTxs[idx].TxID().String()
}

successes = append(successes, &api.TransactionResponse{
Expand Down Expand Up @@ -738,7 +738,7 @@ func (m ArcDefaultHandler) validateEFTransaction(ctx context.Context, txValidato
err = txValidator.ValidateTransaction(ctx, transaction, feeOpts, scriptOpts, m.tracingEnabled, m.tracingAttributes...)
if err != nil {
statusCode, arcError := m.handleError(ctx, transaction, err)
m.logger.ErrorContext(ctx, "failed to validate transaction", slog.String("id", transaction.TxID()), slog.Int("status", int(statusCode)), slog.String("err", err.Error()))
m.logger.ErrorContext(ctx, "failed to validate transaction", slog.String("id", transaction.TxID().String()), slog.Int("status", int(statusCode)), slog.String("err", err.Error()))
return arcError
}

Expand All @@ -761,7 +761,7 @@ func (m ArcDefaultHandler) validateBEEFTransaction(ctx context.Context, txValida
errTx, err := txValidator.ValidateTransaction(ctx, beefTx, feeOpts, scriptOpts)
if err != nil {
statusCode, arcError := m.handleError(ctx, errTx, err)
m.logger.ErrorContext(ctx, "failed to validate transaction", slog.String("id", errTx.TxID()), slog.Int("status", int(statusCode)), slog.String("err", err.Error()))
m.logger.ErrorContext(ctx, "failed to validate transaction", slog.String("id", errTx.TxID().String()), slog.Int("status", int(statusCode)), slog.String("err", err.Error()))

return arcError
}
Expand Down Expand Up @@ -794,7 +794,7 @@ func (m ArcDefaultHandler) submitTransactions(ctx context.Context, txs []*sdkTx.
status, err = m.TransactionHandler.SubmitTransaction(ctx, tx, options)
if err != nil {
statusCode, arcError := m.handleError(ctx, tx, err)
m.logger.ErrorContext(ctx, "failed to submit transaction", slog.String("id", tx.TxID()), slog.Int("status", int(statusCode)), slog.String("err", err.Error()))
m.logger.ErrorContext(ctx, "failed to submit transaction", slog.String("id", tx.TxID().String()), slog.Int("status", int(statusCode)), slog.String("err", err.Error()))

return nil, arcError
}
Expand Down Expand Up @@ -858,7 +858,7 @@ func (ArcDefaultHandler) handleError(_ context.Context, transaction *sdkTx.Trans
arcError := api.NewErrorFields(status, submitErr.Error())

if transaction != nil {
arcError.Txid = PtrTo(transaction.TxID())
arcError.Txid = PtrTo(transaction.TxID().String())
}

return status, arcError
Expand Down
6 changes: 3 additions & 3 deletions internal/api/handler/default_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ func TestPOSTTransaction(t *testing.T) { //nolint:funlen
tx, _ := sdkTx.NewTransactionFromBytes(tc.getTx)

mt := metamorph.Transaction{
TxID: tx.TxID(),
TxID: tx.TxID().String(),
Bytes: tc.getTx,
BlockHeight: 100,
}
Expand Down Expand Up @@ -1054,7 +1054,7 @@ func TestPOSTTransactions(t *testing.T) { //nolint:funlen
tx, _ := sdkTx.NewTransactionFromBytes(validTxParentBytes)
return []*metamorph.Transaction{
{
TxID: tx.TxID(),
TxID: tx.TxID().String(),
Bytes: validTxParentBytes,
BlockHeight: 100,
},
Expand All @@ -1067,7 +1067,7 @@ func TestPOSTTransactions(t *testing.T) { //nolint:funlen
var res []*metamorph.TransactionStatus
for _, t := range txs {
txID := t.TxID()
if status, found := find(txResults, func(e *metamorph.TransactionStatus) bool { return e.TxID == txID }); found {
if status, found := find(txResults, func(e *metamorph.TransactionStatus) bool { return e.TxID == txID.String() }); found {
res = append(res, status)
}
}
Expand Down
13 changes: 1 addition & 12 deletions internal/beef/bump.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package beef

import (
"encoding/hex"
"errors"

"github.com/bitcoin-sv/go-sdk/chainhash"
sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
)

Expand Down Expand Up @@ -47,16 +45,7 @@ func calculateMerkleRootFromBump(bump *sdkTx.MerklePath) (string, error) {
for _, pathElement := range bump.Path {
for _, pe := range pathElement {
if pe.Txid != nil {
txID := pe.Hash.String()
txIDBytes, err := hex.DecodeString(txID)
if err != nil {
return "", err
}
hash, err := chainhash.NewHash(txIDBytes)
if err != nil {
return "", err
}
mr, err := bump.ComputeRoot(hash)
mr, err := bump.ComputeRoot(pe.Hash)
if err != nil {
return "", err
}
Expand Down
10 changes: 5 additions & 5 deletions internal/blocktx/background_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"sync"
"time"

"github.com/libsv/go-p2p"

"github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
"github.com/bitcoin-sv/arc/internal/p2p"
)

type BackgroundWorkers struct {
Expand Down Expand Up @@ -41,7 +41,7 @@ func (w *BackgroundWorkers) GracefulStop() {
w.logger.Info("Shutdown complete")
}

func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- BlockRequest) {
func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) {
w.workersWg.Add(1)

go func() {
Expand Down Expand Up @@ -69,7 +69,7 @@ func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Durat
}()
}

func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestingCh chan<- BlockRequest) error {
func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) error {
const (
hoursPerDay = 24
blocksPerHour = 6
Expand All @@ -92,7 +92,7 @@ func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockReq
slog.String("peer", peer.String()),
)

blockRequestingCh <- BlockRequest{
blockRequestingCh <- blocktx_p2p.BlockRequest{
Hash: block.Hash,
Peer: peer,
}
Expand Down
13 changes: 6 additions & 7 deletions internal/blocktx/background_workers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,14 @@ import (
"testing"
"time"

"github.com/libsv/go-p2p"
"github.com/stretchr/testify/require"

"github.com/bitcoin-sv/arc/internal/blocktx"
"github.com/bitcoin-sv/arc/internal/blocktx/mocks"
"github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p"
"github.com/bitcoin-sv/arc/internal/blocktx/store"
storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks"

"github.com/stretchr/testify/require"

"github.com/bitcoin-sv/arc/internal/p2p"
p2pMocks "github.com/bitcoin-sv/arc/internal/p2p/mocks"
"github.com/bitcoin-sv/arc/internal/testdata"
)

Expand Down Expand Up @@ -69,7 +68,7 @@ func TestStartFillGaps(t *testing.T) {
// given
const fillGapsInterval = 50 * time.Millisecond

blockRequestingCh := make(chan blocktx.BlockRequest, 10)
blockRequestingCh := make(chan blocktx_p2p.BlockRequest, 10)
getBlockErrCh := make(chan error)

getBlockGapTestErr := tc.getBlockGapsErr
Expand All @@ -84,7 +83,7 @@ func TestStartFillGaps(t *testing.T) {
},
}

peerMock := &mocks.PeerMock{
peerMock := &p2pMocks.PeerIMock{
StringFunc: func() string {
return ""
},
Expand Down
30 changes: 30 additions & 0 deletions internal/blocktx/bcnet/block_message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package bcnet

import (
"io"

"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/libsv/go-p2p/wire"
)

// BlockMessage only stores the transaction IDs of the block, not the full transactions
type BlockMessage struct {
Hash *chainhash.Hash
Header *wire.BlockHeader
Height uint64
TransactionHashes []*chainhash.Hash
Size uint64
}

func (bm *BlockMessage) Bsvdecode(io.Reader, uint32, wire.MessageEncoding) error {
return nil
}
func (bm *BlockMessage) BsvEncode(io.Writer, uint32, wire.MessageEncoding) error {
return nil
}
func (bm *BlockMessage) Command() string {
return "block"
}
func (bm *BlockMessage) MaxPayloadLength(uint32) uint64 {
return wire.MaxExtMsgPayload
}
Loading

0 comments on commit a5cf9d9

Please sign in to comment.