Skip to content

Commit

Permalink
fix incorrect orphaned status
Browse files Browse the repository at this point in the history
  • Loading branch information
shotasilagadzetaal committed Jan 24, 2025
1 parent 153ad99 commit c4c2c3d
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 24 deletions.
8 changes: 8 additions & 0 deletions cmd/arc/services/metamorph.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"net/url"
"os"
"time"

Expand Down Expand Up @@ -160,10 +161,17 @@ func StartMetamorph(logger *slog.Logger, arcConfig *config.ArcConfig, cacheStore
metamorph.WithMaxRetries(mtmConfig.MaxRetries),
metamorph.WithMinimumHealthyConnections(mtmConfig.Health.MinimumHealthyConnections))

pc := arcConfig.PeerRPC
rpcURL, err := url.Parse(fmt.Sprintf("rpc://%s:%s@%s:%d", pc.User, pc.Password, pc.Host, pc.Port))
if err != nil {
return nil, fmt.Errorf("failed to parse node rpc url: %w", err)
}

processor, err = metamorph.NewProcessor(
metamorphStore,
cacheStore,
pm,
rpcURL,
statusMessageCh,
processorOpts...,
)
Expand Down
14 changes: 1 addition & 13 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func getDefaultArcConfig() *ArcConfig {
ProfilerAddr: "", // optional
Prometheus: getDefaultPrometheusConfig(),
GrpcMessageSize: 100000000,
Network: "regtest",
Network: "testnet",
MessageQueue: getDefaultMessageQueueConfig(),
Tracing: getDefaultTracingConfig(),
PeerRPC: getDefaultPeerRPCConfig(),
Expand Down Expand Up @@ -66,18 +66,6 @@ func getBroadcastingConfig() *BroadcastingConfig {
ZMQ: 28332,
},
},
{
Host: "localhost",
Port: &PeerPortConfig{
P2P: 18334,
},
},
{
Host: "localhost",
Port: &PeerPortConfig{
P2P: 18335,
},
},
},
},
Multicast: &Mulsticast{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestDoubleSpendDetection(t *testing.T) {
require.NoError(t, err)
}

processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel,
processor, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, statusMessageChannel,
metamorph.WithMinedTxsChan(minedTxChannel),
metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }),
metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestProcessor(t *testing.T) {
natsQueue := nats_core.New(natsMock)
statusMessageChannel := make(chan *metamorph.TxStatusMessage, 10)

sut, err := metamorph.NewProcessor(mtmStore, cacheStore, pm, statusMessageChannel,
sut, err := metamorph.NewProcessor(mtmStore, cacheStore, pm, nil, statusMessageChannel,
metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond),
metamorph.WithMessageQueueClient(natsQueue),
)
Expand Down
68 changes: 67 additions & 1 deletion internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,28 @@ package metamorph

import (
"context"
"encoding/hex"
"errors"
"fmt"
"log/slog"
"net/url"
"os"
"sync"
"time"

"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/ordishs/go-bitcoin"
"go.opentelemetry.io/otel/attribute"
"google.golang.org/protobuf/proto"

"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/cache"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
"github.com/bitcoin-sv/arc/internal/node_client"
"github.com/bitcoin-sv/arc/internal/tracing"
sdkTx "github.com/bitcoin-sv/go-sdk/transaction"
)

const (
Expand Down Expand Up @@ -65,6 +70,7 @@ type Processor struct {
hostname string
pm p2p.PeerManagerI
mqClient MessageQueue
nodeClient *node_client.NodeClient
logger *slog.Logger
mapExpiryTime time.Duration
recheckSeenFromAgo time.Duration
Expand Down Expand Up @@ -113,7 +119,7 @@ type CallbackSender interface {
SendCallback(ctx context.Context, data *store.Data)
}

func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, statusMessageChannel chan *TxStatusMessage, opts ...Option) (*Processor, error) {
func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, rpcURL *url.URL, statusMessageChannel chan *TxStatusMessage, opts ...Option) (*Processor, error) {
if s == nil {
return nil, ErrStoreNil
}
Expand All @@ -127,6 +133,20 @@ func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, st
return nil, err
}

var nodeClient *node_client.NodeClient
if rpcURL != nil {
bitcoinClient, err := bitcoin.NewFromURL(rpcURL, false)
if err != nil {
return nil, fmt.Errorf("failed to create bitcoin client: %w", err)
}

nc, err := node_client.New(bitcoinClient)
if err != nil {
return nil, fmt.Errorf("failed to create node client: %v", err)
}
nodeClient = &nc
}

p := &Processor{
store: s,
cacheStore: c,
Expand All @@ -151,6 +171,7 @@ func NewProcessor(s store.MetamorphStore, c cache.Store, pm p2p.PeerManagerI, st
storageStatusUpdateCh: make(chan store.UpdateStatus, processStatusUpdatesBatchSizeDefault),
stats: newProcessorStats(),
waitGroup: &sync.WaitGroup{},
nodeClient: nodeClient,

statCollectionInterval: statCollectionIntervalDefault,
processTransactionsInterval: processTransactionsIntervalDefault,
Expand Down Expand Up @@ -400,6 +421,13 @@ func (p *Processor) StartSendStatusUpdate() {
return

case msg := <-p.statusMessageCh:
if p.nodeClient != nil {
err := p.CheckDoubleSpending(p.ctx, msg)
if err != nil {
p.logger.Warn("checking double spend attempt failed", slog.String("err", err.Error()))
}
}

// if we receive new update check if we have client connection waiting for status and send it
found := p.responseProcessor.UpdateStatus(msg.Hash, StatusAndError{
Hash: msg.Hash,
Expand Down Expand Up @@ -732,6 +760,44 @@ func (p *Processor) StartProcessExpiredTransactions() {
}()
}

// we receive Status_SEEN_IN_ORPHAN_MEMPOOL from ZMQ in two cases: when tx is orphaned (input transaction wasn't found) and
// when input(s) is already spent in previous block. We need to check which case it is because in case of spending already spent
// transaction we should reject the transaction as it can never be mined/successful. To check which case it is we are looping
// over all the input transactions and check that they can be found, if they all can be found then it must be - spending already
// spend input.
func (p *Processor) CheckDoubleSpending(ctx context.Context, msg *TxStatusMessage) error {
if msg.Status == metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL {
// we must have tx already in db
data, err := p.store.Get(ctx, msg.Hash[:])
if err != nil {
return err
}

// we must be able to decode it
tx, err := sdkTx.NewTransactionFromBytes(data.RawTx)
if err != nil {
return err
}

for _, input := range tx.Inputs {
inputTXID := input.SourceTXID
inputTX, err := p.nodeClient.GetRawTransaction(ctx, hex.EncodeToString(inputTXID))
if err != nil {
return err
}

if inputTX == nil {
// so if one of those transactions cannot be found the status was initially correct, it's orphaned transaction
return nil
}
}

// if we found all the input transactions then it must be spending already spent tx output so change status to REJECTED
msg.Status = metamorph_api.Status_REJECTED
}
return nil
}

// GetPeers returns a list of connected and a list of disconnected peers
func (p *Processor) GetPeers() []p2p.PeerI {
return p.pm.GetPeers()
Expand Down
16 changes: 9 additions & 7 deletions internal/metamorph/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestNewProcessor(t *testing.T) {
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
// when
sut, actualErr := metamorph.NewProcessor(tc.store, cStore, tc.pm, nil,
sut, actualErr := metamorph.NewProcessor(tc.store, cStore, tc.pm, nil, nil,
metamorph.WithCacheExpiryTime(time.Second*5),
metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))),
)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestStartLockTransactions(t *testing.T) {
cStore := cache.NewMemoryStore()

// when
sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, metamorph.WithLockTxsInterval(20*time.Millisecond))
sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil, metamorph.WithLockTxsInterval(20*time.Millisecond))
require.NoError(t, err)
defer sut.Shutdown()
sut.StartLockTransactions()
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestProcessTransaction(t *testing.T) {
},
}

sut, err := metamorph.NewProcessor(s, cStore, pm, nil, metamorph.WithMessageQueueClient(publisher))
sut, err := metamorph.NewProcessor(s, cStore, pm, nil, nil, metamorph.WithMessageQueueClient(publisher))
require.NoError(t, err)
require.Equal(t, 0, sut.GetProcessorMapSize())

Expand Down Expand Up @@ -543,6 +543,7 @@ func TestStartSendStatusForTransaction(t *testing.T) {
metamorphStore,
cStore,
pm,
nil,
statusMessageChannel,
metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }),
metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond),
Expand Down Expand Up @@ -694,7 +695,7 @@ func TestStartProcessSubmittedTxs(t *testing.T) {
}
const submittedTxsBuffer = 5
submittedTxsChan := make(chan *metamorph_api.TransactionRequest, submittedTxsBuffer)
sut, err := metamorph.NewProcessor(s, cStore, pm, nil,
sut, err := metamorph.NewProcessor(s, cStore, pm, nil, nil,
metamorph.WithMessageQueueClient(publisher),
metamorph.WithSubmittedTxsChan(submittedTxsChan),
metamorph.WithProcessStatusUpdatesInterval(20*time.Millisecond),
Expand Down Expand Up @@ -822,7 +823,7 @@ func TestProcessExpiredTransactions(t *testing.T) {
},
}

sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil,
sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil,
metamorph.WithMessageQueueClient(publisher),
metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20),
metamorph.WithMaxRetries(10),
Expand Down Expand Up @@ -913,6 +914,7 @@ func TestStartProcessMinedCallbacks(t *testing.T) {
cStore,
pm,
nil,
nil,
metamorph.WithMinedTxsChan(minedTxsChan),
metamorph.WithProcessMinedBatchSize(tc.processMinedBatchSize),
metamorph.WithProcessMinedInterval(tc.processMinedInterval),
Expand Down Expand Up @@ -1000,7 +1002,7 @@ func TestProcessorHealth(t *testing.T) {
}
cStore := cache.NewMemoryStore()

sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil,
sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil,
metamorph.WithProcessExpiredTxsInterval(time.Millisecond*20),
metamorph.WithNow(func() time.Time {
return time.Date(2033, 1, 1, 1, 0, 0, 0, time.UTC)
Expand Down Expand Up @@ -1078,7 +1080,7 @@ func TestStart(t *testing.T) {
submittedTxsChan := make(chan *metamorph_api.TransactionRequest, 2)
minedTxsChan := make(chan *blocktx_api.TransactionBlock, 2)

sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil,
sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, nil, nil,
metamorph.WithMessageQueueClient(mqClient),
metamorph.WithSubmittedTxsChan(submittedTxsChan),
metamorph.WithMinedTxsChan(minedTxsChan),
Expand Down
2 changes: 1 addition & 1 deletion internal/metamorph/stats_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func TestStartCollectStats(t *testing.T) {

pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}}

processor, err := metamorph.NewProcessor(mtmStore, nil, pm, nil,
processor, err := metamorph.NewProcessor(mtmStore, nil, pm, nil, nil,
metamorph.WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: metamorph.LogLevelDefault}))),
metamorph.WithStatCollectionInterval(10*time.Millisecond),
)
Expand Down

0 comments on commit c4c2c3d

Please sign in to comment.