Skip to content

Commit

Permalink
Merge pull request #213 from bitcoin-sv/fix/check-if-mined
Browse files Browse the repository at this point in the history
Fix/check if mined
  • Loading branch information
shotasilagadzetaal authored Dec 14, 2023
2 parents 5390786 + ce9f960 commit 30e8542
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 90 deletions.
66 changes: 34 additions & 32 deletions blocktx/store/sql/get_transaction_blocks.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package sql

import (
"database/sql"
"encoding/hex"
"fmt"
"strings"

"github.com/lib/pq"

"github.com/bitcoin-sv/arc/blocktx/blocktx_api"
"github.com/ordishs/gocore"

Expand All @@ -13,33 +16,24 @@ import (

const (
queryGetBlockHashHeightForTxHashesPostgres = `
SELECT
b.hash, b.height, t.hash
FROM blocks b
INNER JOIN block_transactions_map m ON m.blockid = b.id
INNER JOIN transactions t ON m.txid = t.id
WHERE t.hash in (%s)
AND b.orphanedyn = FALSE`
SELECT
b.hash, b.height, t.hash
FROM blocks b
INNER JOIN block_transactions_map m ON m.blockid = b.id
INNER JOIN transactions t ON m.txid = t.id
WHERE t.hash = ANY($1)
AND b.orphanedyn = FALSE`

queryGetBlockHashHeightForTxHashesSQLite = `
SELECT
b.hash, b.height, t.hash
FROM blocks b
INNER JOIN block_transactions_map m ON m.blockid = b.id
INNER JOIN transactions t ON m.txid = t.id
WHERE HEX(t.hash) in ('%s')
AND b.orphanedyn = FALSE`
SELECT
b.hash, b.height, t.hash
FROM blocks b
INNER JOIN block_transactions_map m ON m.blockid = b.id
INNER JOIN transactions t ON m.txid = t.id
WHERE HEX(t.hash) in ('%s')
AND b.orphanedyn = FALSE`
)

func getQueryPostgres(transactions *blocktx_api.Transactions) string {
var result []string
for _, v := range transactions.Transactions {
result = append(result, fmt.Sprintf("decode('%x', 'hex')", (v.Hash)))
}

return fmt.Sprintf(queryGetBlockHashHeightForTxHashesPostgres, strings.Join(result, ","))
}

func getQuerySQLite(transactions *blocktx_api.Transactions) string {
var result []string
for _, v := range transactions.Transactions {
Expand All @@ -58,24 +52,32 @@ func (s *SQL) GetTransactionBlocks(ctx context.Context, transactions *blocktx_ap
defer cancel()

results := &blocktx_api.TransactionBlocks{}
var query string
var rows *sql.Rows
var err error

switch s.engine {
case sqliteEngine:
query = getQuerySQLite(transactions)
fallthrough
case sqliteMemoryEngine:
query = getQuerySQLite(transactions)
rows, err = s.db.QueryContext(ctx, getQuerySQLite(transactions))
if err != nil {
return nil, err
}
case postgresEngine:
query = getQueryPostgres(transactions)
var hashSlice [][]byte
for _, tx := range transactions.Transactions {
hashSlice = append(hashSlice, tx.Hash)
}

rows, err = s.db.QueryContext(ctx, queryGetBlockHashHeightForTxHashesPostgres, pq.Array(hashSlice))
if err != nil {
return nil, err
}

default:
return nil, fmt.Errorf("engine not supported: %s", s.engine)
}

rows, err := s.db.QueryContext(ctx, query)
if err != nil {
return nil, err
}

defer rows.Close()

for rows.Next() {
Expand Down
76 changes: 42 additions & 34 deletions blocktx/store/sql/get_transaction_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,52 @@ import (

"github.com/DATA-DOG/go-sqlmock"
"github.com/bitcoin-sv/arc/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/blocktx/store"
. "github.com/bitcoin-sv/arc/database_testing"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

func TestGetFullQuery(t *testing.T) {
t.Run("test", func(t *testing.T) {
hash1, err := chainhash.NewHashFromStr("181fcd0be5a1742aabd594a5bfd5a1e7863a4583290da72fb2a896dfa824645c")
require.NoError(t, err)
hash2, err := chainhash.NewHashFromStr("2e5c318f7f2e2e80e484ca1f00f1b7bee95a33a848de572a304b973ff2b0b35b")
require.NoError(t, err)
hash3, err := chainhash.NewHashFromStr("82b0a66c5dcbd0f6f6f99e2bf766e1d40b04c175c01ee87f1abc36136e511a7e")
require.NoError(t, err)

expectedQuery := `
SELECT
b.hash, b.height, t.hash
FROM blocks b
INNER JOIN block_transactions_map m ON m.blockid = b.id
INNER JOIN transactions t ON m.txid = t.id
WHERE t.hash in (decode('5c6424a8df96a8b22fa70d2983453a86e7a1d5bfa594d5ab2a74a1e50bcd1f18', 'hex'),decode('5bb3b0f23f974b302a57de48a8335ae9beb7f1001fca84e4802e2e7f8f315c2e', 'hex'),decode('7e1a516e1336bc1a7fe81ec075c1040bd4e166f72b9ef9f6f6d0cb5d6ca6b082', 'hex'))
AND b.orphanedyn = FALSE`
transactions := &blocktx_api.Transactions{
Transactions: []*blocktx_api.Transaction{
{
Hash: hash1.CloneBytes(),
},
{
Hash: hash2.CloneBytes(),
},
{
Hash: hash3.CloneBytes(),
},
},
}
type GetTransactionBlocksSuite struct {
DatabaseTestSuite
}

func (s *GetTransactionBlocksSuite) Test() {
block := GetTestBlock()
tx := GetTestTransaction()
s.InsertBlock(block)

s.InsertTransaction(tx)

s.InsertBlockTransactionMap(&store.BlockTransactionMap{
BlockID: block.ID,
TransactionID: tx.ID,
Pos: 2,
})

store, err := NewPostgresStore(DefaultParams)
require.NoError(s.T(), err)

q := getQueryPostgres(transactions)
require.Equal(t, expectedQuery, q)
b, err := store.GetTransactionBlocks(context.Background(), &blocktx_api.Transactions{
Transactions: []*blocktx_api.Transaction{
{
Hash: []byte(tx.Hash),
},
},
})
require.NoError(s.T(), err)

require.NoError(s.T(), err)
require.Equal(s.T(), block.Hash, string(b.TransactionBlocks[0].BlockHash))

require.NoError(s.T(), err)
require.Equal(s.T(), tx.Hash, string(b.TransactionBlocks[0].TransactionHash))
}

func TestGetTransactionBlocksIntegration(t *testing.T) {
s := new(GetTransactionBlocksSuite)
suite.Run(t, s)
}

func TestGetTransactionBlocks(t *testing.T) {
Expand Down Expand Up @@ -94,7 +102,7 @@ func TestGetTransactionBlocks(t *testing.T) {
FROM blocks b
INNER JOIN block_transactions_map m ON m.blockid = b.id
INNER JOIN transactions t ON m.txid = t.id
WHERE t.hash in (decode('5c6424a8df96a8b22fa70d2983453a86e7a1d5bfa594d5ab2a74a1e50bcd1f18', 'hex'),decode('5bb3b0f23f974b302a57de48a8335ae9beb7f1001fca84e4802e2e7f8f315c2e', 'hex'),decode('7e1a516e1336bc1a7fe81ec075c1040bd4e166f72b9ef9f6f6d0cb5d6ca6b082', 'hex'))
WHERE t.hash = ANY($1)
AND b.orphanedyn = FALSE`

mock.ExpectQuery(query).WillReturnError(errors.New("db connection error"))
Expand All @@ -110,7 +118,7 @@ func TestGetTransactionBlocks(t *testing.T) {
FROM blocks b
INNER JOIN block_transactions_map m ON m.blockid = b.id
INNER JOIN transactions t ON m.txid = t.id
WHERE t.hash in (decode('5c6424a8df96a8b22fa70d2983453a86e7a1d5bfa594d5ab2a74a1e50bcd1f18', 'hex'),decode('5bb3b0f23f974b302a57de48a8335ae9beb7f1001fca84e4802e2e7f8f315c2e', 'hex'),decode('7e1a516e1336bc1a7fe81ec075c1040bd4e166f72b9ef9f6f6d0cb5d6ca6b082', 'hex'))
WHERE t.hash = ANY($1)
AND b.orphanedyn = FALSE`

mock.ExpectQuery(query).WillReturnRows(
Expand Down
3 changes: 2 additions & 1 deletion blocktx/store/sql/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ func NewPostgresStore(params dbconn.DBConnectionParams) (store.Interface, error)
}

return &SQL{
db: db,
db: db,
engine: postgresEngine,
}, nil
}

Expand Down
11 changes: 6 additions & 5 deletions metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
)

const (
// number of times we will retry announcing transaction if we haven't seen it on the network
// MaxRetries number of times we will retry announcing transaction if we haven't seen it on the network
MaxRetries = 15
// length of interval for checking transactions if they are seen on the network
// if not we resend them again for a few times
Expand All @@ -36,6 +36,7 @@ const (

failedToUpdateStatus = "Failed to update status"
dataRetentionPeriodDefault = 14 * 24 * time.Hour // 14 days
checkIfMinedTimeRange = time.Minute * 20
)

type Processor struct {
Expand Down Expand Up @@ -116,7 +117,7 @@ func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, btc blocktx.Clien

// Start a goroutine to resend transactions that have not been seen on the network
go p.processExpiredTransactions()
go p.processExpiredSeenTransactions()
go p.processCheckIfMined()

gocore.AddAppPayloadFn("mtm", func() interface{} {
return p.GetStats(false)
Expand Down Expand Up @@ -166,10 +167,10 @@ func (p *Processor) unlockItems() error {
return p.store.SetUnlocked(context.Background(), hashes)
}

func (p *Processor) processExpiredSeenTransactions() {
// filterFunc returns true if the transaction has not been seen on the network
func (p *Processor) processCheckIfMined() {
// filter for transactions which have been at least announced but not mined and which haven't started to be processed longer than a specified amount of time ago
filterFunc := func(processorResp *processor_response.ProcessorResponse) bool {
return processorResp.GetStatus() == metamorph_api.Status_SEEN_ON_NETWORK && p.now().Sub(processorResp.Start) > p.processExpiredSeenTxsInterval
return (processorResp.GetStatus() != metamorph_api.Status_MINED && processorResp.GetStatus() != metamorph_api.Status_CONFIRMED) && p.now().Sub(processorResp.Start) < checkIfMinedTimeRange
}

// Check transactions that have been seen on the network, but haven't been marked as mined
Expand Down
31 changes: 13 additions & 18 deletions metamorph/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ func BenchmarkProcessTransaction(b *testing.B) {
time.Sleep(1 * time.Second)
}

func TestProcessExpiredSeenTransactions(t *testing.T) {
func TestProcessCheckIfMined(t *testing.T) {
txsBlocks := []*blocktx_api.TransactionBlock{
{
BlockHash: testdata.Block1Hash[:],
Expand All @@ -738,52 +738,46 @@ func TestProcessExpiredSeenTransactions(t *testing.T) {
getTransactionBlocksErr error
updateMinedErr error

expectedNrOfUpdates int
expectedNrOfBlockTxRequests int
expectedNrOfUpdates int
}{
{
name: "expired seen txs",
name: "expired txs",
blocks: txsBlocks,

expectedNrOfUpdates: 3,
expectedNrOfBlockTxRequests: 1,
expectedNrOfUpdates: 3,
},
{
name: "failed to get transaction blocks",
getTransactionBlocksErr: errors.New("failed to get transaction blocks"),

expectedNrOfUpdates: 0,
expectedNrOfBlockTxRequests: 1,
expectedNrOfUpdates: 0,
},
{
name: "failed to parse block hash",
blocks: []*blocktx_api.TransactionBlock{{
BlockHash: []byte("not a valid block hash"),
}},

expectedNrOfUpdates: 0,
expectedNrOfBlockTxRequests: 1,
expectedNrOfUpdates: 0,
},
{
name: "failed to update mined",
blocks: txsBlocks,
updateMinedErr: errors.New("failed to update mined"),

expectedNrOfUpdates: 3,
expectedNrOfBlockTxRequests: 1,
expectedNrOfUpdates: 3,
},
{
name: "failed to get tx from response map",
blocks: []*blocktx_api.TransactionBlock{
{
BlockHash: testdata.Block1Hash[:],
BlockHeight: 1234,
TransactionHash: testdata.TX4Hash[:],
TransactionHash: testdata.TX5Hash[:],
},
},

expectedNrOfUpdates: 0,
expectedNrOfBlockTxRequests: 1,
expectedNrOfUpdates: 0,
},
}

Expand Down Expand Up @@ -827,14 +821,15 @@ func TestProcessExpiredSeenTransactions(t *testing.T) {

require.Equal(t, 0, processor.ProcessorResponseMap.Len())

processor.ProcessorResponseMap.Set(testdata.TX1Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX1Hash, metamorph_api.Status_SEEN_ON_NETWORK))
processor.ProcessorResponseMap.Set(testdata.TX1Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX1Hash, metamorph_api.Status_STORED))
processor.ProcessorResponseMap.Set(testdata.TX2Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX2Hash, metamorph_api.Status_SEEN_ON_NETWORK))
processor.ProcessorResponseMap.Set(testdata.TX3Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX3Hash, metamorph_api.Status_SEEN_ON_NETWORK))
processor.ProcessorResponseMap.Set(testdata.TX3Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX3Hash, metamorph_api.Status_REJECTED))
processor.ProcessorResponseMap.Set(testdata.TX4Hash, processor_response.NewProcessorResponseWithStatus(testdata.TX4Hash, metamorph_api.Status_MINED))

time.Sleep(25 * time.Millisecond)

require.Equal(t, tc.expectedNrOfUpdates, len(metamorphStore.UpdateMinedCalls()))
require.Equal(t, tc.expectedNrOfBlockTxRequests, len(btxMock.GetTransactionBlocksCalls()))
require.Equal(t, 1, len(btxMock.GetTransactionBlocksCalls()))
})
}
}
Expand Down
3 changes: 3 additions & 0 deletions testdata/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ var (
TX4 = "88eab41a8d0b7b4bc395f8f988ea3d6e63c8bc339526fd2f00cb7ce6fd7df0f7"
TX4Hash, _ = chainhash.NewHashFromStr(TX4)

TX5 = "df931ab7d4ff0bbf96ff186f221c466f09c052c5331733641040defabf9dcd93"
TX5Hash, _ = chainhash.NewHashFromStr(TX5)

Time = time.Date(2009, 1, 03, 18, 15, 05, 0, time.UTC)
DefaultPolicy = `{"excessiveblocksize":2000000000,"blockmaxsize":512000000,"maxtxsizepolicy":10000000,"maxorphantxsize":1000000000,"datacarriersize":4294967295,"maxscriptsizepolicy":500000,"maxopsperscriptpolicy":4294967295,"maxscriptnumlengthpolicy":10000,"maxpubkeyspermultisigpolicy":4294967295,"maxtxsigopscountspolicy":4294967295,"maxstackmemoryusagepolicy":100000000,"maxstackmemoryusageconsensus":200000000,"limitancestorcount":10000,"limitcpfpgroupmemberscount":25,"maxmempool":2000000000,"maxmempoolsizedisk":0,"mempoolmaxpercentcpfp":10,"acceptnonstdoutputs":true,"datacarrier":true,"minminingtxfee":5e-7,"maxstdtxvalidationduration":3,"maxnonstdtxvalidationduration":1000,"maxtxchainvalidationbudget":50,"validationclockcpu":true,"minconsolidationfactor":20,"maxconsolidationinputscriptsize":150,"minconfconsolidationinput":6,"minconsolidationinputmaturity":6,"acceptnonstdconsolidationinput":false}`
)

0 comments on commit 30e8542

Please sign in to comment.