Skip to content

Commit

Permalink
fix incorrect orphaned status
Browse files Browse the repository at this point in the history
  • Loading branch information
shotasilagadzetaal committed Jan 27, 2025
1 parent 153ad99 commit e5bf515
Show file tree
Hide file tree
Showing 9 changed files with 148 additions and 11 deletions.
8 changes: 8 additions & 0 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"net/url"
"os"
"time"

Expand Down Expand Up @@ -160,10 +161,17 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
metamorph.WithMaxRetries(mtmConfig.MaxRetries),
metamorph.WithMinimumHealthyConnections(mtmConfig.Health.MinimumHealthyConnections))

pc := arcConfig.PeerRPC
rpcURL, err := url.Parse(fmt.Sprintf("rpc://%s:%s@%s:%d", pc.User, pc.Password, pc.Host, pc.Port))
if err != nil {
return nil, fmt.Errorf("failed to parse node rpc url: %w", err)
}

processor, err = metamorph.NewProcessor(
metamorphStore,
cacheStore,
pm,
rpcURL,
statusMessageCh,
processorOpts...,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestDoubleSpendDetection(t *testing.T) {
require.NoError(t, err)
}

processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel,
processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, statusMessageChannel,
metamorph.WithMinedTxsChan(minedTxChannel),
metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }),
metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestProcessor(t *testing.T) {
natsQueue := nats_core.New(natsMock)
statusMessageChannel := make(chan *metamorph.TxStatusMessage, 10)

sut, err := metamorph.NewProcessor(mtmStore, cacheStore, pm, statusMessageChannel,
sut, err := metamorph.NewProcessor(mtmStore, cacheStore, pm, nil, statusMessageChannel,
metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond),
metamorph.WithMessageQueueClient(natsQueue),
)
Expand Down
76 changes: 75 additions & 1 deletion internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,28 @@ package metamorph

import (
"context"
"encoding/hex"
"errors"
"fmt"
"log/slog"
"net/url"
"os"
"sync"
"time"

"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/ordishs/go-bitcoin"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/protobuf/proto"

"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/cache"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
"github.com/bitcoin-sv/arc/internal/node_client"
"github.com/bitcoin-sv/arc/internal/tracing"
sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
)

const (
Expand Down Expand Up @@ -65,6 +70,7 @@ type Processor struct {
hostname string
pm p2p.PeerManagerI
mqClient MessageQueue
nodeClient *node_client.NodeClient
logger *slog.Logger
mapExpiryTime time.Duration
recheckSeenFromAgo time.Duration
Expand Down Expand Up @@ -113,7 +119,7 @@ type CallbackSender interface {
SendCallback(ctx context.Context, data *store.Data)
}

func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, statusMessageChannel chan *TxStatusMessage, opts ...Option) (*Processor, error) {
func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, rpcURL *url.URL, statusMessageChannel chan *TxStatusMessage, opts ...Option) (*Processor, error) {
if s == nil {
return nil, ErrStoreNil
}
Expand All @@ -127,6 +133,20 @@ func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, st
return nil, err
}

var nodeClient *node_client.NodeClient
if rpcURL != nil {
bitcoinClient, err := bitcoin.NewFromURL(rpcURL, false)
if err != nil {
return nil, fmt.Errorf("failed to create bitcoin client: %w", err)
}

nc, err := node_client.New(bitcoinClient)
if err != nil {
return nil, fmt.Errorf("failed to create node client: %v", err)
}
nodeClient = &nc
}

p := &Processor{
store: s,
cacheStore: c,
Expand All @@ -151,6 +171,7 @@ func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, st
storageStatusUpdateCh: make(chan store.UpdateStatus, processStatusUpdatesBatchSizeDefault),
stats: newProcessorStats(),
waitGroup: &sync.WaitGroup{},
nodeClient: nodeClient,

statCollectionInterval: statCollectionIntervalDefault,
processTransactionsInterval: processTransactionsIntervalDefault,
Expand Down Expand Up @@ -400,6 +421,13 @@ func (p *Processor) StartSendStatusUpdate() {
return

case msg := <-p.statusMessageCh:
if msg.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL && p.nodeClient != nil {
err := p.CheckDoubleSpending(p.ctx, msg)
if err != nil {
fmt.Println("shota e", err)
p.logger.Warn("checking double spend attempt failed", slog.String("err", err.Error()))
}
}
// if we receive new update check if we have client connection waiting for status and send it
found := p.responseProcessor.UpdateStatus(msg.Hash, StatusAndError{
Hash: msg.Hash,
Expand Down Expand Up @@ -732,6 +760,52 @@ func (p *Processor) StartProcessExpiredTransactions() {
}()
}

// we receive Status_SEEN_IN_ORPHAN_MEMPOOL from ZMQ in two cases: when tx is orphaned (input transaction wasn't found) and
// when input(s) is already spent in previous block. We need to check which case it is because in case of spending already spent
// transaction we should reject the transaction as it can never be mined/successful. To check which case it is we are looping
// over all the input transactions and check that they can be found, if they all can be found then it must be - spending already
// spend input.
func (p *Processor) CheckDoubleSpending(ctx context.Context, msg *TxStatusMessage) error {
// we must have tx already in db
data, err := p.store.Get(ctx, msg.Hash[:])
if err != nil {
fmt.Println("shota 0", err)
return err
}

// we must be able to decode it
tx, err := sdkTx.NewTransactionFromBytes(data.RawTx)
if err != nil {
fmt.Println("shota 8", err)
return err
}

for _, input := range tx.Inputs {
inputTX, err := p.nodeClient.GetRawTransaction(ctx, hex.EncodeToString(input.SourceTXID))
if err != nil {
return err
}

fmt.Println("shota iq", inputTX)
if inputTX == nil {
// so if one of those transactions cannot be found the status was initially correct, it's orphaned transaction
return nil
}

txout, err := p.nodeClient.GetTXOut(ctx, inputTX.TxID(), int(input.SourceTxOutIndex), true)
if err != nil {
return err
}

if txout == nil {
msg.Status = metamorph_api.Status_REJECTED
return nil
}
}
fmt.Println("shota a 6")
return nil
}

// GetPeers returns a list of connected and a list of disconnected peers
func (p *Processor) GetPeers() []p2p.PeerI {
return p.pm.GetPeers()
Expand Down
16 changes: 9 additions & 7 deletions internal/metamorph/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestNewProcessor(t *testing.T) {
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
// when
sut, actualErr := metamorph.NewProcessor(tc.store, cStore, tc.pm, nil,
sut, actualErr := metamorph.NewProcessor(tc.store, cStore, tc.pm, nil, nil,
metamorph.WithCacheExpiryTime(time.Second*5),
metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))),
)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestStartLockTransactions(t *testing.T) {
cStore := cache.NewMemoryStore()

// when
sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithLockTxsInterval(20*time.Millisecond))
sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil, metamorph.WithLockTxsInterval(20*time.Millisecond))
require.NoError(t, err)
defer sut.Shutdown()
sut.StartLockTransactions()
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestProcessTransaction(t *testing.T) {
},
}

sut, err := metamorph.NewProcessor(s, cStore, pm, nil, metamorph.WithMessageQueueClient(publisher))
sut, err := metamorph.NewProcessor(s, cStore, pm, nil, nil, metamorph.WithMessageQueueClient(publisher))
require.NoError(t, err)
require.Equal(t, 0, sut.GetProcessorMapSize())

Expand Down Expand Up @@ -543,6 +543,7 @@ func TestStartSendStatusForTransaction(t *testing.T) {
metamorphStore,
cStore,
pm,
nil,
statusMessageChannel,
metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }),
metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond),
Expand Down Expand Up @@ -694,7 +695,7 @@ func TestStartProcessSubmittedTxs(t *testing.T) {
}
const submittedTxsBuffer = 5
submittedTxsChan := make(chan *metamorph_api.TransactionRequest, submittedTxsBuffer)
sut, err := metamorph.NewProcessor(s, cStore, pm, nil,
sut, err := metamorph.NewProcessor(s, cStore, pm, nil, nil,
metamorph.WithMessageQueueClient(publisher),
metamorph.WithSubmittedTxsChan(submittedTxsChan),
metamorph.WithProcessStatusUpdatesInterval(20*time.Millisecond),
Expand Down Expand Up @@ -822,7 +823,7 @@ func TestProcessExpiredTransactions(t *testing.T) {
},
}

sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil,
sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil,
metamorph.WithMessageQueueClient(publisher),
metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20),
metamorph.WithMaxRetries(10),
Expand Down Expand Up @@ -913,6 +914,7 @@ func TestStartProcessMinedCallbacks(t *testing.T) {
cStore,
pm,
nil,
nil,
metamorph.WithMinedTxsChan(minedTxsChan),
metamorph.WithProcessMinedBatchSize(tc.processMinedBatchSize),
metamorph.WithProcessMinedInterval(tc.processMinedInterval),
Expand Down Expand Up @@ -1000,7 +1002,7 @@ func TestProcessorHealth(t *testing.T) {
}
cStore := cache.NewMemoryStore()

sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil,
sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil,
metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20),
metamorph.WithNow(func() time.Time {
return time.Date(2033, 1, 1, 1, 0, 0, 0, time.UTC)
Expand Down Expand Up @@ -1078,7 +1080,7 @@ func TestStart(t *testing.T) {
submittedTxsChan := make(chan *metamorph_api.TransactionRequest, 2)
minedTxsChan := make(chan *blocktx_api.TransactionBlock, 2)

sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil,
sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil,
metamorph.WithMessageQueueClient(mqClient),
metamorph.WithSubmittedTxsChan(submittedTxsChan),
metamorph.WithMinedTxsChan(minedTxsChan),
Expand Down
2 changes: 1 addition & 1 deletion internal/metamorph/stats_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestStartCollectStats(t *testing.T) {

pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}}

processor, err := metamorph.NewProcessor(mtmStore, nil, pm, nil,
processor, err := metamorph.NewProcessor(mtmStore, nil, pm, nil, nil,
metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))),
metamorph.WithStatCollectionInterval(10*time.Millisecond),
)
Expand Down
1 change: 1 addition & 0 deletions internal/metamorph/zmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (z *ZMQ) handleInvalidTx(msg []string) (hash *chainhash.Hash, status metamo
}

if txInfo.IsMissingInputs {
fmt.Println("shota 1")
// Missing Inputs does not immediately mean it's an error. It may mean that transaction is temporarily waiting
// for its parents (e.g. in case of bulk submit). So we don't throw any error here, just update the status.
// If it's actually an error with transaction, it will be rejected when the parents arrive to node's memopool.
Expand Down
14 changes: 14 additions & 0 deletions internal/node_client/node_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,17 @@ func (n NodeClient) GetRawTransaction(ctx context.Context, id string) (rt *sdkTx

return rt, nil
}

func (n NodeClient) GetTXOut(ctx context.Context, id string, outputIndex int, includeMempool bool) (res *bitcoin.TXOut, err error) {
_, span := tracing.StartTracing(ctx, "NodeClient_GetRawTransaction", n.tracingEnabled, n.tracingAttributes...)
defer func() {
tracing.EndTracing(span, err)
}()

nTx, err := n.bitcoinClient.GetTxOut(id, outputIndex, includeMempool)
if err != nil {
return nil, errors.Join(ErrFailedToGetRawTransaction, err)
}

return nTx, nil
}
38 changes: 38 additions & 0 deletions test/submit_01_single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,44 @@ func TestSubmitSingle(t *testing.T) {
}
}

func TestRejectingOrphaned(t *testing.T) {
address, privateKey := node_client.FundNewWallet(t, bitcoind)
address2, _ := node_client.FundNewWallet(t, bitcoind)

node_client.SendToAddress(t, bitcoind, address, float64(10))

utxos := node_client.GetUtxos(t, bitcoind, address)
require.True(t, len(utxos) > 0, "No UTXOs available for the address")

tx1, err := node_client.CreateTx(privateKey, address, utxos[0])
require.NoError(t, err)
rawTx1, err := tx1.EFHex()
require.NoError(t, err)

tx2, err := node_client.CreateTx(privateKey, address2, utxos[0])
require.NoError(t, err)
rawTx2, err := tx2.EFHex()
require.NoError(t, err)

transactionResponse := postRequest[TransactionResponse](t, arcEndpointV1Tx, createPayload(t, TransactionRequest{RawTx: rawTx1}),
map[string]string{
"X-MaxTimeout": "5",
}, http.StatusOK)

require.Equal(t, StatusSeenOnNetwork, transactionResponse.TxStatus)
node_client.Generate(t, bitcoind, 1)

statusResponse := getRequest[TransactionResponse](t, fmt.Sprintf("%s/%s", arcEndpointV1Tx, tx1.TxID()))
require.Equal(t, StatusMined, statusResponse.TxStatus)

transactionResponse2 := postRequest[TransactionResponse](t, arcEndpointV1Tx, createPayload(t, TransactionRequest{RawTx: rawTx2}),
map[string]string{
"X-MaxTimeout": "30",
}, http.StatusOK)

require.Equal(t, StatusRejected, transactionResponse2.TxStatus)
}

func TestSubmitMined(t *testing.T) {
t.Run("submit mined tx + calculate merkle path", func(t *testing.T) {
// Submit an unregistered, already mined transaction. ARC should return the status as MINED for the transaction.
Expand Down

0 comments on commit e5bf515

Please sign in to comment.