From e5bf515b2fa67ac070ead89c2414ec8c55882f2a Mon Sep 17 00:00:00 2001 From: "shota.silagadze" Date: Thu, 23 Jan 2025 20:00:19 +0400 Subject: [PATCH] fix incorrect orphaned status --- cmd/arc/services/metamorph.go | 8 ++ .../double_spend_integration_test.go | 2 +- .../processor_integration_test.go | 2 +- internal/metamorph/processor.go | 76 ++++++++++++++++++- internal/metamorph/processor_test.go | 16 ++-- internal/metamorph/stats_collector_test.go | 2 +- internal/metamorph/zmq.go | 1 + internal/node_client/node_client.go | 14 ++++ test/submit_01_single_test.go | 38 ++++++++++ 9 files changed, 148 insertions(+), 11 deletions(-) diff --git a/cmd/arc/services/metamorph.go b/cmd/arc/services/metamorph.go index 3df64f51d..569a9420a 100644 --- a/cmd/arc/services/metamorph.go +++ b/cmd/arc/services/metamorph.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "net/url" "os" "time" @@ -160,10 +161,17 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore metamorph.WithMaxRetries(mtmConfig.MaxRetries), metamorph.WithMinimumHealthyConnections(mtmConfig.Health.MinimumHealthyConnections)) + pc := arcConfig.PeerRPC + rpcURL, err := url.Parse(fmt.Sprintf("rpc://%s:%s@%s:%d", pc.User, pc.Password, pc.Host, pc.Port)) + if err != nil { + return nil, fmt.Errorf("failed to parse node rpc url: %w", err) + } + processor, err = metamorph.NewProcessor( metamorphStore, cacheStore, pm, + rpcURL, statusMessageCh, processorOpts..., ) diff --git a/internal/metamorph/integration_test/double_spend_integration_test.go b/internal/metamorph/integration_test/double_spend_integration_test.go index 5ab7f18a1..a44e5b2b0 100644 --- a/internal/metamorph/integration_test/double_spend_integration_test.go +++ b/internal/metamorph/integration_test/double_spend_integration_test.go @@ -82,7 +82,7 @@ func TestDoubleSpendDetection(t *testing.T) { require.NoError(t, err) } - processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel, + processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, statusMessageChannel, metamorph.WithMinedTxsChan(minedTxChannel), metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), diff --git a/internal/metamorph/integration_test/processor_integration_test.go b/internal/metamorph/integration_test/processor_integration_test.go index 4512e2592..3fa93e5c5 100644 --- a/internal/metamorph/integration_test/processor_integration_test.go +++ b/internal/metamorph/integration_test/processor_integration_test.go @@ -51,7 +51,7 @@ func TestProcessor(t *testing.T) { natsQueue := nats_core.New(natsMock) statusMessageChannel := make(chan *metamorph.TxStatusMessage, 10) - sut, err := metamorph.NewProcessor(mtmStore, cacheStore, pm, statusMessageChannel, + sut, err := metamorph.NewProcessor(mtmStore, cacheStore, pm, nil, statusMessageChannel, metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), metamorph.WithMessageQueueClient(natsQueue), ) diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 827daea91..137c155f3 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -2,15 +2,18 @@ package metamorph import ( "context" + "encoding/hex" "errors" "fmt" "log/slog" + "net/url" "os" "sync" "time" "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" + "github.com/ordishs/go-bitcoin" "go.opentelemetry.io/otel/attribute" "google.golang.org/protobuf/proto" @@ -18,7 +21,9 @@ import ( "github.com/bitcoin-sv/arc/internal/cache" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/store" + "github.com/bitcoin-sv/arc/internal/node_client" "github.com/bitcoin-sv/arc/internal/tracing" + sdkTx "github.com/bitcoin-sv/go-sdk/transaction" ) const ( @@ -65,6 +70,7 @@ type Processor struct { hostname string pm p2p.PeerManagerI mqClient MessageQueue + nodeClient *node_client.NodeClient logger *slog.Logger mapExpiryTime time.Duration recheckSeenFromAgo time.Duration @@ -113,7 +119,7 @@ type CallbackSender interface { SendCallback(ctx context.Context, data *store.Data) } -func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, statusMessageChannel chan *TxStatusMessage, opts ...Option) (*Processor, error) { +func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, rpcURL *url.URL, statusMessageChannel chan *TxStatusMessage, opts ...Option) (*Processor, error) { if s == nil { return nil, ErrStoreNil } @@ -127,6 +133,20 @@ func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, st return nil, err } + var nodeClient *node_client.NodeClient + if rpcURL != nil { + bitcoinClient, err := bitcoin.NewFromURL(rpcURL, false) + if err != nil { + return nil, fmt.Errorf("failed to create bitcoin client: %w", err) + } + + nc, err := node_client.New(bitcoinClient) + if err != nil { + return nil, fmt.Errorf("failed to create node client: %v", err) + } + nodeClient = &nc + } + p := &Processor{ store: s, cacheStore: c, @@ -151,6 +171,7 @@ func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, st storageStatusUpdateCh: make(chan store.UpdateStatus, processStatusUpdatesBatchSizeDefault), stats: newProcessorStats(), waitGroup: &sync.WaitGroup{}, + nodeClient: nodeClient, statCollectionInterval: statCollectionIntervalDefault, processTransactionsInterval: processTransactionsIntervalDefault, @@ -400,6 +421,13 @@ func (p *Processor) StartSendStatusUpdate() { return case msg := <-p.statusMessageCh: + if msg.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL && p.nodeClient != nil { + err := p.CheckDoubleSpending(p.ctx, msg) + if err != nil { + fmt.Println("shota e", err) + p.logger.Warn("checking double spend attempt failed", slog.String("err", err.Error())) + } + } // if we receive new update check if we have client connection waiting for status and send it found := p.responseProcessor.UpdateStatus(msg.Hash, StatusAndError{ Hash: msg.Hash, @@ -732,6 +760,52 @@ func (p *Processor) StartProcessExpiredTransactions() { }() } +// we receive Status_SEEN_IN_ORPHAN_MEMPOOL from ZMQ in two cases: when tx is orphaned (input transaction wasn't found) and +// when input(s) is already spent in previous block. We need to check which case it is because in case of spending already spent +// transaction we should reject the transaction as it can never be mined/successful. To check which case it is we are looping +// over all the input transactions and check that they can be found, if they all can be found then it must be - spending already +// spend input. +func (p *Processor) CheckDoubleSpending(ctx context.Context, msg *TxStatusMessage) error { + // we must have tx already in db + data, err := p.store.Get(ctx, msg.Hash[:]) + if err != nil { + fmt.Println("shota 0", err) + return err + } + + // we must be able to decode it + tx, err := sdkTx.NewTransactionFromBytes(data.RawTx) + if err != nil { + fmt.Println("shota 8", err) + return err + } + + for _, input := range tx.Inputs { + inputTX, err := p.nodeClient.GetRawTransaction(ctx, hex.EncodeToString(input.SourceTXID)) + if err != nil { + return err + } + + fmt.Println("shota iq", inputTX) + if inputTX == nil { + // so if one of those transactions cannot be found the status was initially correct, it's orphaned transaction + return nil + } + + txout, err := p.nodeClient.GetTXOut(ctx, inputTX.TxID(), int(input.SourceTxOutIndex), true) + if err != nil { + return err + } + + if txout == nil { + msg.Status = metamorph_api.Status_REJECTED + return nil + } + } + fmt.Println("shota a 6") + return nil +} + // GetPeers returns a list of connected and a list of disconnected peers func (p *Processor) GetPeers() []p2p.PeerI { return p.pm.GetPeers() diff --git a/internal/metamorph/processor_test.go b/internal/metamorph/processor_test.go index e86424c29..da8177048 100644 --- a/internal/metamorph/processor_test.go +++ b/internal/metamorph/processor_test.go @@ -71,7 +71,7 @@ func TestNewProcessor(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { // when - sut, actualErr := metamorph.NewProcessor(tc.store, cStore, tc.pm, nil, + sut, actualErr := metamorph.NewProcessor(tc.store, cStore, tc.pm, nil, nil, metamorph.WithCacheExpiryTime(time.Second*5), metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))), ) @@ -128,7 +128,7 @@ func TestStartLockTransactions(t *testing.T) { cStore := cache.NewMemoryStore() // when - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithLockTxsInterval(20*time.Millisecond)) + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil, metamorph.WithLockTxsInterval(20*time.Millisecond)) require.NoError(t, err) defer sut.Shutdown() sut.StartLockTransactions() @@ -244,7 +244,7 @@ func TestProcessTransaction(t *testing.T) { }, } - sut, err := metamorph.NewProcessor(s, cStore, pm, nil, metamorph.WithMessageQueueClient(publisher)) + sut, err := metamorph.NewProcessor(s, cStore, pm, nil, nil, metamorph.WithMessageQueueClient(publisher)) require.NoError(t, err) require.Equal(t, 0, sut.GetProcessorMapSize()) @@ -543,6 +543,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { metamorphStore, cStore, pm, + nil, statusMessageChannel, metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), @@ -694,7 +695,7 @@ func TestStartProcessSubmittedTxs(t *testing.T) { } const submittedTxsBuffer = 5 submittedTxsChan := make(chan *metamorph_api.TransactionRequest, submittedTxsBuffer) - sut, err := metamorph.NewProcessor(s, cStore, pm, nil, + sut, err := metamorph.NewProcessor(s, cStore, pm, nil, nil, metamorph.WithMessageQueueClient(publisher), metamorph.WithSubmittedTxsChan(submittedTxsChan), metamorph.WithProcessStatusUpdatesInterval(20*time.Millisecond), @@ -822,7 +823,7 @@ func TestProcessExpiredTransactions(t *testing.T) { }, } - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil, metamorph.WithMessageQueueClient(publisher), metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), metamorph.WithMaxRetries(10), @@ -913,6 +914,7 @@ func TestStartProcessMinedCallbacks(t *testing.T) { cStore, pm, nil, + nil, metamorph.WithMinedTxsChan(minedTxsChan), metamorph.WithProcessMinedBatchSize(tc.processMinedBatchSize), metamorph.WithProcessMinedInterval(tc.processMinedInterval), @@ -1000,7 +1002,7 @@ func TestProcessorHealth(t *testing.T) { } cStore := cache.NewMemoryStore() - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil, metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20), metamorph.WithNow(func() time.Time { return time.Date(2033, 1, 1, 1, 0, 0, 0, time.UTC) @@ -1078,7 +1080,7 @@ func TestStart(t *testing.T) { submittedTxsChan := make(chan *metamorph_api.TransactionRequest, 2) minedTxsChan := make(chan *blocktx_api.TransactionBlock, 2) - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, + sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil, metamorph.WithMessageQueueClient(mqClient), metamorph.WithSubmittedTxsChan(submittedTxsChan), metamorph.WithMinedTxsChan(minedTxsChan), diff --git a/internal/metamorph/stats_collector_test.go b/internal/metamorph/stats_collector_test.go index 2dd35de12..bfe441f5e 100644 --- a/internal/metamorph/stats_collector_test.go +++ b/internal/metamorph/stats_collector_test.go @@ -51,7 +51,7 @@ func TestStartCollectStats(t *testing.T) { pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} - processor, err := metamorph.NewProcessor(mtmStore, nil, pm, nil, + processor, err := metamorph.NewProcessor(mtmStore, nil, pm, nil, nil, metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))), metamorph.WithStatCollectionInterval(10*time.Millisecond), ) diff --git a/internal/metamorph/zmq.go b/internal/metamorph/zmq.go index a443ad5a7..2f2e09341 100644 --- a/internal/metamorph/zmq.go +++ b/internal/metamorph/zmq.go @@ -200,6 +200,7 @@ func (z *ZMQ) handleInvalidTx(msg []string) (hash *chainhash.Hash, status metamo } if txInfo.IsMissingInputs { + fmt.Println("shota 1") // Missing Inputs does not immediately mean it's an error. It may mean that transaction is temporarily waiting // for its parents (e.g. in case of bulk submit). So we don't throw any error here, just update the status. // If it's actually an error with transaction, it will be rejected when the parents arrive to node's memopool. diff --git a/internal/node_client/node_client.go b/internal/node_client/node_client.go index f450fc4ab..8b9155568 100644 --- a/internal/node_client/node_client.go +++ b/internal/node_client/node_client.go @@ -117,3 +117,17 @@ func (n NodeClient) GetRawTransaction(ctx context.Context, id string) (rt *sdkTx return rt, nil } + +func (n NodeClient) GetTXOut(ctx context.Context, id string, outputIndex int, includeMempool bool) (res *bitcoin.TXOut, err error) { + _, span := tracing.StartTracing(ctx, "NodeClient_GetRawTransaction", n.tracingEnabled, n.tracingAttributes...) + defer func() { + tracing.EndTracing(span, err) + }() + + nTx, err := n.bitcoinClient.GetTxOut(id, outputIndex, includeMempool) + if err != nil { + return nil, errors.Join(ErrFailedToGetRawTransaction, err) + } + + return nTx, nil +} diff --git a/test/submit_01_single_test.go b/test/submit_01_single_test.go index 357774607..0ef5f6b46 100644 --- a/test/submit_01_single_test.go +++ b/test/submit_01_single_test.go @@ -130,6 +130,44 @@ func TestSubmitSingle(t *testing.T) { } } +func TestRejectingOrphaned(t *testing.T) { + address, privateKey := node_client.FundNewWallet(t, bitcoind) + address2, _ := node_client.FundNewWallet(t, bitcoind) + + node_client.SendToAddress(t, bitcoind, address, float64(10)) + + utxos := node_client.GetUtxos(t, bitcoind, address) + require.True(t, len(utxos) > 0, "No UTXOs available for the address") + + tx1, err := node_client.CreateTx(privateKey, address, utxos[0]) + require.NoError(t, err) + rawTx1, err := tx1.EFHex() + require.NoError(t, err) + + tx2, err := node_client.CreateTx(privateKey, address2, utxos[0]) + require.NoError(t, err) + rawTx2, err := tx2.EFHex() + require.NoError(t, err) + + transactionResponse := postRequest[TransactionResponse](t, arcEndpointV1Tx, createPayload(t, TransactionRequest{RawTx: rawTx1}), + map[string]string{ + "X-MaxTimeout": "5", + }, http.StatusOK) + + require.Equal(t, StatusSeenOnNetwork, transactionResponse.TxStatus) + node_client.Generate(t, bitcoind, 1) + + statusResponse := getRequest[TransactionResponse](t, fmt.Sprintf("%s/%s", arcEndpointV1Tx, tx1.TxID())) + require.Equal(t, StatusMined, statusResponse.TxStatus) + + transactionResponse2 := postRequest[TransactionResponse](t, arcEndpointV1Tx, createPayload(t, TransactionRequest{RawTx: rawTx2}), + map[string]string{ + "X-MaxTimeout": "30", + }, http.StatusOK) + + require.Equal(t, StatusRejected, transactionResponse2.TxStatus) +} + func TestSubmitMined(t *testing.T) { t.Run("submit mined tx + calculate merkle path", func(t *testing.T) { // Submit an unregistered, already mined transaction. ARC should return the status as MINED for the transaction.