diff --git a/cmd/arc/services/metamorph.go b/cmd/arc/services/metamorph.go index 3df64f51d..569a9420a 100644 --- a/cmd/arc/services/metamorph.go +++ b/cmd/arc/services/metamorph.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "net/url" "os" "time" @@ -160,10 +161,17 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore metamorph.WithMaxRetries(mtmConfig.MaxRetries), metamorph.WithMinimumHealthyConnections(mtmConfig.Health.MinimumHealthyConnections)) + pc := arcConfig.PeerRPC + rpcURL, err := url.Parse(fmt.Sprintf("rpc://%s:%s@%s:%d", pc.User, pc.Password, pc.Host, pc.Port)) + if err != nil { + return nil, fmt.Errorf("failed to parse node rpc url: %w", err) + } + processor, err = metamorph.NewProcessor( metamorphStore, cacheStore, pm, + rpcURL, statusMessageCh, processorOpts..., ) diff --git a/config/defaults.go b/config/defaults.go index e5047a70d..633fe82d5 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -13,7 +13,7 @@ func getDefaultArcConfig() *ArcConfig { ProfilerAddr: "", // optional Prometheus: getDefaultPrometheusConfig(), GrpcMessageSize: 100000000, - Network: "regtest", + Network: "testnet", MessageQueue: getDefaultMessageQueueConfig(), Tracing: getDefaultTracingConfig(), PeerRPC: getDefaultPeerRPCConfig(), @@ -66,18 +66,6 @@ func getBroadcastingConfig() *BroadcastingConfig { ZMQ: 28332, }, }, - { - Host: "localhost", - Port: &PeerPortConfig{ - P2P: 18334, - }, - }, - { - Host: "localhost", - Port: &PeerPortConfig{ - P2P: 18335, - }, - }, }, }, Multicast: &Mulsticast{ diff --git a/internal/metamorph/integration_test/double_spend_integration_test.go b/internal/metamorph/integration_test/double_spend_integration_test.go index 5ab7f18a1..a44e5b2b0 100644 --- a/internal/metamorph/integration_test/double_spend_integration_test.go +++ b/internal/metamorph/integration_test/double_spend_integration_test.go @@ -82,7 +82,7 @@ func TestDoubleSpendDetection(t *testing.T) { require.NoError(t, err) } - processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel, + processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, statusMessageChannel, metamorph.WithMinedTxsChan(minedTxChannel), metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), diff --git a/internal/metamorph/integration_test/processor_integration_test.go b/internal/metamorph/integration_test/processor_integration_test.go index 4512e2592..3fa93e5c5 100644 --- a/internal/metamorph/integration_test/processor_integration_test.go +++ b/internal/metamorph/integration_test/processor_integration_test.go @@ -51,7 +51,7 @@ func TestProcessor(t *testing.T) { natsQueue := nats_core.New(natsMock) statusMessageChannel := make(chan *metamorph.TxStatusMessage, 10) - sut, err := metamorph.NewProcessor(mtmStore, cacheStore, pm, statusMessageChannel, + sut, err := metamorph.NewProcessor(mtmStore, cacheStore, pm, nil, statusMessageChannel, metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), metamorph.WithMessageQueueClient(natsQueue), ) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 827daea91..d4d38273f 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -2,15 +2,18 @@ package metamorph import ( "context" + "encoding/hex" "errors" "fmt" "log/slog" + "net/url" "os" "sync" "time" "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/ordishs/go-bitcoin" "go.opentelemetry.io/otel/attribute" "google.golang.org/protobuf/proto" @@ -18,7 +21,9 @@ import ( "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/store" + "github.com/bitcoin-sv/arc/internal/node_client" "github.com/bitcoin-sv/arc/internal/tracing" + sdkTx "github.com/bitcoin-sv/go-sdk/transaction" ) const ( @@ -65,6 +70,7 @@ type Processor struct { hostname string pm p2p.PeerManagerI mqClient MessageQueue + nodeClient *node_client.NodeClient logger *slog.Logger mapExpiryTime time.Duration recheckSeenFromAgo time.Duration @@ -113,7 +119,7 @@ type CallbackSender interface { SendCallback(ctx context.Context, data *store.Data) } -func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, statusMessageChannel chan *TxStatusMessage, opts ...Option) (*Processor, error) { +func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, rpcURL *url.URL, statusMessageChannel chan *TxStatusMessage, opts ...Option) (*Processor, error) { if s == nil { return nil, ErrStoreNil } @@ -127,6 +133,20 @@ func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, st return nil, err } + var nodeClient *node_client.NodeClient + if rpcURL != nil { + bitcoinClient, err := bitcoin.NewFromURL(rpcURL, false) + if err != nil { + return nil, fmt.Errorf("failed to create bitcoin client: %w", err) + } + + nc, err := node_client.New(bitcoinClient) + if err != nil { + return nil, fmt.Errorf("failed to create node client: %v", err) + } + nodeClient = &nc + } + p := &Processor{ store: s, cacheStore: c, @@ -151,6 +171,7 @@ func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, st storageStatusUpdateCh: make(chan store.UpdateStatus, processStatusUpdatesBatchSizeDefault), stats: newProcessorStats(), waitGroup: &sync.WaitGroup{}, + nodeClient: nodeClient, statCollectionInterval: statCollectionIntervalDefault, processTransactionsInterval: processTransactionsIntervalDefault, @@ -400,6 +421,13 @@ func (p *Processor) StartSendStatusUpdate() { return case msg := <-p.statusMessageCh: + if p.nodeClient != nil { + err := p.CheckDoubleSpending(p.ctx, msg) + if err != nil { + p.logger.Warn("checking double spend attempt failed", slog.String("err", err.Error())) + } + } + // if we receive new update check if we have client connection waiting for status and send it found := p.responseProcessor.UpdateStatus(msg.Hash, StatusAndError{ Hash: msg.Hash, @@ -732,6 +760,44 @@ func (p *Processor) StartProcessExpiredTransactions() { }() } +// we receive Status_SEEN_IN_ORPHAN_MEMPOOL from ZMQ in two cases: when tx is orphaned (input transaction wasn't found) and +// when input(s) is already spent in previous block. We need to check which case it is because in case of spending already spent +// transaction we should reject the transaction as it can never be mined/successful. To check which case it is we are looping +// over all the input transactions and check that they can be found, if they all can be found then it must be - spending already +// spend input. +func (p *Processor) CheckDoubleSpending(ctx context.Context, msg *TxStatusMessage) error { + if msg.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL { + // we must have tx already in db + data, err := p.store.Get(ctx, msg.Hash[:]) + if err != nil { + return err + } + + // we must be able to decode it + tx, err := sdkTx.NewTransactionFromBytes(data.RawTx) + if err != nil { + return err + } + + for _, input := range tx.Inputs { + inputTXID := input.SourceTXID + inputTX, err := p.nodeClient.GetRawTransaction(ctx, hex.EncodeToString(inputTXID)) + if err != nil { + return err + } + + if inputTX == nil { + // so if one of those transactions cannot be found the status was initially correct, it's orphaned transaction + return nil + } + } + + // if we found all the input transactions then it must be spending already spent tx output so change status to REJECTED + msg.Status = metamorph_api.Status_REJECTED + } + return nil +} + // GetPeers returns a list of connected and a list of disconnected peers func (p *Processor) GetPeers() []p2p.PeerI { return p.pm.GetPeers() diff --git a/internal/metamorph/processor_test.go b/internal/metamorph/processor_test.go index e86424c29..da8177048 100644 --- a/internal/metamorph/processor_test.go +++ b/internal/metamorph/processor_test.go @@ -71,7 +71,7 @@ func TestNewProcessor(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { // when - sut, actualErr := metamorph.NewProcessor(tc.store, cStore, tc.pm, nil, + sut, actualErr := metamorph.NewProcessor(tc.store, cStore, tc.pm, nil, nil, metamorph.WithCacheExpiryTime(time.Second*5), metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))), ) @@ -128,7 +128,7 @@ func TestStartLockTransactions(t *testing.T) { cStore := cache.NewMemoryStore() // when - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithLockTxsInterval(20*time.Millisecond)) + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil, metamorph.WithLockTxsInterval(20*time.Millisecond)) require.NoError(t, err) defer sut.Shutdown() sut.StartLockTransactions() @@ -244,7 +244,7 @@ func TestProcessTransaction(t *testing.T) { }, } - sut, err := metamorph.NewProcessor(s, cStore, pm, nil, metamorph.WithMessageQueueClient(publisher)) + sut, err := metamorph.NewProcessor(s, cStore, pm, nil, nil, metamorph.WithMessageQueueClient(publisher)) require.NoError(t, err) require.Equal(t, 0, sut.GetProcessorMapSize()) @@ -543,6 +543,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { metamorphStore, cStore, pm, + nil, statusMessageChannel, metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), @@ -694,7 +695,7 @@ func TestStartProcessSubmittedTxs(t *testing.T) { } const submittedTxsBuffer = 5 submittedTxsChan := make(chan *metamorph_api.TransactionRequest, submittedTxsBuffer) - sut, err := metamorph.NewProcessor(s, cStore, pm, nil, + sut, err := metamorph.NewProcessor(s, cStore, pm, nil, nil, metamorph.WithMessageQueueClient(publisher), metamorph.WithSubmittedTxsChan(submittedTxsChan), metamorph.WithProcessStatusUpdatesInterval(20*time.Millisecond), @@ -822,7 +823,7 @@ func TestProcessExpiredTransactions(t *testing.T) { }, } - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil, metamorph.WithMessageQueueClient(publisher), metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), metamorph.WithMaxRetries(10), @@ -913,6 +914,7 @@ func TestStartProcessMinedCallbacks(t *testing.T) { cStore, pm, nil, + nil, metamorph.WithMinedTxsChan(minedTxsChan), metamorph.WithProcessMinedBatchSize(tc.processMinedBatchSize), metamorph.WithProcessMinedInterval(tc.processMinedInterval), @@ -1000,7 +1002,7 @@ func TestProcessorHealth(t *testing.T) { } cStore := cache.NewMemoryStore() - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil, metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), metamorph.WithNow(func() time.Time { return time.Date(2033, 1, 1, 1, 0, 0, 0, time.UTC) @@ -1078,7 +1080,7 @@ func TestStart(t *testing.T) { submittedTxsChan := make(chan *metamorph_api.TransactionRequest, 2) minedTxsChan := make(chan *blocktx_api.TransactionBlock, 2) - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil, metamorph.WithMessageQueueClient(mqClient), metamorph.WithSubmittedTxsChan(submittedTxsChan), metamorph.WithMinedTxsChan(minedTxsChan), diff --git a/internal/metamorph/stats_collector_test.go b/internal/metamorph/stats_collector_test.go index 2dd35de12..bfe441f5e 100644 --- a/internal/metamorph/stats_collector_test.go +++ b/internal/metamorph/stats_collector_test.go @@ -51,7 +51,7 @@ func TestStartCollectStats(t *testing.T) { pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} - processor, err := metamorph.NewProcessor(mtmStore, nil, pm, nil, + processor, err := metamorph.NewProcessor(mtmStore, nil, pm, nil, nil, metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))), metamorph.WithStatCollectionInterval(10*time.Millisecond), )