diff --git a/cmd/arc/services/blocktx.go b/cmd/arc/services/blocktx.go index 611ae815c..5c328b0a9 100644 --- a/cmd/arc/services/blocktx.go +++ b/cmd/arc/services/blocktx.go @@ -8,13 +8,14 @@ import ( "go.opentelemetry.io/otel/attribute" - "github.com/libsv/go-p2p" - "github.com/bitcoin-sv/arc/config" "github.com/bitcoin-sv/arc/internal/blocktx" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql" "github.com/bitcoin-sv/arc/internal/grpc_opts" + "github.com/bitcoin-sv/arc/internal/p2p" "github.com/bitcoin-sv/arc/internal/version" "github.com/bitcoin-sv/arc/pkg/message_queue/nats/client/nats_core" "github.com/bitcoin-sv/arc/pkg/message_queue/nats/client/nats_jetstream" @@ -37,7 +38,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err blockStore store.BlocktxStore mqClient blocktx.MessageQueueClient processor *blocktx.Processor - pm p2p.PeerManagerI + pm *p2p.PeerManager server *blocktx.Server healthServer *grpc_opts.GrpcServer workers *blocktx.BackgroundWorkers @@ -127,8 +128,8 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err blocktx.WithIncomingIsLongest(btxConfig.IncomingIsLongest), ) - blockRequestCh := make(chan blocktx.BlockRequest, blockProcessingBuffer) - blockProcessCh := make(chan *p2p.BlockMessage, blockProcessingBuffer) + blockRequestCh := make(chan blocktx_p2p.BlockRequest, blockProcessingBuffer) + blockProcessCh := make(chan *bcnet.BlockMessage, blockProcessingBuffer) processor, err = blocktx.NewProcessor(logger, blockStore, blockRequestCh, blockProcessCh, processorOpts...) if err != nil { @@ -142,7 +143,10 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err return nil, fmt.Errorf("failed to start peer handler: %v", err) } - pmOpts := []p2p.PeerManagerOptions{p2p.WithExcessiveBlockSize(maximumBlockSize)} + // p2p global setting + p2p.SetExcessiveBlockSize(maximumBlockSize) + + pmOpts := []p2p.PeerManagerOptions{} if arcConfig.Blocktx.MonitorPeers { pmOpts = append(pmOpts, p2p.WithRestartUnhealthyPeers()) } @@ -150,7 +154,7 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err pm = p2p.NewPeerManager(logger.With(slog.String("module", "peer-mng")), network, pmOpts...) peers := make([]p2p.PeerI, len(arcConfig.Broadcasting.Unicast.Peers)) - peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh) + peerHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh) peerOpts := []p2p.PeerOptions{ p2p.WithMaximumMessageSize(maximumBlockSize), @@ -169,8 +173,9 @@ func StartBlockTx(logger *slog.Logger, arcConfig *config.ArcConfig) (func(), err return nil, fmt.Errorf("error getting peer url: %v", err) } - peer, err := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerURL, peerHandler, network, peerOpts...) - if err != nil { + peer := p2p.NewPeer(logger.With(slog.String("module", "peer")), peerHandler, peerURL, network, peerOpts...) + ok := peer.Connect() + if !ok { stopFn() return nil, fmt.Errorf("error creating peer %s: %v", peerURL, err) } @@ -243,7 +248,7 @@ func NewBlocktxStore(logger *slog.Logger, dbConfig *config.DbConfig, tracingConf } func disposeBlockTx(l *slog.Logger, server *blocktx.Server, processor *blocktx.Processor, - pm p2p.PeerManagerI, mqClient blocktx.MessageQueueClient, + pm *p2p.PeerManager, mqClient blocktx.MessageQueueClient, store store.BlocktxStore, healthServer *grpc_opts.GrpcServer, workers *blocktx.BackgroundWorkers, shutdownFns []func(), ) { diff --git a/internal/api/handler/default.go b/internal/api/handler/default.go index 41942deb5..9edb481d1 100644 --- a/internal/api/handler/default.go +++ b/internal/api/handler/default.go @@ -613,14 +613,14 @@ func (m ArcDefaultHandler) getTxIDs(txsHex []byte) ([]string, *api.ErrorFields) return nil, api.NewErrorFields(api.ErrStatusMalformed, errStr) } txsHex = remainingBytes - txIDs = append(txIDs, beefTx.GetLatestTx().TxID()) + txIDs = append(txIDs, beefTx.GetLatestTx().TxID().String()) } else { transaction, bytesUsed, err := sdkTx.NewTransactionFromStream(txsHex) if err != nil { return nil, api.NewErrorFields(api.ErrStatusBadRequest, err.Error()) } txsHex = txsHex[bytesUsed:] - txIDs = append(txIDs, transaction.TxID()) + txIDs = append(txIDs, transaction.TxID().String()) } } @@ -663,7 +663,7 @@ func (m ArcDefaultHandler) processTransactions(ctx context.Context, txsHex []byt } } - txIDs = append(txIDs, beefTx.GetLatestTx().TxID()) + txIDs = append(txIDs, beefTx.GetLatestTx().TxID().String()) } else { transaction, bytesUsed, err := sdkTx.NewTransactionFromStream(txsHex) if err != nil { @@ -679,7 +679,7 @@ func (m ArcDefaultHandler) processTransactions(ctx context.Context, txsHex []byt } submittedTxs = append(submittedTxs, transaction) - txIDs = append(txIDs, transaction.TxID()) + txIDs = append(txIDs, transaction.TxID().String()) } } @@ -702,7 +702,7 @@ func (m ArcDefaultHandler) processTransactions(ctx context.Context, txsHex []byt for idx, tx := range txStatuses { txID := tx.TxID if txID == "" { - txID = submittedTxs[idx].TxID() + txID = submittedTxs[idx].TxID().String() } successes = append(successes, &api.TransactionResponse{ @@ -738,7 +738,7 @@ func (m ArcDefaultHandler) validateEFTransaction(ctx context.Context, txValidato err = txValidator.ValidateTransaction(ctx, transaction, feeOpts, scriptOpts, m.tracingEnabled, m.tracingAttributes...) if err != nil { statusCode, arcError := m.handleError(ctx, transaction, err) - m.logger.ErrorContext(ctx, "failed to validate transaction", slog.String("id", transaction.TxID()), slog.Int("status", int(statusCode)), slog.String("err", err.Error())) + m.logger.ErrorContext(ctx, "failed to validate transaction", slog.String("id", transaction.TxID().String()), slog.Int("status", int(statusCode)), slog.String("err", err.Error())) return arcError } @@ -761,7 +761,7 @@ func (m ArcDefaultHandler) validateBEEFTransaction(ctx context.Context, txValida errTx, err := txValidator.ValidateTransaction(ctx, beefTx, feeOpts, scriptOpts) if err != nil { statusCode, arcError := m.handleError(ctx, errTx, err) - m.logger.ErrorContext(ctx, "failed to validate transaction", slog.String("id", errTx.TxID()), slog.Int("status", int(statusCode)), slog.String("err", err.Error())) + m.logger.ErrorContext(ctx, "failed to validate transaction", slog.String("id", errTx.TxID().String()), slog.Int("status", int(statusCode)), slog.String("err", err.Error())) return arcError } @@ -794,7 +794,7 @@ func (m ArcDefaultHandler) submitTransactions(ctx context.Context, txs []*sdkTx. status, err = m.TransactionHandler.SubmitTransaction(ctx, tx, options) if err != nil { statusCode, arcError := m.handleError(ctx, tx, err) - m.logger.ErrorContext(ctx, "failed to submit transaction", slog.String("id", tx.TxID()), slog.Int("status", int(statusCode)), slog.String("err", err.Error())) + m.logger.ErrorContext(ctx, "failed to submit transaction", slog.String("id", tx.TxID().String()), slog.Int("status", int(statusCode)), slog.String("err", err.Error())) return nil, arcError } @@ -858,7 +858,7 @@ func (ArcDefaultHandler) handleError(_ context.Context, transaction *sdkTx.Trans arcError := api.NewErrorFields(status, submitErr.Error()) if transaction != nil { - arcError.Txid = PtrTo(transaction.TxID()) + arcError.Txid = PtrTo(transaction.TxID().String()) } return status, arcError diff --git a/internal/api/handler/default_test.go b/internal/api/handler/default_test.go index 627bde9be..e776aa6a8 100644 --- a/internal/api/handler/default_test.go +++ b/internal/api/handler/default_test.go @@ -616,7 +616,7 @@ func TestPOSTTransaction(t *testing.T) { //nolint:funlen tx, _ := sdkTx.NewTransactionFromBytes(tc.getTx) mt := metamorph.Transaction{ - TxID: tx.TxID(), + TxID: tx.TxID().String(), Bytes: tc.getTx, BlockHeight: 100, } @@ -1054,7 +1054,7 @@ func TestPOSTTransactions(t *testing.T) { //nolint:funlen tx, _ := sdkTx.NewTransactionFromBytes(validTxParentBytes) return []*metamorph.Transaction{ { - TxID: tx.TxID(), + TxID: tx.TxID().String(), Bytes: validTxParentBytes, BlockHeight: 100, }, @@ -1067,7 +1067,7 @@ func TestPOSTTransactions(t *testing.T) { //nolint:funlen var res []*metamorph.TransactionStatus for _, t := range txs { txID := t.TxID() - if status, found := find(txResults, func(e *metamorph.TransactionStatus) bool { return e.TxID == txID }); found { + if status, found := find(txResults, func(e *metamorph.TransactionStatus) bool { return e.TxID == txID.String() }); found { res = append(res, status) } } diff --git a/internal/blocktx/background_workers.go b/internal/blocktx/background_workers.go index d2d4ee44d..da394e8d3 100644 --- a/internal/blocktx/background_workers.go +++ b/internal/blocktx/background_workers.go @@ -6,9 +6,9 @@ import ( "sync" "time" - "github.com/libsv/go-p2p" - + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/store" + "github.com/bitcoin-sv/arc/internal/p2p" ) type BackgroundWorkers struct { @@ -41,7 +41,7 @@ func (w *BackgroundWorkers) GracefulStop() { w.logger.Info("Shutdown complete") } -func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- BlockRequest) { +func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Duration, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) { w.workersWg.Add(1) go func() { @@ -69,7 +69,7 @@ func (w *BackgroundWorkers) StartFillGaps(peers []p2p.PeerI, interval time.Durat }() } -func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestingCh chan<- BlockRequest) error { +func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockRequestingCh chan<- blocktx_p2p.BlockRequest) error { const ( hoursPerDay = 24 blocksPerHour = 6 @@ -92,7 +92,7 @@ func (w *BackgroundWorkers) fillGaps(peer p2p.PeerI, retentionDays int, blockReq slog.String("peer", peer.String()), ) - blockRequestingCh <- BlockRequest{ + blockRequestingCh <- blocktx_p2p.BlockRequest{ Hash: block.Hash, Peer: peer, } diff --git a/internal/blocktx/background_workers_test.go b/internal/blocktx/background_workers_test.go index edcdd5c90..f39c5b2e1 100644 --- a/internal/blocktx/background_workers_test.go +++ b/internal/blocktx/background_workers_test.go @@ -8,15 +8,14 @@ import ( "testing" "time" - "github.com/libsv/go-p2p" + "github.com/stretchr/testify/require" "github.com/bitcoin-sv/arc/internal/blocktx" - "github.com/bitcoin-sv/arc/internal/blocktx/mocks" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/store" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" - - "github.com/stretchr/testify/require" - + "github.com/bitcoin-sv/arc/internal/p2p" + p2pMocks "github.com/bitcoin-sv/arc/internal/p2p/mocks" "github.com/bitcoin-sv/arc/internal/testdata" ) @@ -69,7 +68,7 @@ func TestStartFillGaps(t *testing.T) { // given const fillGapsInterval = 50 * time.Millisecond - blockRequestingCh := make(chan blocktx.BlockRequest, 10) + blockRequestingCh := make(chan blocktx_p2p.BlockRequest, 10) getBlockErrCh := make(chan error) getBlockGapTestErr := tc.getBlockGapsErr @@ -84,7 +83,7 @@ func TestStartFillGaps(t *testing.T) { }, } - peerMock := &mocks.PeerMock{ + peerMock := &p2pMocks.PeerIMock{ StringFunc: func() string { return "" }, diff --git a/internal/blocktx/bcnet/block_message.go b/internal/blocktx/bcnet/block_message.go new file mode 100644 index 000000000..2114465c5 --- /dev/null +++ b/internal/blocktx/bcnet/block_message.go @@ -0,0 +1,30 @@ +package bcnet + +import ( + "io" + + "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/libsv/go-p2p/wire" +) + +// BlockMessage only stores the transaction IDs of the block, not the full transactions +type BlockMessage struct { + Hash *chainhash.Hash + Header *wire.BlockHeader + Height uint64 + TransactionHashes []*chainhash.Hash + Size uint64 +} + +func (bm *BlockMessage) Bsvdecode(io.Reader, uint32, wire.MessageEncoding) error { + return nil +} +func (bm *BlockMessage) BsvEncode(io.Writer, uint32, wire.MessageEncoding) error { + return nil +} +func (bm *BlockMessage) Command() string { + return "block" +} +func (bm *BlockMessage) MaxPayloadLength(uint32) uint64 { + return wire.MaxExtMsgPayload +} diff --git a/internal/blocktx/bcnet/blocktx_p2p/message_handler.go b/internal/blocktx/bcnet/blocktx_p2p/message_handler.go new file mode 100644 index 000000000..a4aafbe3d --- /dev/null +++ b/internal/blocktx/bcnet/blocktx_p2p/message_handler.go @@ -0,0 +1,78 @@ +package blocktx_p2p + +import ( + "errors" + "log/slog" + + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/p2p" + "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/libsv/go-p2p/wire" +) + +var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to blockchain.BlockMessage") + +type BlockRequest struct { + Hash *chainhash.Hash + Peer p2p.PeerI +} + +var _ p2p.MessageHandlerI = (*MsgHandler)(nil) + +type MsgHandler struct { + logger *slog.Logger + blockRequestingCh chan<- BlockRequest + blockProcessingCh chan<- *bcnet.BlockMessage +} + +func NewMsgHandler(logger *slog.Logger, blockRequestCh chan<- BlockRequest, blockProcessCh chan<- *bcnet.BlockMessage) *MsgHandler { + return &MsgHandler{ + logger: logger.With(slog.String("module", "peer-msg-handler")), + blockRequestingCh: blockRequestCh, + blockProcessingCh: blockProcessCh, + } +} + +// OnReceive handles incoming messages depending on command type +func (h *MsgHandler) OnReceive(msg wire.Message, peer p2p.PeerI) { + cmd := msg.Command() + + switch cmd { + case wire.CmdInv: + invMsg, ok := msg.(*wire.MsgInv) + if !ok { + return + } + + go func() { + for _, iv := range invMsg.InvList { + if iv.Type == wire.InvTypeBlock { + req := BlockRequest{ + Hash: &iv.Hash, + Peer: peer, + } + + h.blockRequestingCh <- req + } + // ignore INV with transaction or error + } + }() + + case wire.CmdBlock: + blockMsg, ok := msg.(*bcnet.BlockMessage) + if !ok { + h.logger.Error("Block msg receive", slog.Any("err", ErrUnableToCastWireMessage)) + return + } + + h.blockProcessingCh <- blockMsg + + default: + // ignore other messages + } +} + +// OnSend handles outgoing messages depending on command type +func (h *MsgHandler) OnSend(_ wire.Message, _ p2p.PeerI) { + // ignore +} diff --git a/internal/blocktx/peer_handler.go b/internal/blocktx/bcnet/init.go similarity index 54% rename from internal/blocktx/peer_handler.go rename to internal/blocktx/bcnet/init.go index 179adeb24..c4f4be1fb 100644 --- a/internal/blocktx/peer_handler.go +++ b/internal/blocktx/bcnet/init.go @@ -1,25 +1,20 @@ -package blocktx +package bcnet import ( "encoding/binary" - "errors" "io" - "log/slog" "github.com/bitcoin-sv/go-sdk/script" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "github.com/bitcoin-sv/go-sdk/util" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" ) -var ErrUnableToCastWireMessage = errors.New("unable to cast wire.Message to p2p.BlockMessage") - func init() { // override the default wire block handler with our own that streams and stores only the transaction ids wire.SetExternalHandler(wire.CmdBlock, func(reader io.Reader, _ uint64, bytesRead int) (int, wire.Message, []byte, error) { - blockMessage := &p2p.BlockMessage{ + blockMessage := &BlockMessage{ Header: &wire.BlockHeader{}, } @@ -42,14 +37,14 @@ func init() { var tx *sdkTx.Transaction var hash *chainhash.Hash var txBytes []byte - for i := 0; i < int(txCount); i++ { + for i := uint64(0); i < uint64(txCount); i++ { tx = sdkTx.NewTransaction() read, err = tx.ReadFrom(reader) if err != nil { return bytesRead, nil, nil, err } bytesRead += int(read) - txBytes = tx.TxIDBytes() // this returns the bytes in BigEndian + txBytes = tx.TxID().CloneBytes() // this returns the bytes in BigEndian hash, err = chainhash.NewHash(util.ReverseBytes(txBytes)) if err != nil { return 0, nil, nil, err @@ -62,66 +57,14 @@ func init() { } } - blockMessage.Size = uint64(bytesRead) + blockMessage.Size = uint64(bytesRead) // #nosec G115 + blockHash := blockMessage.Header.BlockHash() + blockMessage.Hash = &blockHash return bytesRead, blockMessage, nil, nil }) } -type PeerHandler struct { - logger *slog.Logger - blockRequestCh chan BlockRequest - blockProcessCh chan *p2p.BlockMessage -} - -func NewPeerHandler(logger *slog.Logger, blockRequestCh chan BlockRequest, blockProcessCh chan *p2p.BlockMessage) *PeerHandler { - return &PeerHandler{ - logger: logger.With(slog.String("module", "peer-handler")), - blockRequestCh: blockRequestCh, - blockProcessCh: blockProcessCh, - } -} - -func (ph *PeerHandler) HandleTransactionsGet(_ []*wire.InvVect, _ p2p.PeerI) ([][]byte, error) { - return nil, nil -} - -func (ph *PeerHandler) HandleTransactionSent(_ *wire.MsgTx, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleTransactionAnnouncement(_ *wire.InvVect, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleTransactionRejection(_ *wire.MsgReject, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleTransaction(_ *wire.MsgTx, _ p2p.PeerI) error { - return nil -} - -func (ph *PeerHandler) HandleBlockAnnouncement(msg *wire.InvVect, peer p2p.PeerI) error { - req := BlockRequest{ - Hash: &msg.Hash, - Peer: peer, - } - - ph.blockRequestCh <- req - return nil -} - -func (ph *PeerHandler) HandleBlock(wireMsg wire.Message, _ p2p.PeerI) error { - msg, ok := wireMsg.(*p2p.BlockMessage) - if !ok { - return ErrUnableToCastWireMessage - } - - ph.blockProcessCh <- msg - return nil -} - func extractHeightFromCoinbaseTx(tx *sdkTx.Transaction) uint64 { // Coinbase tx has a special format, the height is encoded in the first 4 bytes of the scriptSig (BIP-34) diff --git a/internal/blocktx/bcnet/init_test.go b/internal/blocktx/bcnet/init_test.go new file mode 100644 index 000000000..095c94f8f --- /dev/null +++ b/internal/blocktx/bcnet/init_test.go @@ -0,0 +1,33 @@ +package bcnet + +import ( + "testing" + + sdkTx "github.com/bitcoin-sv/go-sdk/transaction" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestExtractHeight(t *testing.T) { + // given + tx, err := sdkTx.NewTransactionFromHex("01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff570350cc0b041547b5630cfabe6d6d0000000000000000000000000000000000000000000000000000000000000000010000000000000047ed20542096bd0000000000143362663865373833636662643732306431383436000000000140be4025000000001976a914c9b0abe09b7dd8e9d1e8c1e3502d32ab0d7119e488ac00000000") + require.NoError(t, err) + + // when + height := extractHeightFromCoinbaseTx(tx) + + // then + assert.Equalf(t, uint64(773200), height, "height should be 773200, got %d", height) +} + +func TestExtractHeightForRegtest(t *testing.T) { + // given + tx, err := sdkTx.NewTransactionFromHex("02000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0502dc070101ffffffff012f500900000000002321032efe256e14fd77eea05d0453374f8920e0a7a4a573bb3937ef3f567f3937129cac00000000") + require.NoError(t, err) + + // when + height := extractHeightFromCoinbaseTx(tx) + + // then + assert.Equalf(t, uint64(2012), height, "height should be 2012, got %d", height) +} diff --git a/internal/blocktx/health_check.go b/internal/blocktx/health_check.go index 72180cab8..f6c556fe8 100644 --- a/internal/blocktx/health_check.go +++ b/internal/blocktx/health_check.go @@ -31,13 +31,7 @@ func (s *Server) Check(ctx context.Context, req *grpc_health_v1.HealthCheckReque } // verify we have at least 1 node connected to blocktx - healthy := false - for _, peer := range s.pm.GetPeers() { - if peer.IsHealthy() && peer.Connected() { - healthy = true - break - } - } + healthy := s.pm.CountConnectedPeers() > 0 if !healthy { s.logger.Error("healthy peer not found") @@ -66,13 +60,7 @@ func (s *Server) Watch(req *grpc_health_v1.HealthCheckRequest, server grpc_healt } // verify we have at least 1 node connected to blocktx - healthy := false - for _, peer := range s.pm.GetPeers() { - if peer.IsHealthy() && peer.Connected() { - healthy = true - break - } - } + healthy := s.pm.CountConnectedPeers() > 0 if !healthy { s.logger.Error("healthy peer not found") diff --git a/internal/blocktx/health_check_test.go b/internal/blocktx/health_check_test.go index faffb0513..bf8abc5fa 100644 --- a/internal/blocktx/health_check_test.go +++ b/internal/blocktx/health_check_test.go @@ -7,7 +7,7 @@ import ( "os" "testing" - "github.com/libsv/go-p2p" + "github.com/bitcoin-sv/arc/internal/p2p" "github.com/stretchr/testify/require" "google.golang.org/grpc/health/grpc_health_v1" diff --git a/internal/blocktx/integration_test/helpers.go b/internal/blocktx/integration_test/helpers.go index 8e34b5b4b..b202cb3e4 100644 --- a/internal/blocktx/integration_test/helpers.go +++ b/internal/blocktx/integration_test/helpers.go @@ -7,11 +7,12 @@ import ( "os" "testing" - "github.com/libsv/go-p2p" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "github.com/bitcoin-sv/arc/internal/blocktx" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store/postgresql" "github.com/bitcoin-sv/arc/pkg/message_queue/nats/client/nats_core" @@ -19,12 +20,12 @@ import ( "github.com/bitcoin-sv/arc/pkg/test_utils" ) -func setupSut(t *testing.T, dbInfo string) (*blocktx.Processor, *blocktx.PeerHandler, *postgresql.PostgreSQL, chan []byte, chan *blocktx_api.TransactionBlock) { +func setupSut(t *testing.T, dbInfo string) (*blocktx.Processor, *blocktx_p2p.MsgHandler, *postgresql.PostgreSQL, chan []byte, chan *blocktx_api.TransactionBlock) { t.Helper() logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) - blockProcessCh := make(chan *p2p.BlockMessage, 10) + blockProcessCh := make(chan *bcnet.BlockMessage, 10) registerTxChannel := make(chan []byte, 10) publishedTxsCh := make(chan *blocktx_api.TransactionBlock, 10) @@ -44,7 +45,7 @@ func setupSut(t *testing.T, dbInfo string) (*blocktx.Processor, *blocktx.PeerHan } mqClient := nats_core.New(mockNatsConn, nats_core.WithLogger(logger)) - p2pMsgHandler := blocktx.NewPeerHandler(logger, nil, blockProcessCh) + p2pMsgHandler := blocktx_p2p.NewMsgHandler(logger, nil, blockProcessCh) processor, err := blocktx.NewProcessor( logger, store, diff --git a/internal/blocktx/integration_test/reorg_integration_test.go b/internal/blocktx/integration_test/reorg_integration_test.go index 5336a2808..cec7372d5 100644 --- a/internal/blocktx/integration_test/reorg_integration_test.go +++ b/internal/blocktx/integration_test/reorg_integration_test.go @@ -32,13 +32,13 @@ import ( "testing" "time" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/pkg/test_utils" _ "github.com/golang-migrate/migrate/v4/source/file" _ "github.com/lib/pq" "github.com/libsv/go-bc" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" "github.com/stretchr/testify/require" @@ -56,7 +56,7 @@ func TestReorg(t *testing.T) { const blockHash822011 = "bf9be09b345cc2d904b59951cc8a2ed452d8d143e2e25cde64058270fb3a667a" - // blockHash := testutils.RevChainhash(t, blockHash822011) + blockHash := testutils.RevChainhash(t, blockHash822011) prevBlockHash := testutils.RevChainhash(t, "00000000000000000a00c377b260a3219b0c314763f486bc363df7aa7e22ad72") txHash, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b") require.NoError(t, err) @@ -64,8 +64,8 @@ func TestReorg(t *testing.T) { require.NoError(t, err) // should become LONGEST - blockMessage := &p2p.BlockMessage{ - // Hash: blockHash, + blockMessage := &bcnet.BlockMessage{ + Hash: blockHash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevBlockHash, // NON-existent in the db @@ -77,8 +77,7 @@ func TestReorg(t *testing.T) { } processor.StartBlockProcessing() - err = p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // Allow DB to process the block time.Sleep(200 * time.Millisecond) @@ -100,7 +99,7 @@ func TestReorg(t *testing.T) { txhash822015Competing = "b16cea53fc823e146fbb9ae4ad3124f7c273f30562585ad6e4831495d609f430" ) - // blockHash := testutils.RevChainhash(t, blockHash822015Fork) + blockHash := testutils.RevChainhash(t, blockHash822015Fork) prevBlockHash := testutils.RevChainhash(t, blockHash822014StartOfChain) txHash := testutils.RevChainhash(t, txhash822015) txHash2 := testutils.RevChainhash(t, txhash822015Competing) // should not be published - is already in the longest chain @@ -108,8 +107,8 @@ func TestReorg(t *testing.T) { merkleRoot := treeStore[len(treeStore)-1] // should become STALE - blockMessage := &p2p.BlockMessage{ - // Hash: blockHash, + blockMessage := &bcnet.BlockMessage{ + Hash: blockHash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevBlockHash, // block with status LONGEST at height 822014 @@ -121,8 +120,7 @@ func TestReorg(t *testing.T) { } processor.StartBlockProcessing() - err := p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // Allow DB to process the block time.Sleep(200 * time.Millisecond) @@ -167,8 +165,8 @@ func TestReorg(t *testing.T) { // should become LONGEST // reorg should happen - blockMessage := &p2p.BlockMessage{ - // Hash: blockHash, + blockMessage := &bcnet.BlockMessage{ + Hash: blockHash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevhash, // block with status STALE at height 822015 @@ -180,8 +178,7 @@ func TestReorg(t *testing.T) { } processor.StartBlockProcessing() - err := p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // Allow DB to process the block and perform reorg time.Sleep(1 * time.Second) @@ -248,14 +245,14 @@ func TestReorg(t *testing.T) { blockHash822023Orphan = "0000000000000000082131979a4e25a5101912a5f8461e18f306d23e158161cd" ) - // blockHash := testutils.RevChainhash(t, blockHash822021) + blockHash := testutils.RevChainhash(t, blockHash822021) txHash := testutils.RevChainhash(t, "de0753d9ce6f92e340843cbfdd11e58beff8c578956ecdec4c461b018a26b8a9") merkleRoot := testutils.RevChainhash(t, "de0753d9ce6f92e340843cbfdd11e58beff8c578956ecdec4c461b018a26b8a9") prevhash := testutils.RevChainhash(t, blockHash822020Orphan) // should become STALE - blockMessage := &p2p.BlockMessage{ - // Hash: blockHash, + blockMessage := &bcnet.BlockMessage{ + Hash: blockHash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevhash, // block with status ORPHANED at height 822020 - connected to STALE chain @@ -267,8 +264,7 @@ func TestReorg(t *testing.T) { } processor.StartBlockProcessing() - err := p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // Allow DB to process the block and find orphans time.Sleep(1 * time.Second) @@ -321,15 +317,15 @@ func TestReorg(t *testing.T) { txhash822017 = "ece2b7e40d98749c03c551b783420d6e3fdc3c958244bbf275437839585829a6" ) - // blockHash := testutils.RevChainhash(t, blockHash822021) + blockHash := testutils.RevChainhash(t, blockHash822021) prevhash := testutils.RevChainhash(t, blockHash822020Orphan) txHash := testutils.RevChainhash(t, "3e15f823a7de25c26ce9001d4814a6f0ebc915a1ca4f1ba9cfac720bd941c39c") merkleRoot := testutils.RevChainhash(t, "3e15f823a7de25c26ce9001d4814a6f0ebc915a1ca4f1ba9cfac720bd941c39c") // should become LONGEST // reorg should happen - blockMessage := &p2p.BlockMessage{ - // Hash: blockHash, + blockMessage := &bcnet.BlockMessage{ + Hash: blockHash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: *prevhash, // block with status ORPHANED at height 822020 - connected to STALE chain @@ -342,8 +338,7 @@ func TestReorg(t *testing.T) { } processor.StartBlockProcessing() - err := p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // Allow DB to process the block, find orphans and perform reorg time.Sleep(3 * time.Second) diff --git a/internal/blocktx/interface.go b/internal/blocktx/interface.go deleted file mode 100644 index a17b15f6d..000000000 --- a/internal/blocktx/interface.go +++ /dev/null @@ -1,37 +0,0 @@ -package blocktx - -import ( - "github.com/libsv/go-p2p" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/libsv/go-p2p/wire" -) - -type BlockRequest struct { - Hash *chainhash.Hash - Peer p2p.PeerI -} - -type Peer interface { - Connected() bool - WriteMsg(msg wire.Message) error - String() string - AnnounceTransaction(txHash *chainhash.Hash) - RequestTransaction(txHash *chainhash.Hash) - AnnounceBlock(blockHash *chainhash.Hash) - RequestBlock(blockHash *chainhash.Hash) - Network() wire.BitcoinNet - IsHealthy() bool - IsUnhealthyCh() <-chan struct{} - Shutdown() - Restart() -} - -type PeerManager interface { - AnnounceTransaction(txHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI - RequestTransaction(txHash *chainhash.Hash) p2p.PeerI - AnnounceBlock(blockHash *chainhash.Hash, peers []p2p.PeerI) []p2p.PeerI - RequestBlock(blockHash *chainhash.Hash) p2p.PeerI - AddPeer(peer p2p.PeerI) error - GetPeers() []p2p.PeerI - Shutdown() -} diff --git a/internal/blocktx/mocks/peer_mock.go b/internal/blocktx/mocks/peer_mock.go deleted file mode 100644 index 6b3f52020..000000000 --- a/internal/blocktx/mocks/peer_mock.go +++ /dev/null @@ -1,512 +0,0 @@ -// Code generated by moq; DO NOT EDIT. -// github.com/matryer/moq - -package mocks - -import ( - "github.com/bitcoin-sv/arc/internal/blocktx" - "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/libsv/go-p2p/wire" - "sync" -) - -// Ensure, that PeerMock does implement blocktx.Peer. -// If this is not the case, regenerate this file with moq. -var _ blocktx.Peer = &PeerMock{} - -// PeerMock is a mock implementation of blocktx.Peer. -// -// func TestSomethingThatUsesPeer(t *testing.T) { -// -// // make and configure a mocked blocktx.Peer -// mockedPeer := &PeerMock{ -// AnnounceBlockFunc: func(blockHash *chainhash.Hash) { -// panic("mock out the AnnounceBlock method") -// }, -// AnnounceTransactionFunc: func(txHash *chainhash.Hash) { -// panic("mock out the AnnounceTransaction method") -// }, -// ConnectedFunc: func() bool { -// panic("mock out the Connected method") -// }, -// IsHealthyFunc: func() bool { -// panic("mock out the IsHealthy method") -// }, -// IsUnhealthyChFunc: func() <-chan struct{} { -// panic("mock out the IsUnhealthyCh method") -// }, -// NetworkFunc: func() wire.BitcoinNet { -// panic("mock out the Network method") -// }, -// RequestBlockFunc: func(blockHash *chainhash.Hash) { -// panic("mock out the RequestBlock method") -// }, -// RequestTransactionFunc: func(txHash *chainhash.Hash) { -// panic("mock out the RequestTransaction method") -// }, -// RestartFunc: func() { -// panic("mock out the Restart method") -// }, -// ShutdownFunc: func() { -// panic("mock out the Shutdown method") -// }, -// StringFunc: func() string { -// panic("mock out the String method") -// }, -// WriteMsgFunc: func(msg wire.Message) error { -// panic("mock out the WriteMsg method") -// }, -// } -// -// // use mockedPeer in code that requires blocktx.Peer -// // and then make assertions. -// -// } -type PeerMock struct { - // AnnounceBlockFunc mocks the AnnounceBlock method. - AnnounceBlockFunc func(blockHash *chainhash.Hash) - - // AnnounceTransactionFunc mocks the AnnounceTransaction method. - AnnounceTransactionFunc func(txHash *chainhash.Hash) - - // ConnectedFunc mocks the Connected method. - ConnectedFunc func() bool - - // IsHealthyFunc mocks the IsHealthy method. - IsHealthyFunc func() bool - - // IsUnhealthyChFunc mocks the IsUnhealthyCh method. - IsUnhealthyChFunc func() <-chan struct{} - - // NetworkFunc mocks the Network method. - NetworkFunc func() wire.BitcoinNet - - // RequestBlockFunc mocks the RequestBlock method. - RequestBlockFunc func(blockHash *chainhash.Hash) - - // RequestTransactionFunc mocks the RequestTransaction method. - RequestTransactionFunc func(txHash *chainhash.Hash) - - // RestartFunc mocks the Restart method. - RestartFunc func() - - // ShutdownFunc mocks the Shutdown method. - ShutdownFunc func() - - // StringFunc mocks the String method. - StringFunc func() string - - // WriteMsgFunc mocks the WriteMsg method. - WriteMsgFunc func(msg wire.Message) error - - // calls tracks calls to the methods. - calls struct { - // AnnounceBlock holds details about calls to the AnnounceBlock method. - AnnounceBlock []struct { - // BlockHash is the blockHash argument value. - BlockHash *chainhash.Hash - } - // AnnounceTransaction holds details about calls to the AnnounceTransaction method. - AnnounceTransaction []struct { - // TxHash is the txHash argument value. - TxHash *chainhash.Hash - } - // Connected holds details about calls to the Connected method. - Connected []struct { - } - // IsHealthy holds details about calls to the IsHealthy method. - IsHealthy []struct { - } - // IsUnhealthyCh holds details about calls to the IsUnhealthyCh method. - IsUnhealthyCh []struct { - } - // Network holds details about calls to the Network method. - Network []struct { - } - // RequestBlock holds details about calls to the RequestBlock method. - RequestBlock []struct { - // BlockHash is the blockHash argument value. - BlockHash *chainhash.Hash - } - // RequestTransaction holds details about calls to the RequestTransaction method. - RequestTransaction []struct { - // TxHash is the txHash argument value. - TxHash *chainhash.Hash - } - // Restart holds details about calls to the Restart method. - Restart []struct { - } - // Shutdown holds details about calls to the Shutdown method. - Shutdown []struct { - } - // String holds details about calls to the String method. - String []struct { - } - // WriteMsg holds details about calls to the WriteMsg method. - WriteMsg []struct { - // Msg is the msg argument value. - Msg wire.Message - } - } - lockAnnounceBlock sync.RWMutex - lockAnnounceTransaction sync.RWMutex - lockConnected sync.RWMutex - lockIsHealthy sync.RWMutex - lockIsUnhealthyCh sync.RWMutex - lockNetwork sync.RWMutex - lockRequestBlock sync.RWMutex - lockRequestTransaction sync.RWMutex - lockRestart sync.RWMutex - lockShutdown sync.RWMutex - lockString sync.RWMutex - lockWriteMsg sync.RWMutex -} - -// AnnounceBlock calls AnnounceBlockFunc. -func (mock *PeerMock) AnnounceBlock(blockHash *chainhash.Hash) { - if mock.AnnounceBlockFunc == nil { - panic("PeerMock.AnnounceBlockFunc: method is nil but Peer.AnnounceBlock was just called") - } - callInfo := struct { - BlockHash *chainhash.Hash - }{ - BlockHash: blockHash, - } - mock.lockAnnounceBlock.Lock() - mock.calls.AnnounceBlock = append(mock.calls.AnnounceBlock, callInfo) - mock.lockAnnounceBlock.Unlock() - mock.AnnounceBlockFunc(blockHash) -} - -// AnnounceBlockCalls gets all the calls that were made to AnnounceBlock. -// Check the length with: -// -// len(mockedPeer.AnnounceBlockCalls()) -func (mock *PeerMock) AnnounceBlockCalls() []struct { - BlockHash *chainhash.Hash -} { - var calls []struct { - BlockHash *chainhash.Hash - } - mock.lockAnnounceBlock.RLock() - calls = mock.calls.AnnounceBlock - mock.lockAnnounceBlock.RUnlock() - return calls -} - -// AnnounceTransaction calls AnnounceTransactionFunc. -func (mock *PeerMock) AnnounceTransaction(txHash *chainhash.Hash) { - if mock.AnnounceTransactionFunc == nil { - panic("PeerMock.AnnounceTransactionFunc: method is nil but Peer.AnnounceTransaction was just called") - } - callInfo := struct { - TxHash *chainhash.Hash - }{ - TxHash: txHash, - } - mock.lockAnnounceTransaction.Lock() - mock.calls.AnnounceTransaction = append(mock.calls.AnnounceTransaction, callInfo) - mock.lockAnnounceTransaction.Unlock() - mock.AnnounceTransactionFunc(txHash) -} - -// AnnounceTransactionCalls gets all the calls that were made to AnnounceTransaction. -// Check the length with: -// -// len(mockedPeer.AnnounceTransactionCalls()) -func (mock *PeerMock) AnnounceTransactionCalls() []struct { - TxHash *chainhash.Hash -} { - var calls []struct { - TxHash *chainhash.Hash - } - mock.lockAnnounceTransaction.RLock() - calls = mock.calls.AnnounceTransaction - mock.lockAnnounceTransaction.RUnlock() - return calls -} - -// Connected calls ConnectedFunc. -func (mock *PeerMock) Connected() bool { - if mock.ConnectedFunc == nil { - panic("PeerMock.ConnectedFunc: method is nil but Peer.Connected was just called") - } - callInfo := struct { - }{} - mock.lockConnected.Lock() - mock.calls.Connected = append(mock.calls.Connected, callInfo) - mock.lockConnected.Unlock() - return mock.ConnectedFunc() -} - -// ConnectedCalls gets all the calls that were made to Connected. -// Check the length with: -// -// len(mockedPeer.ConnectedCalls()) -func (mock *PeerMock) ConnectedCalls() []struct { -} { - var calls []struct { - } - mock.lockConnected.RLock() - calls = mock.calls.Connected - mock.lockConnected.RUnlock() - return calls -} - -// IsHealthy calls IsHealthyFunc. -func (mock *PeerMock) IsHealthy() bool { - if mock.IsHealthyFunc == nil { - panic("PeerMock.IsHealthyFunc: method is nil but Peer.IsHealthy was just called") - } - callInfo := struct { - }{} - mock.lockIsHealthy.Lock() - mock.calls.IsHealthy = append(mock.calls.IsHealthy, callInfo) - mock.lockIsHealthy.Unlock() - return mock.IsHealthyFunc() -} - -// IsHealthyCalls gets all the calls that were made to IsHealthy. -// Check the length with: -// -// len(mockedPeer.IsHealthyCalls()) -func (mock *PeerMock) IsHealthyCalls() []struct { -} { - var calls []struct { - } - mock.lockIsHealthy.RLock() - calls = mock.calls.IsHealthy - mock.lockIsHealthy.RUnlock() - return calls -} - -// IsUnhealthyCh calls IsUnhealthyChFunc. -func (mock *PeerMock) IsUnhealthyCh() <-chan struct{} { - if mock.IsUnhealthyChFunc == nil { - panic("PeerMock.IsUnhealthyChFunc: method is nil but Peer.IsUnhealthyCh was just called") - } - callInfo := struct { - }{} - mock.lockIsUnhealthyCh.Lock() - mock.calls.IsUnhealthyCh = append(mock.calls.IsUnhealthyCh, callInfo) - mock.lockIsUnhealthyCh.Unlock() - return mock.IsUnhealthyChFunc() -} - -// IsUnhealthyChCalls gets all the calls that were made to IsUnhealthyCh. -// Check the length with: -// -// len(mockedPeer.IsUnhealthyChCalls()) -func (mock *PeerMock) IsUnhealthyChCalls() []struct { -} { - var calls []struct { - } - mock.lockIsUnhealthyCh.RLock() - calls = mock.calls.IsUnhealthyCh - mock.lockIsUnhealthyCh.RUnlock() - return calls -} - -// Network calls NetworkFunc. -func (mock *PeerMock) Network() wire.BitcoinNet { - if mock.NetworkFunc == nil { - panic("PeerMock.NetworkFunc: method is nil but Peer.Network was just called") - } - callInfo := struct { - }{} - mock.lockNetwork.Lock() - mock.calls.Network = append(mock.calls.Network, callInfo) - mock.lockNetwork.Unlock() - return mock.NetworkFunc() -} - -// NetworkCalls gets all the calls that were made to Network. -// Check the length with: -// -// len(mockedPeer.NetworkCalls()) -func (mock *PeerMock) NetworkCalls() []struct { -} { - var calls []struct { - } - mock.lockNetwork.RLock() - calls = mock.calls.Network - mock.lockNetwork.RUnlock() - return calls -} - -// RequestBlock calls RequestBlockFunc. -func (mock *PeerMock) RequestBlock(blockHash *chainhash.Hash) { - if mock.RequestBlockFunc == nil { - panic("PeerMock.RequestBlockFunc: method is nil but Peer.RequestBlock was just called") - } - callInfo := struct { - BlockHash *chainhash.Hash - }{ - BlockHash: blockHash, - } - mock.lockRequestBlock.Lock() - mock.calls.RequestBlock = append(mock.calls.RequestBlock, callInfo) - mock.lockRequestBlock.Unlock() - mock.RequestBlockFunc(blockHash) -} - -// RequestBlockCalls gets all the calls that were made to RequestBlock. -// Check the length with: -// -// len(mockedPeer.RequestBlockCalls()) -func (mock *PeerMock) RequestBlockCalls() []struct { - BlockHash *chainhash.Hash -} { - var calls []struct { - BlockHash *chainhash.Hash - } - mock.lockRequestBlock.RLock() - calls = mock.calls.RequestBlock - mock.lockRequestBlock.RUnlock() - return calls -} - -// RequestTransaction calls RequestTransactionFunc. -func (mock *PeerMock) RequestTransaction(txHash *chainhash.Hash) { - if mock.RequestTransactionFunc == nil { - panic("PeerMock.RequestTransactionFunc: method is nil but Peer.RequestTransaction was just called") - } - callInfo := struct { - TxHash *chainhash.Hash - }{ - TxHash: txHash, - } - mock.lockRequestTransaction.Lock() - mock.calls.RequestTransaction = append(mock.calls.RequestTransaction, callInfo) - mock.lockRequestTransaction.Unlock() - mock.RequestTransactionFunc(txHash) -} - -// RequestTransactionCalls gets all the calls that were made to RequestTransaction. -// Check the length with: -// -// len(mockedPeer.RequestTransactionCalls()) -func (mock *PeerMock) RequestTransactionCalls() []struct { - TxHash *chainhash.Hash -} { - var calls []struct { - TxHash *chainhash.Hash - } - mock.lockRequestTransaction.RLock() - calls = mock.calls.RequestTransaction - mock.lockRequestTransaction.RUnlock() - return calls -} - -// Restart calls RestartFunc. -func (mock *PeerMock) Restart() { - if mock.RestartFunc == nil { - panic("PeerMock.RestartFunc: method is nil but Peer.Restart was just called") - } - callInfo := struct { - }{} - mock.lockRestart.Lock() - mock.calls.Restart = append(mock.calls.Restart, callInfo) - mock.lockRestart.Unlock() - mock.RestartFunc() -} - -// RestartCalls gets all the calls that were made to Restart. -// Check the length with: -// -// len(mockedPeer.RestartCalls()) -func (mock *PeerMock) RestartCalls() []struct { -} { - var calls []struct { - } - mock.lockRestart.RLock() - calls = mock.calls.Restart - mock.lockRestart.RUnlock() - return calls -} - -// Shutdown calls ShutdownFunc. -func (mock *PeerMock) Shutdown() { - if mock.ShutdownFunc == nil { - panic("PeerMock.ShutdownFunc: method is nil but Peer.Shutdown was just called") - } - callInfo := struct { - }{} - mock.lockShutdown.Lock() - mock.calls.Shutdown = append(mock.calls.Shutdown, callInfo) - mock.lockShutdown.Unlock() - mock.ShutdownFunc() -} - -// ShutdownCalls gets all the calls that were made to Shutdown. -// Check the length with: -// -// len(mockedPeer.ShutdownCalls()) -func (mock *PeerMock) ShutdownCalls() []struct { -} { - var calls []struct { - } - mock.lockShutdown.RLock() - calls = mock.calls.Shutdown - mock.lockShutdown.RUnlock() - return calls -} - -// String calls StringFunc. -func (mock *PeerMock) String() string { - if mock.StringFunc == nil { - panic("PeerMock.StringFunc: method is nil but Peer.String was just called") - } - callInfo := struct { - }{} - mock.lockString.Lock() - mock.calls.String = append(mock.calls.String, callInfo) - mock.lockString.Unlock() - return mock.StringFunc() -} - -// StringCalls gets all the calls that were made to String. -// Check the length with: -// -// len(mockedPeer.StringCalls()) -func (mock *PeerMock) StringCalls() []struct { -} { - var calls []struct { - } - mock.lockString.RLock() - calls = mock.calls.String - mock.lockString.RUnlock() - return calls -} - -// WriteMsg calls WriteMsgFunc. -func (mock *PeerMock) WriteMsg(msg wire.Message) error { - if mock.WriteMsgFunc == nil { - panic("PeerMock.WriteMsgFunc: method is nil but Peer.WriteMsg was just called") - } - callInfo := struct { - Msg wire.Message - }{ - Msg: msg, - } - mock.lockWriteMsg.Lock() - mock.calls.WriteMsg = append(mock.calls.WriteMsg, callInfo) - mock.lockWriteMsg.Unlock() - return mock.WriteMsgFunc(msg) -} - -// WriteMsgCalls gets all the calls that were made to WriteMsg. -// Check the length with: -// -// len(mockedPeer.WriteMsgCalls()) -func (mock *PeerMock) WriteMsgCalls() []struct { - Msg wire.Message -} { - var calls []struct { - Msg wire.Message - } - mock.lockWriteMsg.RLock() - calls = mock.calls.WriteMsg - mock.lockWriteMsg.RUnlock() - return calls -} diff --git a/internal/blocktx/processor.go b/internal/blocktx/processor.go index 3630c086a..10877f7f8 100644 --- a/internal/blocktx/processor.go +++ b/internal/blocktx/processor.go @@ -14,12 +14,13 @@ import ( "time" "github.com/libsv/go-bc" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/pkg/tracing" @@ -52,8 +53,8 @@ const ( type Processor struct { hostname string - blockRequestCh chan BlockRequest - blockProcessCh chan *p2p.BlockMessage + blockRequestCh chan blocktx_p2p.BlockRequest + blockProcessCh chan *bcnet.BlockMessage store store.BlocktxStore logger *slog.Logger transactionStorageBatchSize int @@ -80,8 +81,8 @@ type Processor struct { func NewProcessor( logger *slog.Logger, storeI store.BlocktxStore, - blockRequestCh chan BlockRequest, - blockProcessCh chan *p2p.BlockMessage, + blockRequestCh chan blocktx_p2p.BlockRequest, + blockProcessCh chan *bcnet.BlockMessage, opts ...func(*Processor), ) (*Processor, error) { hostname, err := os.Hostname() @@ -170,7 +171,7 @@ func (p *Processor) StartBlockRequesting() { p.logger.Info("Sending block request", slog.String("hash", hash.String())) msg := wire.NewMsgGetDataSizeHint(1) _ = msg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, hash)) // ignore error at this point - _ = peer.WriteMsg(msg) + peer.WriteMsg(msg) p.logger.Info("Block request message sent to peer", slog.String("hash", hash.String()), slog.String("peer", peer.String())) } @@ -192,7 +193,7 @@ func (p *Processor) StartBlockProcessing() { var err error timeStart := time.Now() - hash := blockMsg.Header.BlockHash() + hash := blockMsg.Hash p.logger.Info("received block", slog.String("hash", hash.String())) @@ -202,7 +203,7 @@ func (p *Processor) StartBlockProcessing() { continue } - storeErr := p.store.MarkBlockAsDone(p.ctx, &hash, blockMsg.Size, uint64(len(blockMsg.TransactionHashes))) + storeErr := p.store.MarkBlockAsDone(p.ctx, hash, blockMsg.Size, uint64(len(blockMsg.TransactionHashes))) if storeErr != nil { p.logger.Error("unable to mark block as processed", slog.String("hash", hash.String()), slog.String("err", storeErr.Error())) continue @@ -307,11 +308,11 @@ func (p *Processor) registerTransactions(txHashes [][]byte) { } } -func (p *Processor) processBlock(blockMsg *p2p.BlockMessage) (err error) { +func (p *Processor) processBlock(blockMsg *bcnet.BlockMessage) (err error) { ctx := p.ctx var block *blocktx_api.Block - blockHash := blockMsg.Header.BlockHash() + blockHash := blockMsg.Hash ctx, span := tracing.StartTracing(ctx, "processBlock", p.tracingEnabled, p.tracingAttributes...) defer func() { @@ -328,7 +329,7 @@ func (p *Processor) processBlock(blockMsg *p2p.BlockMessage) (err error) { p.logger.Info("processing incoming block", slog.String("hash", blockHash.String()), slog.Uint64("height", blockMsg.Height)) // check if we've already processed that block - existingBlock, _ := p.store.GetBlock(ctx, &blockHash) + existingBlock, _ := p.store.GetBlock(ctx, blockHash) if existingBlock != nil { p.logger.Warn("ignoring already existing block", slog.String("hash", blockHash.String()), slog.Uint64("height", blockMsg.Height)) @@ -371,13 +372,13 @@ func (p *Processor) processBlock(blockMsg *p2p.BlockMessage) (err error) { return nil } -func (p *Processor) verifyAndInsertBlock(ctx context.Context, blockMsg *p2p.BlockMessage) (incomingBlock *blocktx_api.Block, err error) { +func (p *Processor) verifyAndInsertBlock(ctx context.Context, blockMsg *bcnet.BlockMessage) (incomingBlock *blocktx_api.Block, err error) { ctx, span := tracing.StartTracing(ctx, "verifyAndInsertBlock", p.tracingEnabled, p.tracingAttributes...) defer func() { tracing.EndTracing(span, err) }() - blockHash := blockMsg.Header.BlockHash() + blockHash := blockMsg.Hash previousBlockHash := blockMsg.Header.PrevBlock merkleRoot := blockMsg.Header.MerkleRoot diff --git a/internal/blocktx/processor_test.go b/internal/blocktx/processor_test.go index 5f81465c9..dbef6fde6 100644 --- a/internal/blocktx/processor_test.go +++ b/internal/blocktx/processor_test.go @@ -9,17 +9,19 @@ import ( "testing" "time" - "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/libsv/go-p2p/wire" "github.com/stretchr/testify/require" "google.golang.org/protobuf/reflect/protoreflect" "github.com/bitcoin-sv/arc/internal/blocktx" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet" + "github.com/bitcoin-sv/arc/internal/blocktx/bcnet/blocktx_p2p" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/mocks" "github.com/bitcoin-sv/arc/internal/blocktx/store" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" + p2p_mocks "github.com/bitcoin-sv/arc/internal/p2p/mocks" "github.com/bitcoin-sv/arc/internal/testdata" ) @@ -198,14 +200,15 @@ func TestHandleBlock(t *testing.T) { } logger := slog.Default() - blockProcessCh := make(chan *p2p.BlockMessage, 1) - p2pMsgHandler := blocktx.NewPeerHandler(logger, nil, blockProcessCh) + blockProcessCh := make(chan *bcnet.BlockMessage, 1) + p2pMsgHandler := blocktx_p2p.NewMsgHandler(logger, nil, blockProcessCh) sut, err := blocktx.NewProcessor(logger, storeMock, nil, blockProcessCh, blocktx.WithTransactionBatchSize(batchSize), blocktx.WithMessageQueueClient(mq)) require.NoError(t, err) - blockMessage := &p2p.BlockMessage{ + blockMessage := &bcnet.BlockMessage{ // Hash: testdata.Block1Hash, + Hash: testdata.Block1Hash, Header: &wire.BlockHeader{ Version: 541065216, PrevBlock: tc.prevBlockHash, @@ -222,8 +225,7 @@ func TestHandleBlock(t *testing.T) { sut.StartBlockProcessing() // simulate receiving block from node - err = p2pMsgHandler.HandleBlock(blockMessage, &mocks.PeerMock{StringFunc: func() string { return "peer" }}) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, &p2p_mocks.PeerIMock{StringFunc: func() string { return "peer" }}) var actualInsertedBlockTransactions []string time.Sleep(20 * time.Millisecond) @@ -439,8 +441,8 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { // build peer manager and processor logger := slog.Default() - blockProcessCh := make(chan *p2p.BlockMessage, 10) - p2pMsgHandler := blocktx.NewPeerHandler(logger, nil, blockProcessCh) + blockProcessCh := make(chan *bcnet.BlockMessage, 10) + p2pMsgHandler := blocktx_p2p.NewMsgHandler(logger, nil, blockProcessCh) sut, err := blocktx.NewProcessor(logger, storeMock, nil, blockProcessCh) require.NoError(t, err) @@ -450,8 +452,8 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { merkleRoot, err := chainhash.NewHashFromStr("be181e91217d5f802f695e52144078f8dfbe51b8a815c3d6fb48c0d853ec683b") require.NoError(t, err) - blockMessage := &p2p.BlockMessage{ - // Hash: testdata.Block1Hash, + blockMessage := &bcnet.BlockMessage{ + Hash: testdata.Block1Hash, Header: &wire.BlockHeader{ Version: 541065216, MerkleRoot: *merkleRoot, @@ -465,8 +467,7 @@ func TestHandleBlockReorgAndOrphans(t *testing.T) { sut.StartBlockProcessing() // simulate receiving block from node - err = p2pMsgHandler.HandleBlock(blockMessage, nil) - require.NoError(t, err) + p2pMsgHandler.OnReceive(blockMessage, nil) // then time.Sleep(20 * time.Millisecond) @@ -596,18 +597,18 @@ func TestStartBlockRequesting(t *testing.T) { }, } - peerMock := &mocks.PeerMock{ - WriteMsgFunc: func(_ wire.Message) error { return nil }, + peerMock := &p2p_mocks.PeerIMock{ + WriteMsgFunc: func(_ wire.Message) {}, StringFunc: func() string { return "peer" }, } // build peer manager logger := slog.Default() - blockRequestCh := make(chan blocktx.BlockRequest, 10) - blockProcessCh := make(chan *p2p.BlockMessage, 10) + blockRequestCh := make(chan blocktx_p2p.BlockRequest, 10) + blockProcessCh := make(chan *bcnet.BlockMessage, 10) - peerHandler := blocktx.NewPeerHandler(logger, blockRequestCh, blockProcessCh) + peerHandler := blocktx_p2p.NewMsgHandler(logger, blockRequestCh, blockProcessCh) sut, err := blocktx.NewProcessor(logger, storeMock, blockRequestCh, blockProcessCh) require.NoError(t, err) @@ -619,8 +620,7 @@ func TestStartBlockRequesting(t *testing.T) { invMsg := wire.NewMsgInvSizeHint(1) err = invMsg.AddInvVect(wire.NewInvVect(wire.InvTypeBlock, blockHash)) require.NoError(t, err) - err = peerHandler.HandleBlockAnnouncement(invMsg.InvList[0], peerMock) - require.NoError(t, err) + peerHandler.OnReceive(invMsg, peerMock) time.Sleep(200 * time.Millisecond) diff --git a/internal/blocktx/server.go b/internal/blocktx/server.go index 6a5b99193..9e58a26db 100644 --- a/internal/blocktx/server.go +++ b/internal/blocktx/server.go @@ -9,12 +9,11 @@ import ( "google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/timestamppb" - "github.com/libsv/go-p2p" - "github.com/bitcoin-sv/arc/config" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/blocktx/store" "github.com/bitcoin-sv/arc/internal/grpc_opts" + "github.com/bitcoin-sv/arc/internal/p2p" ) // Server type carries the logger within it. @@ -23,14 +22,14 @@ type Server struct { grpc_opts.GrpcServer logger *slog.Logger - pm p2p.PeerManagerI + pm *p2p.PeerManager store store.BlocktxStore maxAllowedBlockHeightMismatch int } // NewServer will return a server instance with the logger stored within it. func NewServer(prometheusEndpoint string, maxMsgSize int, logger *slog.Logger, - store store.BlocktxStore, pm p2p.PeerManagerI, maxAllowedBlockHeightMismatch int, tracingConfig *config.TracingConfig) (*Server, error) { + store store.BlocktxStore, pm *p2p.PeerManager, maxAllowedBlockHeightMismatch int, tracingConfig *config.TracingConfig) (*Server, error) { logger = logger.With(slog.String("module", "server")) grpcServer, err := grpc_opts.NewGrpcServer(logger, "blocktx", prometheusEndpoint, maxMsgSize, tracingConfig) diff --git a/internal/blocktx/server_test.go b/internal/blocktx/server_test.go index 9c479f20e..7010b7435 100644 --- a/internal/blocktx/server_test.go +++ b/internal/blocktx/server_test.go @@ -6,11 +6,11 @@ import ( "testing" "time" - "github.com/libsv/go-p2p" "github.com/stretchr/testify/require" "github.com/bitcoin-sv/arc/internal/blocktx" storeMocks "github.com/bitcoin-sv/arc/internal/blocktx/store/mocks" + "github.com/bitcoin-sv/arc/internal/p2p" ) func TestListenAndServe(t *testing.T) { diff --git a/internal/broadcaster/broadcaster_test.go b/internal/broadcaster/broadcaster_test.go index 1df449799..045849e0e 100644 --- a/internal/broadcaster/broadcaster_test.go +++ b/internal/broadcaster/broadcaster_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/bitcoin-sv/go-sdk/chainhash" "github.com/bitcoin-sv/go-sdk/script" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" chaincfg "github.com/bitcoin-sv/go-sdk/transaction/chaincfg" @@ -19,6 +20,8 @@ import ( ) func TestBroadcaster(t *testing.T) { + hash1, _ := chainhash.NewHash([]byte("sample-txid-1")) + hash2, _ := chainhash.NewHash([]byte("sample-txid-2")) // given mockedUtxoClient := &mocks.UtxoClientMock{ GetBalanceFunc: func(_ context.Context, _ string) (int64, int64, error) { @@ -30,13 +33,13 @@ func TestBroadcaster(t *testing.T) { GetUTXOsFunc: func(_ context.Context, lockingScript *script.Script, _ string) (sdkTx.UTXOs, error) { return sdkTx.UTXOs{ { - TxID: []byte("sample-txid-1"), + TxID: hash1, Vout: 0, LockingScript: lockingScript, Satoshis: 1000, }, { - TxID: []byte("sample-txid-2"), + TxID: hash2, Vout: 1, LockingScript: lockingScript, Satoshis: 1000, @@ -46,13 +49,13 @@ func TestBroadcaster(t *testing.T) { GetUTXOsWithRetriesFunc: func(_ context.Context, lockingScript *script.Script, _ string, _ time.Duration, _ uint64) (sdkTx.UTXOs, error) { return sdkTx.UTXOs{ { - TxID: []byte("sample-txid-1"), + TxID: hash1, Vout: 0, LockingScript: lockingScript, Satoshis: 1000, }, { - TxID: []byte("sample-txid-2"), + TxID: hash2, Vout: 1, LockingScript: lockingScript, Satoshis: 1000, diff --git a/internal/broadcaster/rate_broadcaster.go b/internal/broadcaster/rate_broadcaster.go index 1f13f1923..ba479c55a 100644 --- a/internal/broadcaster/rate_broadcaster.go +++ b/internal/broadcaster/rate_broadcaster.go @@ -13,6 +13,7 @@ import ( "sync/atomic" "time" + "github.com/bitcoin-sv/go-sdk/chainhash" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" @@ -315,9 +316,14 @@ func (b *UTXORateBroadcaster) broadcastBatchAsync(txs sdkTx.Transactions, errCh sat, found := b.satoshiMap.Load(res.Txid) satoshis, isValid := sat.(uint64) + hash, _ := chainhash.NewHash(txIDBytes) + if err != nil { + b.logger.Error("failed to create chainhash txid", slog.String("err", err.Error())) + } + if found && isValid { newUtxo := &sdkTx.UTXO{ - TxID: txIDBytes, + TxID: hash, Vout: 0, LockingScript: b.ks.Script, Satoshis: satoshis, diff --git a/internal/broadcaster/rate_broadcaster_test.go b/internal/broadcaster/rate_broadcaster_test.go index c89fcaa79..95d70ed05 100644 --- a/internal/broadcaster/rate_broadcaster_test.go +++ b/internal/broadcaster/rate_broadcaster_test.go @@ -12,6 +12,7 @@ import ( "github.com/bitcoin-sv/arc/internal/broadcaster/mocks" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/pkg/keyset" + "github.com/bitcoin-sv/go-sdk/chainhash" "github.com/bitcoin-sv/go-sdk/script" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" chaincfg "github.com/bitcoin-sv/go-sdk/transaction/chaincfg" @@ -58,6 +59,8 @@ func TestRateBroadcaster(t *testing.T) { }, } + hash1, _ := chainhash.NewHash([]byte("sample-txid-1")) + hash2, _ := chainhash.NewHash([]byte("sample-txid-2")) for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { // given @@ -72,13 +75,13 @@ func TestRateBroadcaster(t *testing.T) { return sdkTx.UTXOs{ { - TxID: []byte("sample-txid-1"), + TxID: hash1, Vout: 0, LockingScript: lockingScript, Satoshis: 1000, }, { - TxID: []byte("sample-txid-2"), + TxID: hash2, Vout: 1, LockingScript: lockingScript, Satoshis: 1000, @@ -97,7 +100,7 @@ func TestRateBroadcaster(t *testing.T) { var statuses []*metamorph_api.TransactionStatus for _, tx := range txs { statuses = append(statuses, &metamorph_api.TransactionStatus{ - Txid: tx.TxID(), + Txid: tx.TxID().String(), Status: metamorph_api.Status_SEEN_ON_NETWORK, }) } diff --git a/internal/broadcaster/utxo_consolidator.go b/internal/broadcaster/utxo_consolidator.go index 8b56d7340..010e16c74 100644 --- a/internal/broadcaster/utxo_consolidator.go +++ b/internal/broadcaster/utxo_consolidator.go @@ -10,6 +10,7 @@ import ( "sync" "time" + "github.com/bitcoin-sv/go-sdk/chainhash" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" @@ -122,7 +123,7 @@ func (b *UTXOConsolidator) Start(txsRateTxsPerMinute int) error { res.Status != metamorph_api.Status_SENT_TO_NETWORK { b.logger.Error("consolidation tx was not successful", slog.String("status", res.Status.String()), slog.String("hash", res.Txid), slog.String("reason", res.RejectReason)) for _, tx := range batch { - if tx.TxID() == res.Txid { + if tx.TxID().String() == res.Txid { b.logger.Debug(tx.String()) break } @@ -135,9 +136,14 @@ func (b *UTXOConsolidator) Start(txsRateTxsPerMinute int) error { b.logger.Error("failed to decode txid", slog.String("err", err.Error())) return } + hash, err := chainhash.NewHash(txIDBytes) + if err != nil { + b.logger.Error("failed to create chainhash txid", slog.String("err", err.Error())) + return + } newUtxo := &sdkTx.UTXO{ - TxID: txIDBytes, + TxID: hash, Vout: 0, LockingScript: b.keySet.Script, Satoshis: satoshiMap[res.Txid], @@ -184,7 +190,7 @@ func (b *UTXOConsolidator) createConsolidationTxs(utxoSet *list.List, satoshiMap } txsConsolidation = append(txsConsolidation, tx) - satoshiMap[tx.TxID()] = tx.TotalOutputSatoshis() + satoshiMap[tx.TxID().String()] = tx.TotalOutputSatoshis() } if len(txsConsolidation) > 0 { @@ -206,7 +212,7 @@ func (b *UTXOConsolidator) createConsolidationTxs(utxoSet *list.List, satoshiMap txsConsolidation = append(txsConsolidation, tx) - satoshiMap[tx.TxID()] = tx.TotalOutputSatoshis() + satoshiMap[tx.TxID().String()] = tx.TotalOutputSatoshis() tx = sdkTx.NewTransaction() txSatoshis = 0 } diff --git a/internal/broadcaster/utxo_consolidator_test.go b/internal/broadcaster/utxo_consolidator_test.go index b86fb32d1..d5ae5c7cb 100644 --- a/internal/broadcaster/utxo_consolidator_test.go +++ b/internal/broadcaster/utxo_consolidator_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/bitcoin-sv/go-sdk/chainhash" "github.com/bitcoin-sv/go-sdk/script" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" chaincfg "github.com/bitcoin-sv/go-sdk/transaction/chaincfg" @@ -24,26 +25,31 @@ func TestStart(t *testing.T) { ks, err := keyset.New(&chaincfg.MainNet) require.NoError(t, err) + hash1, _ := chainhash.NewHash(testdata.TX1Hash[:]) + hash2, _ := chainhash.NewHash(testdata.TX2Hash[:]) + hash3, _ := chainhash.NewHash(testdata.TX3Hash[:]) + hash4, _ := chainhash.NewHash(testdata.TX4Hash[:]) + utxo1 := &sdkTx.UTXO{ - TxID: testdata.TX1Hash[:], + TxID: hash1, Vout: 0, LockingScript: ks.Script, Satoshis: 1000, } utxo2 := &sdkTx.UTXO{ - TxID: testdata.TX2Hash[:], + TxID: hash2, Vout: 0, LockingScript: ks.Script, Satoshis: 1000, } utxo3 := &sdkTx.UTXO{ - TxID: testdata.TX3Hash[:], + TxID: hash3, Vout: 0, LockingScript: ks.Script, Satoshis: 1000, } utxo4 := &sdkTx.UTXO{ - TxID: testdata.TX4Hash[:], + TxID: hash4, Vout: 0, LockingScript: ks.Script, Satoshis: 1000, @@ -111,7 +117,7 @@ func TestStart(t *testing.T) { for _, tx := range txs { statuses = append(statuses, &metamorph_api.TransactionStatus{ - Txid: tx.TxID(), + Txid: tx.TxID().String(), Status: tc.responseStatus, }) } diff --git a/internal/broadcaster/utxo_creator.go b/internal/broadcaster/utxo_creator.go index 2f56ab887..f0b62c85f 100644 --- a/internal/broadcaster/utxo_creator.go +++ b/internal/broadcaster/utxo_creator.go @@ -12,6 +12,7 @@ import ( "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/pkg/keyset" + "github.com/bitcoin-sv/go-sdk/chainhash" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" ) @@ -137,7 +138,7 @@ func (b *UTXOCreator) Start(requestedOutputs int, requestedSatoshisPerOutput uin if res.Status == metamorph_api.Status_REJECTED || res.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL { b.logger.Error("splitting tx was not successful", slog.String("status", res.Status.String()), slog.String("hash", res.Txid), slog.String("reason", res.RejectReason)) for _, tx := range batch { - if tx.TxID() == res.Txid { + if tx.TxID().String() == res.Txid { b.logger.Debug(tx.String()) break } @@ -157,9 +158,14 @@ func (b *UTXOCreator) Start(requestedOutputs int, requestedSatoshisPerOutput uin continue } + hash, err := chainhash.NewHash(txIDBytes) + if err != nil { + b.logger.Error("failed to create chainhash txid", slog.String("err", err.Error())) + continue + } for _, foundOutput := range foundOutputs { newUtxo := &sdkTx.UTXO{ - TxID: txIDBytes, + TxID: hash, Vout: foundOutput.vout, LockingScript: b.keySet.Script, Satoshis: foundOutput.satoshis, @@ -224,7 +230,7 @@ func (b *UTXOCreator) splitOutputs(requestedOutputs int, requestedSatoshisPerOut txOutputs[i] = splittingOutput{satoshis: txOutput.Satoshis, vout: uint32(i)} } - satoshiMap[tx.TxID()] = txOutputs + satoshiMap[tx.TxID().String()] = txOutputs if len(txsSplit) == b.batchSize { txsSplitBatches = append(txsSplitBatches, txsSplit) diff --git a/internal/broadcaster/utxo_creator_test.go b/internal/broadcaster/utxo_creator_test.go index 64cd963c0..b770ef32b 100644 --- a/internal/broadcaster/utxo_creator_test.go +++ b/internal/broadcaster/utxo_creator_test.go @@ -12,6 +12,7 @@ import ( "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/testdata" "github.com/bitcoin-sv/arc/pkg/keyset" + "github.com/bitcoin-sv/go-sdk/chainhash" "github.com/bitcoin-sv/go-sdk/script" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" transaction "github.com/bitcoin-sv/go-sdk/transaction/chaincfg" @@ -23,6 +24,7 @@ func TestUTXOCreator(t *testing.T) { require.NoError(t, err) logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) + hash1, _ := chainhash.NewHash(testdata.TX1Hash[:]) tests := []struct { name string @@ -35,7 +37,7 @@ func TestUTXOCreator(t *testing.T) { }{ { name: "success - creates correct UTXOs", - getUTXOsResp: sdkTx.UTXOs{{TxID: testdata.TX1Hash[:], Vout: 0, LockingScript: ks.Script, Satoshis: 401}}, + getUTXOsResp: sdkTx.UTXOs{{TxID: hash1, Vout: 0, LockingScript: ks.Script, Satoshis: 401}}, getBalanceFunc: func(_ context.Context, _ string, _ time.Duration, _ uint64) (int64, int64, error) { return 400, 0, nil }, @@ -46,7 +48,7 @@ func TestUTXOCreator(t *testing.T) { }, { name: "Insufficient balance", - getUTXOsResp: sdkTx.UTXOs{{TxID: testdata.TX1Hash[:], Vout: 0, LockingScript: ks.Script, Satoshis: 50}}, + getUTXOsResp: sdkTx.UTXOs{{TxID: hash1, Vout: 0, LockingScript: ks.Script, Satoshis: 50}}, getBalanceFunc: func(_ context.Context, _ string, _ time.Duration, _ uint64) (int64, int64, error) { return 100, 0, nil }, @@ -64,7 +66,7 @@ func TestUTXOCreator(t *testing.T) { BroadcastTransactionsFunc: func(_ context.Context, txs sdkTx.Transactions, _ metamorph_api.Status, _, _ string, _, _ bool) ([]*metamorph_api.TransactionStatus, error) { statuses := make([]*metamorph_api.TransactionStatus, len(txs)) for i, tx := range txs { - statuses[i] = &metamorph_api.TransactionStatus{Txid: tx.TxID(), Status: metamorph_api.Status_SEEN_ON_NETWORK} + statuses[i] = &metamorph_api.TransactionStatus{Txid: tx.TxID().String(), Status: metamorph_api.Status_SEEN_ON_NETWORK} } return statuses, nil }, diff --git a/internal/broadcaster/utxo_splitter.go b/internal/broadcaster/utxo_splitter.go index 35d11f49a..34a3af485 100644 --- a/internal/broadcaster/utxo_splitter.go +++ b/internal/broadcaster/utxo_splitter.go @@ -5,6 +5,7 @@ import ( "errors" "log/slog" + "github.com/bitcoin-sv/go-sdk/chainhash" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" @@ -46,8 +47,12 @@ func (b *UTXOSplitter) SplitUtxo(txid string, satoshis uint64, vout uint32, dryr return errors.Join(ErrFailedToDecodeTxID, err) } + hash, err := chainhash.NewHash(txIDBytes) + if err != nil { + return err + } utxo := &sdkTx.UTXO{ - TxID: txIDBytes, + TxID: hash, Vout: vout, LockingScript: b.fromKeySet.Script, Satoshis: satoshis, @@ -88,19 +93,19 @@ func (b *UTXOSplitter) SplitUtxo(txid string, satoshis uint64, vout uint32, dryr return err } - b.logger.Info("Splitting tx", slog.String("txid", tx.TxID()), slog.String("rawTx", tx.String())) + b.logger.Info("Splitting tx", slog.String("txid", tx.TxID().String()), slog.String("rawTx", tx.String())) if dryrun { return nil } - b.logger.Info("Submit splitting tx", slog.String("txid", tx.TxID())) + b.logger.Info("Submit splitting tx", slog.String("txid", tx.TxID().String())) resp, err := b.client.BroadcastTransaction(b.ctx, tx, metamorph_api.Status_SEEN_ON_NETWORK, "") if err != nil { return errors.Join(ErrFailedToBroadcastTx, err) } - b.logger.Info("Splitting tx submitted", slog.String("txid", tx.TxID()), slog.String("status", resp.Status.String())) + b.logger.Info("Splitting tx submitted", slog.String("txid", tx.TxID().String()), slog.String("status", resp.Status.String())) return nil } diff --git a/internal/broadcaster/utxo_splitter_test.go b/internal/broadcaster/utxo_splitter_test.go index 3990d8a68..82929431b 100644 --- a/internal/broadcaster/utxo_splitter_test.go +++ b/internal/broadcaster/utxo_splitter_test.go @@ -78,7 +78,7 @@ func TestSplitUtxo(t *testing.T) { require.Equal(t, txIDBytes, tx.Inputs[0].SourceTXID) return &metamorph_api.TransactionStatus{ - Txid: tx.TxID(), + Txid: tx.TxID().String(), Status: metamorph_api.Status_SEEN_ON_NETWORK, }, tc.broadcastTransactionsErr }, diff --git a/internal/metamorph/client_test.go b/internal/metamorph/client_test.go index 6cf65c1a2..a5707082c 100644 --- a/internal/metamorph/client_test.go +++ b/internal/metamorph/client_test.go @@ -261,15 +261,15 @@ func TestClient_SubmitTransactions(t *testing.T) { putTxStatus: &metamorph_api.TransactionStatuses{ Statuses: []*metamorph_api.TransactionStatus{ { - Txid: tx1.TxID(), + Txid: tx1.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, { - Txid: tx2.TxID(), + Txid: tx2.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, { - Txid: tx3.TxID(), + Txid: tx3.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, }, @@ -277,17 +277,17 @@ func TestClient_SubmitTransactions(t *testing.T) { expectedStatuses: []*metamorph.TransactionStatus{ { - TxID: tx1.TxID(), + TxID: tx1.TxID().String(), Status: metamorph_api.Status_RECEIVED.String(), Timestamp: now.Unix(), }, { - TxID: tx2.TxID(), + TxID: tx2.TxID().String(), Status: metamorph_api.Status_RECEIVED.String(), Timestamp: now.Unix(), }, { - TxID: tx3.TxID(), + TxID: tx3.TxID().String(), Status: metamorph_api.Status_RECEIVED.String(), Timestamp: now.Unix(), }, @@ -301,15 +301,15 @@ func TestClient_SubmitTransactions(t *testing.T) { putTxStatus: &metamorph_api.TransactionStatuses{ Statuses: []*metamorph_api.TransactionStatus{ { - Txid: tx1.TxID(), + Txid: tx1.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, { - Txid: tx2.TxID(), + Txid: tx2.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, { - Txid: tx3.TxID(), + Txid: tx3.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, }, @@ -328,15 +328,15 @@ func TestClient_SubmitTransactions(t *testing.T) { putTxStatus: &metamorph_api.TransactionStatuses{ Statuses: []*metamorph_api.TransactionStatus{ { - Txid: tx1.TxID(), + Txid: tx1.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, { - Txid: tx2.TxID(), + Txid: tx2.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, { - Txid: tx3.TxID(), + Txid: tx3.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, }, @@ -349,17 +349,17 @@ func TestClient_SubmitTransactions(t *testing.T) { expectedStatuses: []*metamorph.TransactionStatus{ { - TxID: tx1.TxID(), + TxID: tx1.TxID().String(), Status: metamorph_api.Status_RECEIVED.String(), Timestamp: now.Unix(), }, { - TxID: tx2.TxID(), + TxID: tx2.TxID().String(), Status: metamorph_api.Status_RECEIVED.String(), Timestamp: now.Unix(), }, { - TxID: tx3.TxID(), + TxID: tx3.TxID().String(), Status: metamorph_api.Status_RECEIVED.String(), Timestamp: now.Unix(), }, @@ -373,15 +373,15 @@ func TestClient_SubmitTransactions(t *testing.T) { putTxStatus: &metamorph_api.TransactionStatuses{ Statuses: []*metamorph_api.TransactionStatus{ { - Txid: tx1.TxID(), + Txid: tx1.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, { - Txid: tx2.TxID(), + Txid: tx2.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, { - Txid: tx3.TxID(), + Txid: tx3.TxID().String(), Status: metamorph_api.Status_RECEIVED, }, }, @@ -391,17 +391,17 @@ func TestClient_SubmitTransactions(t *testing.T) { expectedStatuses: []*metamorph.TransactionStatus{ { - TxID: tx1.TxID(), + TxID: tx1.TxID().String(), Status: metamorph_api.Status_RECEIVED.String(), Timestamp: now.Unix(), }, { - TxID: tx2.TxID(), + TxID: tx2.TxID().String(), Status: metamorph_api.Status_RECEIVED.String(), Timestamp: now.Unix(), }, { - TxID: tx3.TxID(), + TxID: tx3.TxID().String(), Status: metamorph_api.Status_RECEIVED.String(), Timestamp: now.Unix(), }, @@ -416,17 +416,17 @@ func TestClient_SubmitTransactions(t *testing.T) { expectedStatuses: []*metamorph.TransactionStatus{ { - TxID: tx1.TxID(), + TxID: tx1.TxID().String(), Status: metamorph_api.Status_QUEUED.String(), Timestamp: now.Unix(), }, { - TxID: tx2.TxID(), + TxID: tx2.TxID().String(), Status: metamorph_api.Status_QUEUED.String(), Timestamp: now.Unix(), }, { - TxID: tx3.TxID(), + TxID: tx3.TxID().String(), Status: metamorph_api.Status_QUEUED.String(), Timestamp: now.Unix(), }, diff --git a/internal/node_client/node_client_test.go b/internal/node_client/node_client_test.go index 13c1cab1a..83c831614 100644 --- a/internal/node_client/node_client_test.go +++ b/internal/node_client/node_client_test.go @@ -15,7 +15,7 @@ import ( "github.com/stretchr/testify/require" "github.com/bitcoin-sv/arc/internal/node_client" - "github.com/bitcoin-sv/arc/pkg/test_utils" + testutils "github.com/bitcoin-sv/arc/pkg/test_utils" ) var ( @@ -133,12 +133,12 @@ func TestNodeClient(t *testing.T) { require.NoError(t, err) if i != len(txs) { - expectedTxIDs[i] = tx.TxID() + expectedTxIDs[i] = tx.TxID().String() } } // when - ancestorTxIDs, err := sut.GetMempoolAncestors(ctx, []string{txs[len(txs)-1].TxID()}) + ancestorTxIDs, err := sut.GetMempoolAncestors(ctx, []string{txs[len(txs)-1].TxID().String()}) // then require.NoError(t, err) diff --git a/internal/p2p/tests/wire_reader_test.go b/internal/p2p/tests/wire_reader_test.go new file mode 100644 index 000000000..8b397a1f5 --- /dev/null +++ b/internal/p2p/tests/wire_reader_test.go @@ -0,0 +1,119 @@ +package p2p_tests + +import ( + "bytes" + "context" + "io" + "testing" + "time" + + "github.com/bitcoin-sv/arc/internal/p2p" + "github.com/libsv/go-p2p/wire" + "github.com/stretchr/testify/require" +) + +func TestWireReader_ReadNextMsg(t *testing.T) { + t.Run("Success", func(t *testing.T) { + // given + expectedMsg := wire.NewMsgGetBlocks(blockHash) + + var buff bytes.Buffer + err := wire.WriteMessage(&buff, expectedMsg, wire.ProtocolVersion, bitcoinNet) + require.NoError(t, err) + + sut := p2p.NewWireReader(&buff, 4096) + + // when + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + res, err := sut.ReadNextMsg(ctx, wire.ProtocolVersion, bitcoinNet) + + // then + require.NoError(t, err) + require.Equal(t, expectedMsg, res) + }) + + t.Run("Unknown msg", func(t *testing.T) { + // given + unknownMsg := unknownMsg{} + + expectedMsg := wire.NewMsgGetBlocks(blockHash) + + var buff bytes.Buffer + // first write unknown msg + err := wire.WriteMessage(&buff, &unknownMsg, wire.ProtocolVersion, bitcoinNet) + require.NoError(t, err) + + // next write regular msg + err = wire.WriteMessage(&buff, expectedMsg, wire.ProtocolVersion, bitcoinNet) + require.NoError(t, err) + + sut := p2p.NewWireReader(&buff, 4096) + + // when + ctx, cancel := context.WithTimeout(context.Background(), 35*time.Second) + defer cancel() + + res, err := sut.ReadNextMsg(ctx, wire.ProtocolVersion, bitcoinNet) + + // then + require.NoError(t, err) + require.Equal(t, expectedMsg, res) + }) + + t.Run("Context cancelled", func(t *testing.T) { + // given + expectedMsg := wire.NewMsgGetBlocks(blockHash) + + var buff bytes.Buffer + err := wire.WriteMessage(&buff, expectedMsg, wire.ProtocolVersion, bitcoinNet) + require.NoError(t, err) + + sut := p2p.NewWireReader(&buff, 4096) + + // when + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel the context immediately + + res, err := sut.ReadNextMsg(ctx, wire.ProtocolVersion, bitcoinNet) + + // then + require.ErrorIs(t, err, context.Canceled) + require.Nil(t, res) + }) + + t.Run("Read error", func(t *testing.T) { + var buff bytes.Buffer + sut := p2p.NewWireReader(&buff, 4096) + + // when + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + res, err := sut.ReadNextMsg(ctx, wire.ProtocolVersion, bitcoinNet) + + // then + require.Error(t, err) + require.Nil(t, res) + }) +} + +type unknownMsg struct { +} + +func (m *unknownMsg) Bsvdecode(_ io.Reader, _ uint32, _ wire.MessageEncoding) error { + return nil +} + +func (m *unknownMsg) BsvEncode(_ io.Writer, _ uint32, _ wire.MessageEncoding) error { + return nil +} + +func (m *unknownMsg) Command() string { + return "test-cmd" +} + +func (m *unknownMsg) MaxPayloadLength(_ uint32) uint64 { + return 0 +} diff --git a/internal/testdata/data.go b/internal/testdata/data.go index 1b23d3293..3880f6eed 100644 --- a/internal/testdata/data.go +++ b/internal/testdata/data.go @@ -1,9 +1,10 @@ package testdata import ( - sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "time" + sdkTx "github.com/bitcoin-sv/go-sdk/transaction" + "github.com/libsv/go-p2p/chaincfg/chainhash" ) @@ -15,7 +16,7 @@ var ( TX1RawString = "01000000010000000000000000000000000000000000000000000000000000000000000000ffffffff1a0386c40b2f7461616c2e636f6d2f00cf47ad9c7af83836000000ffffffff0117564425000000001976a914522cf9e7626d9bd8729e5a1398ece40dad1b6a2f88ac00000000" TX1Raw, _ = sdkTx.NewTransactionFromHex(TX1RawString) - TX1Hash, _ = chainhash.NewHashFromStr(TX1Raw.TxID()) + TX1Hash, _ = chainhash.NewHashFromStr(TX1Raw.TxID().String()) TX2 = "1a8fda8c35b8fc30885e88d6eb0214e2b3a74c96c82c386cb463905446011fdf" TX2Hash, _ = chainhash.NewHashFromStr(TX2) @@ -31,7 +32,7 @@ var ( TX6RawString = "010000000000000000ef016f8828b2d3f8085561d0b4ff6f5d17c269206fa3d32bcd3b22e26ce659ed12e7000000006b483045022100d3649d120249a09af44b4673eecec873109a3e120b9610b78858087fb225c9b9022037f16999b7a4fecdd9f47ebdc44abd74567a18940c37e1481ab0fe84d62152e4412102f87ce69f6ba5444aed49c34470041189c1e1060acd99341959c0594002c61bf0ffffffffe7030000000000001976a914c2b6fd4319122b9b5156a2a0060d19864c24f49a88ac01e7030000000000001976a914c2b6fd4319122b9b5156a2a0060d19864c24f49a88ac00000000" TX6Raw, _ = sdkTx.NewTransactionFromHex(TX6RawString) - TX6Hash, _ = chainhash.NewHashFromStr(TX6Raw.TxID()) + TX6Hash, _ = chainhash.NewHashFromStr(TX6Raw.TxID().String()) Time = time.Date(2009, 1, 03, 18, 15, 05, 0, time.UTC) DefaultPolicy = `{"excessiveblocksize":2000000000,"blockmaxsize":512000000,"maxtxsizepolicy":10000000,"maxorphantxsize":1000000000,"datacarriersize":4294967295,"maxscriptsizepolicy":500000,"maxopsperscriptpolicy":4294967295,"maxscriptnumlengthpolicy":10000,"maxpubkeyspermultisigpolicy":4294967295,"maxtxsigopscountspolicy":4294967295,"maxstackmemoryusagepolicy":100000000,"maxstackmemoryusageconsensus":200000000,"limitancestorcount":10000,"limitcpfpgroupmemberscount":25,"maxmempool":2000000000,"maxmempoolsizedisk":0,"mempoolmaxpercentcpfp":10,"acceptnonstdoutputs":true,"datacarrier":true,"minminingtxfee":5e-7,"maxstdtxvalidationduration":3,"maxnonstdtxvalidationduration":1000,"maxtxchainvalidationbudget":50,"validationclockcpu":true,"minconsolidationfactor":20,"maxconsolidationinputscriptsize":150,"minconfconsolidationinput":6,"minconsolidationinputmaturity":6,"acceptnonstdconsolidationinput":false}` diff --git a/internal/tx_finder/cached_tx_finder.go b/internal/tx_finder/cached_tx_finder.go index 0fe7f7770..bcb69a2fb 100644 --- a/internal/tx_finder/cached_tx_finder.go +++ b/internal/tx_finder/cached_tx_finder.go @@ -96,7 +96,7 @@ func (f CachedFinder) GetRawTxs(ctx context.Context, source validator.FindSource // update cache for _, tx := range foundTxs { - f.cacheStore.Set(tx.TxID(), *tx, cacheExpiration) + f.cacheStore.Set(tx.TxID().String(), *tx, cacheExpiration) } return append(cachedTxs, foundTxs...), nil diff --git a/internal/tx_finder/cached_tx_finder_test.go b/internal/tx_finder/cached_tx_finder_test.go index 05f222937..af195cc8a 100644 --- a/internal/tx_finder/cached_tx_finder_test.go +++ b/internal/tx_finder/cached_tx_finder_test.go @@ -30,15 +30,15 @@ func TestCachedFinder_GetRawTxs_AllFromCache(t *testing.T) { { name: "all from finder", fetchedTx: []*metamorph.Transaction{ - {TxID: testdata.TX1Raw.TxID(), Bytes: testdata.TX1Raw.Bytes()}, - {TxID: testdata.TX6Raw.TxID(), Bytes: testdata.TX6Raw.Bytes()}, + {TxID: testdata.TX1Raw.TxID().String(), Bytes: testdata.TX1Raw.Bytes()}, + {TxID: testdata.TX6Raw.TxID().String(), Bytes: testdata.TX6Raw.Bytes()}, }, }, { name: "cached and fetched mixed", cachedTx: []sdkTx.Transaction{*testdata.TX1Raw}, fetchedTx: []*metamorph.Transaction{ - {TxID: testdata.TX6Raw.TxID(), Bytes: testdata.TX6Raw.Bytes()}, + {TxID: testdata.TX6Raw.TxID().String(), Bytes: testdata.TX6Raw.Bytes()}, }, }, } @@ -54,7 +54,7 @@ func TestCachedFinder_GetRawTxs_AllFromCache(t *testing.T) { c := cache.New(10*time.Second, 10*time.Second) for _, r := range tc.cachedTx { - c.Set(r.TxID(), r, cache.DefaultExpiration) + c.Set(r.TxID().String(), r, cache.DefaultExpiration) } logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})) finder := New(thMq, nil, nil, logger) @@ -62,7 +62,7 @@ func TestCachedFinder_GetRawTxs_AllFromCache(t *testing.T) { // when // try to find in cache or with TransactionHandler only - res, err := sut.GetRawTxs(context.Background(), validator.SourceTransactionHandler, []string{testdata.TX1Raw.TxID(), testdata.TX6Raw.TxID()}) + res, err := sut.GetRawTxs(context.Background(), validator.SourceTransactionHandler, []string{testdata.TX1Raw.TxID().String(), testdata.TX6Raw.TxID().String()}) // then require.NoError(t, err) diff --git a/internal/validator/beef/beef_validator.go b/internal/validator/beef/beef_validator.go index e8c0e5842..f76e87b01 100644 --- a/internal/validator/beef/beef_validator.go +++ b/internal/validator/beef/beef_validator.go @@ -232,7 +232,7 @@ func findMinedAncestorsForInput(input *sdkTx.TransactionInput, ancestors []*beef } func findParentForInput(input *sdkTx.TransactionInput, parentTxs []*beef.TxData) *beef.TxData { - parentID := input.PreviousTxIDStr() + parentID := input.SourceTXID.String() for _, ptx := range parentTxs { if ptx.GetTxID() == parentID { diff --git a/internal/validator/common_validation.go b/internal/validator/common_validation.go index 77ff32812..68a0b40b1 100644 --- a/internal/validator/common_validation.go +++ b/internal/validator/common_validation.go @@ -119,7 +119,7 @@ func checkOutputs(tx *sdkTx.Transaction) *Error { func checkInputs(tx *sdkTx.Transaction) *Error { total := uint64(0) for index, input := range tx.Inputs { - if input.PreviousTxIDStr() == coinbaseTxID { + if input.SourceTXID.String() == coinbaseTxID { return NewError(errors.Join(ErrTxInputInvalid, fmt.Errorf("input %d is a coinbase input", index)), api.ErrStatusInputs) } diff --git a/internal/validator/common_validation_test.go b/internal/validator/common_validation_test.go index f78090cad..4816c998d 100644 --- a/internal/validator/common_validation_test.go +++ b/internal/validator/common_validation_test.go @@ -4,6 +4,7 @@ import ( "encoding/hex" "testing" + "github.com/bitcoin-sv/go-sdk/chainhash" "github.com/bitcoin-sv/go-sdk/script" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "github.com/ordishs/go-bitcoin" @@ -149,9 +150,10 @@ func TestCheckInputs(t *testing.T) { } coinbaseInput := &sdkTx.TransactionInput{} - coinbaseInput.SetPrevTxFromOutput(&sdkTx.TransactionOutput{Satoshis: 100}) + coinbaseInput.SetSourceTxOutput(&sdkTx.TransactionOutput{Satoshis: 100}) hexTxID, _ := hex.DecodeString(coinbaseTxID) - coinbaseInput.SourceTXID = hexTxID + hash, _ := chainhash.NewHash(hexTxID) + coinbaseInput.SourceTXID = hash tests := []struct { name string diff --git a/internal/validator/default/default_validator.go b/internal/validator/default/default_validator.go index 5722d558d..65b9a3545 100644 --- a/internal/validator/default/default_validator.go +++ b/internal/validator/default/default_validator.go @@ -143,7 +143,12 @@ func checkCumulativeFees(ctx context.Context, txFinder validator.TxFinderI, tx * for _, txFromSet := range txSet { cumulativeSize += txFromSet.Size() - totalInput := txFromSet.TotalInputSatoshis() + total, err := txFromSet.TotalInputSatoshis() + if err != nil { + e := fmt.Errorf("failed to get total input satoshis %w", err) + return validator.NewError(e, api.ErrStatusCumulativeFees) + } + totalInput := total totalOutput := txFromSet.TotalOutputSatoshis() if totalOutput > totalInput { @@ -172,7 +177,11 @@ func isFeePaidEnough(feeModel sdkTx.FeeModel, tx *sdkTx.Transaction) (bool, uint return false, 0, 0, err } - totalInputSatoshis := tx.TotalInputSatoshis() + total, err := tx.TotalInputSatoshis() + if err != nil { + return false, 0, 0, err + } + totalInputSatoshis := total totalOutputSatoshis := tx.TotalOutputSatoshis() if totalInputSatoshis < totalOutputSatoshis { diff --git a/internal/validator/default/default_validator_test.go b/internal/validator/default/default_validator_test.go index 449968c56..a698c7f7f 100644 --- a/internal/validator/default/default_validator_test.go +++ b/internal/validator/default/default_validator_test.go @@ -135,7 +135,7 @@ func TestValidator(t *testing.T) { parentTx, err := sdkTx.NewTransactionFromHex(parentHex) require.NoError(t, err, "Could not parse parent tx hex") - in.SetPrevTxFromOutput(parentTx.Outputs[in.SourceTxOutIndex]) + in.SetSourceTxOutput(parentTx.Outputs[in.SourceTxOutIndex]) } policy := getPolicy(5) diff --git a/internal/validator/default/helpers.go b/internal/validator/default/helpers.go index 55188e24a..4fe4a2bcd 100644 --- a/internal/validator/default/helpers.go +++ b/internal/validator/default/helpers.go @@ -33,7 +33,7 @@ func extendTx(ctx context.Context, txFinder validator.TxFinderI, rawTx *sdkTx.Tr parentInputMap := make(map[string][]*sdkTx.TransactionInput) for _, in := range rawTx.Inputs { - prevTxID := in.PreviousTxIDStr() + prevTxID := in.SourceTXID.String() inputs, found := parentInputMap[prevTxID] if !found { @@ -60,7 +60,7 @@ func extendTx(ctx context.Context, txFinder validator.TxFinderI, rawTx *sdkTx.Tr // extend inputs with parents data for _, p := range parentsTxs { - childInputs, found := parentInputMap[p.TxID()] + childInputs, found := parentInputMap[p.TxID().String()] if !found { return ErrParentNotFound } @@ -76,11 +76,11 @@ func extendTx(ctx context.Context, txFinder validator.TxFinderI, rawTx *sdkTx.Tr func extendInputs(tx *sdkTx.Transaction, childInputs []*sdkTx.TransactionInput) error { for _, input := range childInputs { if len(tx.Outputs) < int(input.SourceTxOutIndex) { - return fmt.Errorf("output %d not found in transaction %s", input.SourceTxOutIndex, input.PreviousTxIDStr()) + return fmt.Errorf("output %d not found in transaction %s", input.SourceTxOutIndex, input.SourceTXID.String()) } output := tx.Outputs[input.SourceTxOutIndex] - input.SetPrevTxFromOutput(output) + input.SetSourceTxOutput(output) } return nil @@ -99,7 +99,7 @@ func getUnminedAncestors(ctx context.Context, txFinder validator.TxFinderI, tx * parentInputMap := make(map[string]struct{}) for _, in := range tx.Inputs { - prevTxID := in.PreviousTxIDStr() + prevTxID := in.SourceTXID.String() _, found := parentInputMap[prevTxID] if !found { // first occurrence of the parent @@ -127,7 +127,7 @@ func getUnminedAncestors(ctx context.Context, txFinder validator.TxFinderI, tx * return nil, err } - unmindedAncestorsSet[mempoolTx.TxID()] = mempoolTx + unmindedAncestorsSet[mempoolTx.TxID().String()] = mempoolTx } return unmindedAncestorsSet, nil diff --git a/internal/validator/default/helpers_test.go b/internal/validator/default/helpers_test.go index a95576dba..028088818 100644 --- a/internal/validator/default/helpers_test.go +++ b/internal/validator/default/helpers_test.go @@ -90,14 +90,14 @@ func TestDefaultValidator_helpers_getUnminedAncestors(t *testing.T) { { name: "tx finder returns rubbish", txHex: fixture.ValidTxRawHex, - mempoolAncestors: []string{fixture.ParentTx1.TxID(), fixture.RandomTx1.TxID()}, + mempoolAncestors: []string{fixture.ParentTx1.TxID().String(), fixture.RandomTx1.TxID().String()}, expectedError: ErrParentNotFound, }, { name: "with mined parents only", txHex: fixture.ValidTxRawHex, - mempoolAncestors: []string{fixture.ParentTx1.TxID()}, + mempoolAncestors: []string{fixture.ParentTx1.TxID().String()}, }, { name: "with mined parents only", diff --git a/pkg/keyset/key_set.go b/pkg/keyset/key_set.go index a8394adb8..096cbcb43 100644 --- a/pkg/keyset/key_set.go +++ b/pkg/keyset/key_set.go @@ -88,7 +88,7 @@ func NewFromExtendedKey(extendedKey *bip32.ExtendedKey, derivationPath string) ( Path: derivationPath, PrivateKey: privateKey, PublicKey: publicKey, - PublicKeyHash: publicKey.SerializeCompressed(), + PublicKeyHash: publicKey.Compressed(), Script: p2pkhScript, }, nil } diff --git a/pkg/woc_client/woc_client_test.go b/pkg/woc_client/woc_client_test.go index 827eedb7f..ce9b72b87 100644 --- a/pkg/woc_client/woc_client_test.go +++ b/pkg/woc_client/woc_client_test.go @@ -9,6 +9,7 @@ import ( "net/http/httptest" "testing" + "github.com/bitcoin-sv/go-sdk/chainhash" sdkTx "github.com/bitcoin-sv/go-sdk/transaction" "github.com/stretchr/testify/require" ) @@ -101,6 +102,8 @@ func Test_GetUTXOs(t *testing.T) { txIDbytes, err := hex.DecodeString("4a2992fa3af9eb7ff6b94dc9e27e44f29a54ab351ee6377455409b0ebbe1f00c") require.NoError(t, err) + hash, err := chainhash.NewHash(txIDbytes) + require.NoError(t, err) tt := []struct { name string @@ -123,7 +126,7 @@ func Test_GetUTXOs(t *testing.T) { }, expected: sdkTx.UTXOs{{ - TxID: txIDbytes, + TxID: hash, Vout: 1, Satoshis: 4, }}, diff --git a/test/submit_01_single_test.go b/test/submit_01_single_test.go index 357774607..69e6d5306 100644 --- a/test/submit_01_single_test.go +++ b/test/submit_01_single_test.go @@ -473,7 +473,7 @@ func TestCallback(t *testing.T) { expectedTxsCallbacks := make(map[string]int) // key: txID, value: number of received callbacks for _, tx := range txs { - expectedTxsCallbacks[tx.TxID()] = 0 + expectedTxsCallbacks[tx.TxID().String()] = 0 } expectedCallbacksNumber := callbacksNumber * tc.numberOfTxs @@ -645,8 +645,8 @@ func TestBatchCallback(t *testing.T) { expectedTxsCallbacks := make(map[string]int) // key: txID, value: number of received callbacks for _, tx := range txs { - t.Logf("expected callback - server: %d, tx ID: %s", i, tx.TxID()) - expectedTxsCallbacks[tx.TxID()] = 0 + t.Logf("expected callback - server: %d, tx ID: %s", i, tx.TxID().String()) + expectedTxsCallbacks[tx.TxID().String()] = 0 } expectedCallbacksNumber := callbacksNumber @@ -868,7 +868,7 @@ func TestPostCumulativeFeesValidation(t *testing.T) { for i := 0; i < zeroChainCount; i++ { output := parentTx.Outputs[0] utxo := node_client.UnspentOutput{ - Txid: parentTx.TxID(), + Txid: parentTx.TxID().String(), Vout: 0, Address: address, ScriptPubKey: output.LockingScript.String(), @@ -927,7 +927,7 @@ func TestPostCumulativeFeesValidation(t *testing.T) { parentTx := chain[len(chain)-1] output := parentTx.Outputs[0] utxo := node_client.UnspentOutput{ - Txid: parentTx.TxID(), + Txid: parentTx.TxID().String(), Vout: 0, Address: address, ScriptPubKey: output.LockingScript.String(), diff --git a/test/submit_03_double_spending_test.go b/test/submit_03_double_spending_test.go index 6ded35c70..9f7306f3f 100644 --- a/test/submit_03_double_spending_test.go +++ b/test/submit_03_double_spending_test.go @@ -40,7 +40,7 @@ func TestDoubleSpend(t *testing.T) { // submit second transaction resp = postRequest[TransactionResponse](t, arcEndpointV1Tx, createPayload(t, TransactionRequest{RawTx: rawTx}), map[string]string{"X-WaitFor": StatusDoubleSpendAttempted}, http.StatusOK) require.Equal(t, StatusDoubleSpendAttempted, resp.TxStatus) - require.Equal(t, []string{tx1.TxID()}, *resp.CompetingTxs) + require.Equal(t, []string{tx1.TxID().String()}, *resp.CompetingTxs) // give arc time to update the status of all competing transactions time.Sleep(5 * time.Second) @@ -50,7 +50,7 @@ func TestDoubleSpend(t *testing.T) { // verify that the first tx was also set to DOUBLE_SPEND_ATTEMPTED require.Equal(t, StatusDoubleSpendAttempted, statusResp.TxStatus) - require.Equal(t, []string{tx2.TxID()}, *statusResp.CompetingTxs) + require.Equal(t, []string{tx2.TxID().String()}, *statusResp.CompetingTxs) // mine the first tx node_client.Generate(t, bitcoind, 1) diff --git a/test/submit_04_beef_test.go b/test/submit_04_beef_test.go index 2982143df..cd54b44de 100644 --- a/test/submit_04_beef_test.go +++ b/test/submit_04_beef_test.go @@ -83,10 +83,10 @@ func TestBeef(t *testing.T) { for i := 0; i < expectedCallbacks; i++ { select { case status := <-callbackReceivedChan: - if status.Txid == middleTx.TxID() { + if status.Txid == middleTx.TxID().String() { require.Equal(t, StatusMined, status.TxStatus) middleTxCallbackReceived = true - } else if status.Txid == tx.TxID() { + } else if status.Txid == tx.TxID().String() { require.Equal(t, StatusMined, status.TxStatus) lastTxCallbackReceived = true } else { @@ -167,7 +167,7 @@ func prepareBeef(t *testing.T, inputTxID, blockHash, fromAddress, toAddress, pri expectedCallbacks++ middleUtxo := node_client.UnspentOutput{ - Txid: middleTx.TxID(), + Txid: middleTx.TxID().String(), Vout: 0, ScriptPubKey: middleTx.Outputs[0].LockingScriptHex(), Amount: float64(middleTx.Outputs[0].Satoshis) / 1e8, // satoshis to BSV