diff --git a/blocktx/store/sql/get_transaction_blocks.go b/blocktx/store/sql/get_transaction_blocks.go index 09be0d118..7f8a3de37 100644 --- a/blocktx/store/sql/get_transaction_blocks.go +++ b/blocktx/store/sql/get_transaction_blocks.go @@ -1,10 +1,13 @@ package sql import ( + "database/sql" "encoding/hex" "fmt" "strings" + "github.com/lib/pq" + "github.com/bitcoin-sv/arc/blocktx/blocktx_api" "github.com/ordishs/gocore" @@ -13,33 +16,24 @@ import ( const ( queryGetBlockHashHeightForTxHashesPostgres = ` -SELECT -b.hash, b.height, t.hash -FROM blocks b -INNER JOIN block_transactions_map m ON m.blockid = b.id -INNER JOIN transactions t ON m.txid = t.id -WHERE t.hash in (%s) -AND b.orphanedyn = FALSE` + SELECT + b.hash, b.height, t.hash + FROM blocks b + INNER JOIN block_transactions_map m ON m.blockid = b.id + INNER JOIN transactions t ON m.txid = t.id + WHERE t.hash = ANY($1) + AND b.orphanedyn = FALSE` queryGetBlockHashHeightForTxHashesSQLite = ` -SELECT -b.hash, b.height, t.hash -FROM blocks b -INNER JOIN block_transactions_map m ON m.blockid = b.id -INNER JOIN transactions t ON m.txid = t.id -WHERE HEX(t.hash) in ('%s') -AND b.orphanedyn = FALSE` + SELECT + b.hash, b.height, t.hash + FROM blocks b + INNER JOIN block_transactions_map m ON m.blockid = b.id + INNER JOIN transactions t ON m.txid = t.id + WHERE HEX(t.hash) in ('%s') + AND b.orphanedyn = FALSE` ) -func getQueryPostgres(transactions *blocktx_api.Transactions) string { - var result []string - for _, v := range transactions.Transactions { - result = append(result, fmt.Sprintf("decode('%x', 'hex')", (v.Hash))) - } - - return fmt.Sprintf(queryGetBlockHashHeightForTxHashesPostgres, strings.Join(result, ",")) -} - func getQuerySQLite(transactions *blocktx_api.Transactions) string { var result []string for _, v := range transactions.Transactions { @@ -58,24 +52,32 @@ func (s *SQL) GetTransactionBlocks(ctx context.Context, transactions *blocktx_ap defer cancel() results := &blocktx_api.TransactionBlocks{} - var query string + var rows *sql.Rows + var err error switch s.engine { case sqliteEngine: - query = getQuerySQLite(transactions) + fallthrough case sqliteMemoryEngine: - query = getQuerySQLite(transactions) + rows, err = s.db.QueryContext(ctx, getQuerySQLite(transactions)) + if err != nil { + return nil, err + } case postgresEngine: - query = getQueryPostgres(transactions) + var hashSlice [][]byte + for _, tx := range transactions.Transactions { + hashSlice = append(hashSlice, tx.Hash) + } + + rows, err = s.db.QueryContext(ctx, queryGetBlockHashHeightForTxHashesPostgres, pq.Array(hashSlice)) + if err != nil { + return nil, err + } + default: return nil, fmt.Errorf("engine not supported: %s", s.engine) } - rows, err := s.db.QueryContext(ctx, query) - if err != nil { - return nil, err - } - defer rows.Close() for rows.Next() { diff --git a/blocktx/store/sql/get_transaction_blocks_test.go b/blocktx/store/sql/get_transaction_blocks_test.go index f8e1cdf5b..d1f8d6a1a 100644 --- a/blocktx/store/sql/get_transaction_blocks_test.go +++ b/blocktx/store/sql/get_transaction_blocks_test.go @@ -7,44 +7,52 @@ import ( "github.com/DATA-DOG/go-sqlmock" "github.com/bitcoin-sv/arc/blocktx/blocktx_api" + "github.com/bitcoin-sv/arc/blocktx/store" + . "github.com/bitcoin-sv/arc/database_testing" "github.com/libsv/go-p2p/chaincfg/chainhash" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) -func TestGetFullQuery(t *testing.T) { - t.Run("test", func(t *testing.T) { - hash1, err := chainhash.NewHashFromStr("181fcd0be5a1742aabd594a5bfd5a1e7863a4583290da72fb2a896dfa824645c") - require.NoError(t, err) - hash2, err := chainhash.NewHashFromStr("2e5c318f7f2e2e80e484ca1f00f1b7bee95a33a848de572a304b973ff2b0b35b") - require.NoError(t, err) - hash3, err := chainhash.NewHashFromStr("82b0a66c5dcbd0f6f6f99e2bf766e1d40b04c175c01ee87f1abc36136e511a7e") - require.NoError(t, err) - - expectedQuery := ` -SELECT -b.hash, b.height, t.hash -FROM blocks b -INNER JOIN block_transactions_map m ON m.blockid = b.id -INNER JOIN transactions t ON m.txid = t.id -WHERE t.hash in (decode('5c6424a8df96a8b22fa70d2983453a86e7a1d5bfa594d5ab2a74a1e50bcd1f18', 'hex'),decode('5bb3b0f23f974b302a57de48a8335ae9beb7f1001fca84e4802e2e7f8f315c2e', 'hex'),decode('7e1a516e1336bc1a7fe81ec075c1040bd4e166f72b9ef9f6f6d0cb5d6ca6b082', 'hex')) -AND b.orphanedyn = FALSE` - transactions := &blocktx_api.Transactions{ - Transactions: []*blocktx_api.Transaction{ - { - Hash: hash1.CloneBytes(), - }, - { - Hash: hash2.CloneBytes(), - }, - { - Hash: hash3.CloneBytes(), - }, - }, - } +type GetTransactionBlocksSuite struct { + DatabaseTestSuite +} + +func (s *GetTransactionBlocksSuite) Test() { + block := GetTestBlock() + tx := GetTestTransaction() + s.InsertBlock(block) + + s.InsertTransaction(tx) + + s.InsertBlockTransactionMap(&store.BlockTransactionMap{ + BlockID: block.ID, + TransactionID: tx.ID, + Pos: 2, + }) + + store, err := NewPostgresStore(DefaultParams) + require.NoError(s.T(), err) - q := getQueryPostgres(transactions) - require.Equal(t, expectedQuery, q) + b, err := store.GetTransactionBlocks(context.Background(), &blocktx_api.Transactions{ + Transactions: []*blocktx_api.Transaction{ + { + Hash: []byte(tx.Hash), + }, + }, }) + require.NoError(s.T(), err) + + require.NoError(s.T(), err) + require.Equal(s.T(), block.Hash, string(b.TransactionBlocks[0].BlockHash)) + + require.NoError(s.T(), err) + require.Equal(s.T(), tx.Hash, string(b.TransactionBlocks[0].TransactionHash)) +} + +func TestGetTransactionBlocksIntegration(t *testing.T) { + s := new(GetTransactionBlocksSuite) + suite.Run(t, s) } func TestGetTransactionBlocks(t *testing.T) { @@ -94,7 +102,7 @@ func TestGetTransactionBlocks(t *testing.T) { FROM blocks b INNER JOIN block_transactions_map m ON m.blockid = b.id INNER JOIN transactions t ON m.txid = t.id - WHERE t.hash in (decode('5c6424a8df96a8b22fa70d2983453a86e7a1d5bfa594d5ab2a74a1e50bcd1f18', 'hex'),decode('5bb3b0f23f974b302a57de48a8335ae9beb7f1001fca84e4802e2e7f8f315c2e', 'hex'),decode('7e1a516e1336bc1a7fe81ec075c1040bd4e166f72b9ef9f6f6d0cb5d6ca6b082', 'hex')) + WHERE t.hash = ANY($1) AND b.orphanedyn = FALSE` mock.ExpectQuery(query).WillReturnError(errors.New("db connection error")) @@ -110,7 +118,7 @@ func TestGetTransactionBlocks(t *testing.T) { FROM blocks b INNER JOIN block_transactions_map m ON m.blockid = b.id INNER JOIN transactions t ON m.txid = t.id - WHERE t.hash in (decode('5c6424a8df96a8b22fa70d2983453a86e7a1d5bfa594d5ab2a74a1e50bcd1f18', 'hex'),decode('5bb3b0f23f974b302a57de48a8335ae9beb7f1001fca84e4802e2e7f8f315c2e', 'hex'),decode('7e1a516e1336bc1a7fe81ec075c1040bd4e166f72b9ef9f6f6d0cb5d6ca6b082', 'hex')) + WHERE t.hash = ANY($1) AND b.orphanedyn = FALSE` mock.ExpectQuery(query).WillReturnRows( diff --git a/blocktx/store/sql/sql.go b/blocktx/store/sql/sql.go index 108f26149..51d6c2f7d 100644 --- a/blocktx/store/sql/sql.go +++ b/blocktx/store/sql/sql.go @@ -40,7 +40,8 @@ func NewPostgresStore(params dbconn.DBConnectionParams) (store.Interface, error) } return &SQL{ - db: db, + db: db, + engine: postgresEngine, }, nil } diff --git a/metamorph/processor.go b/metamorph/processor.go index eea13e6ec..cba154bff 100644 --- a/metamorph/processor.go +++ b/metamorph/processor.go @@ -25,7 +25,7 @@ import ( ) const ( - // number of times we will retry announcing transaction if we haven't seen it on the network + // MaxRetries number of times we will retry announcing transaction if we haven't seen it on the network MaxRetries = 15 // length of interval for checking transactions if they are seen on the network // if not we resend them again for a few times @@ -36,6 +36,7 @@ const ( failedToUpdateStatus = "Failed to update status" dataRetentionPeriodDefault = 14 * 24 * time.Hour // 14 days + checkIfMinedTimeRange = time.Minute * 20 ) type Processor struct { @@ -116,7 +117,7 @@ func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, btc blocktx.Clien // Start a goroutine to resend transactions that have not been seen on the network go p.processExpiredTransactions() - go p.processExpiredSeenTransactions() + go p.processCheckIfMined() gocore.AddAppPayloadFn("mtm", func() interface{} { return p.GetStats(false) @@ -166,10 +167,10 @@ func (p *Processor) unlockItems() error { return p.store.SetUnlocked(context.Background(), hashes) } -func (p *Processor) processExpiredSeenTransactions() { - // filterFunc returns true if the transaction has not been seen on the network +func (p *Processor) processCheckIfMined() { + // filter for transactions which have been at least announced but not mined and which haven't started to be processed longer than a specified amount of time ago filterFunc := func(processorResp *processor_response.ProcessorResponse) bool { - return processorResp.GetStatus() == metamorph_api.Status_SEEN_ON_NETWORK && p.now().Sub(processorResp.Start) > p.processExpiredSeenTxsInterval + return (processorResp.GetStatus() != metamorph_api.Status_MINED && processorResp.GetStatus() != metamorph_api.Status_CONFIRMED) && p.now().Sub(processorResp.Start) < checkIfMinedTimeRange } // Check transactions that have been seen on the network, but haven't been marked as mined diff --git a/metamorph/processor_test.go b/metamorph/processor_test.go index a53588dbd..7be2c24ff 100644 --- a/metamorph/processor_test.go +++ b/metamorph/processor_test.go @@ -713,7 +713,7 @@ func BenchmarkProcessTransaction(b *testing.B) { time.Sleep(1 * time.Second) } -func TestProcessExpiredSeenTransactions(t *testing.T) { +func TestProcessCheckIfMined(t *testing.T) { txsBlocks := []*blocktx_api.TransactionBlock{ { BlockHash: testdata.Block1Hash[:], @@ -738,22 +738,19 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { getTransactionBlocksErr error updateMinedErr error - expectedNrOfUpdates int - expectedNrOfBlockTxRequests int + expectedNrOfUpdates int }{ { - name: "expired seen txs", + name: "expired txs", blocks: txsBlocks, - expectedNrOfUpdates: 3, - expectedNrOfBlockTxRequests: 1, + expectedNrOfUpdates: 3, }, { name: "failed to get transaction blocks", getTransactionBlocksErr: errors.New("failed to get transaction blocks"), - expectedNrOfUpdates: 0, - expectedNrOfBlockTxRequests: 1, + expectedNrOfUpdates: 0, }, { name: "failed to parse block hash", @@ -761,16 +758,14 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { BlockHash: []byte("not a valid block hash"), }}, - expectedNrOfUpdates: 0, - expectedNrOfBlockTxRequests: 1, + expectedNrOfUpdates: 0, }, { name: "failed to update mined", blocks: txsBlocks, updateMinedErr: errors.New("failed to update mined"), - expectedNrOfUpdates: 3, - expectedNrOfBlockTxRequests: 1, + expectedNrOfUpdates: 3, }, { name: "failed to get tx from response map", @@ -778,12 +773,11 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { { BlockHash: testdata.Block1Hash[:], BlockHeight: 1234, - TransactionHash: testdata.TX4Hash[:], + TransactionHash: testdata.TX5Hash[:], }, }, - expectedNrOfUpdates: 0, - expectedNrOfBlockTxRequests: 1, + expectedNrOfUpdates: 0, }, } @@ -827,14 +821,15 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { require.Equal(t, 0, processor.ProcessorResponseMap.Len()) - processor.ProcessorResponseMap.Set(testdata.TX1Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX1Hash, metamorph_api.Status_SEEN_ON_NETWORK)) + processor.ProcessorResponseMap.Set(testdata.TX1Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX1Hash, metamorph_api.Status_STORED)) processor.ProcessorResponseMap.Set(testdata.TX2Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX2Hash, metamorph_api.Status_SEEN_ON_NETWORK)) - processor.ProcessorResponseMap.Set(testdata.TX3Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX3Hash, metamorph_api.Status_SEEN_ON_NETWORK)) + processor.ProcessorResponseMap.Set(testdata.TX3Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX3Hash, metamorph_api.Status_REJECTED)) + processor.ProcessorResponseMap.Set(testdata.TX4Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX4Hash, metamorph_api.Status_MINED)) time.Sleep(25 * time.Millisecond) require.Equal(t, tc.expectedNrOfUpdates, len(metamorphStore.UpdateMinedCalls())) - require.Equal(t, tc.expectedNrOfBlockTxRequests, len(btxMock.GetTransactionBlocksCalls())) + require.Equal(t, 1, len(btxMock.GetTransactionBlocksCalls())) }) } } diff --git a/testdata/data.go b/testdata/data.go index ee32dc9dc..49f04490d 100644 --- a/testdata/data.go +++ b/testdata/data.go @@ -27,6 +27,9 @@ var ( TX4 = "88eab41a8d0b7b4bc395f8f988ea3d6e63c8bc339526fd2f00cb7ce6fd7df0f7" TX4Hash, _ = chainhash.NewHashFromStr(TX4) + TX5 = "df931ab7d4ff0bbf96ff186f221c466f09c052c5331733641040defabf9dcd93" + TX5Hash, _ = chainhash.NewHashFromStr(TX5) + Time = time.Date(2009, 1, 03, 18, 15, 05, 0, time.UTC) DefaultPolicy = `{"excessiveblocksize":2000000000,"blockmaxsize":512000000,"maxtxsizepolicy":10000000,"maxorphantxsize":1000000000,"datacarriersize":4294967295,"maxscriptsizepolicy":500000,"maxopsperscriptpolicy":4294967295,"maxscriptnumlengthpolicy":10000,"maxpubkeyspermultisigpolicy":4294967295,"maxtxsigopscountspolicy":4294967295,"maxstackmemoryusagepolicy":100000000,"maxstackmemoryusageconsensus":200000000,"limitancestorcount":10000,"limitcpfpgroupmemberscount":25,"maxmempool":2000000000,"maxmempoolsizedisk":0,"mempoolmaxpercentcpfp":10,"acceptnonstdoutputs":true,"datacarrier":true,"minminingtxfee":5e-7,"maxstdtxvalidationduration":3,"maxnonstdtxvalidationduration":1000,"maxtxchainvalidationbudget":50,"validationclockcpu":true,"minconsolidationfactor":20,"maxconsolidationinputscriptsize":150,"minconfconsolidationinput":6,"minconsolidationinputmaturity":6,"acceptnonstdconsolidationinput":false}` )