Skip to content

Commit

Permalink
Merge pull request #326 from bitcoin-sv/fix/set-locked-since
Browse files Browse the repository at this point in the history
fix metamorph updates
  • Loading branch information
boecklim authored Mar 12, 2024
2 parents c3f10b3 + f370c8f commit ad8c94d
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 13 deletions.
14 changes: 10 additions & 4 deletions metamorph/mocks/store_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ func (p *Processor) StartLockTransactions() {
case <-p.quitLockTransactions:
return
case <-ticker.C:
err := p.store.SetLocked(dbctx, loadUnminedLimit)
expiredSince := p.now().Add(-1 * p.mapExpiryTime)
err := p.store.SetLocked(dbctx, expiredSince, loadUnminedLimit)
if err != nil {
p.logger.Error("Failed to set transactions locked", slog.String("err", err.Error()))
}
Expand Down
2 changes: 1 addition & 1 deletion metamorph/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestStartLockTransactions(t *testing.T) {
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
metamorphStore := &mocks.MetamorphStoreMock{
SetLockedFunc: func(ctx context.Context, limit int64) error {
SetLockedFunc: func(ctx context.Context, since time.Time, limit int64) error {
require.Equal(t, int64(5000), limit)
return tc.setLockedErr
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
stored_at: 2023-10-01 14:00:00
announced_at: 2023-10-01 14:05:00
inserted_at_num: 2023100114
- hash: 0x392ea135503bf49f46eb421af01153a2bdcedeba0ee13aa1dd36983861cc5ddd
locked_by: NONE
status: 6
stored_at: 2023-09-01 14:00:00
announced_at: 2023-09-01 14:00:00
inserted_at_num: 2023090114
- hash: 0xa8b965b5901163a9bdcd38d2ad524c3bb27ae31fb86dc8947253b541af8dd308
locked_by: NONE
status: 9
Expand Down
9 changes: 5 additions & 4 deletions metamorph/store/postgresql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func (p *PostgreSQL) Set(ctx context.Context, _ []byte, value *store.StoreData)
return err
}

func (p *PostgreSQL) SetLocked(ctx context.Context, limit int64) error {
func (p *PostgreSQL) SetLocked(ctx context.Context, since time.Time, limit int64) error {
startNanos := p.now().UnixNano()
defer func() {
gocore.NewStat("mtm_store_sql").NewStat("setlockedby").AddTime(startNanos)
Expand All @@ -358,12 +358,13 @@ func (p *PostgreSQL) SetLocked(ctx context.Context, limit int64) error {
FROM metamorph.transactions t2
WHERE t2.locked_by = 'NONE'
AND (t2.status < $3 OR t2.status = $4)
AND inserted_at_num > $5
LIMIT $2
FOR UPDATE SKIP LOCKED
);
;`

_, err := p.db.ExecContext(ctx, q, p.hostname, limit, metamorph_api.Status_SEEN_ON_NETWORK, metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL)
_, err := p.db.ExecContext(ctx, q, p.hostname, limit, metamorph_api.Status_SEEN_ON_NETWORK, metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL, since.Format(numericalDateHourLayout))
if err != nil {
return err
}
Expand Down Expand Up @@ -444,7 +445,7 @@ func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.Updat
AS t(hash, status, reject_reason)
) AS bulk_query
WHERE
metamorph.transactions.hash=bulk_query.hash
metamorph.transactions.hash=bulk_query.hash AND metamorph.transactions.locked_by = $4
RETURNING metamorph.transactions.stored_at
,metamorph.transactions.announced_at
,metamorph.transactions.mined_at
Expand All @@ -463,7 +464,7 @@ func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.Updat
;
`

rows, err := p.db.QueryContext(ctx, qBulk, pq.Array(txHashes), pq.Array(statuses), pq.Array(rejectReasons))
rows, err := p.db.QueryContext(ctx, qBulk, pq.Array(txHashes), pq.Array(statuses), pq.Array(rejectReasons), p.hostname)
if err != nil {
return nil, err
}
Expand Down
12 changes: 10 additions & 2 deletions metamorph/store/postgresql/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ func TestPostgresDB(t *testing.T) {

require.NoError(t, loadFixtures(postgresDB.db, "fixtures"))

err := postgresDB.SetLocked(ctx, 2)
err := postgresDB.SetLocked(ctx, time.Date(2023, 9, 15, 1, 0, 0, 0, time.UTC), 2)
require.NoError(t, err)

// locked by NONE
Expand Down Expand Up @@ -362,6 +362,10 @@ func TestPostgresDB(t *testing.T) {
err = postgresDB.Set(ctx, testdata.TX6Hash[:], tx6Data)
require.NoError(t, err)

// will not be updated because it is locked by metamorph-3
metamorph3Hash, err := chainhash.NewHashFromStr("538808e847d0add40ed9622fff53954c79e1f52db7c47ea0b6cdc0df972f3dcd")
require.NoError(t, err)

updates := []store.UpdateStatus{
{
Hash: *testdata.TX1Hash,
Expand All @@ -381,6 +385,10 @@ func TestPostgresDB(t *testing.T) {
Hash: *testdata.TX4Hash, // hash non-existent in db
Status: metamorph_api.Status_ANNOUNCED_TO_NETWORK,
},
{
Hash: *metamorph3Hash,
Status: metamorph_api.Status_MINED,
},
}

statusUpdates, err := postgresDB.UpdateStatusBulk(ctx, updates)
Expand Down Expand Up @@ -492,7 +500,7 @@ func TestPostgresDB(t *testing.T) {

res, err := postgresDB.ClearData(ctx, 14)
require.NoError(t, err)
require.Equal(t, int64(3), res)
require.Equal(t, int64(4), res)

var numberOfRemainingTxs int
err = postgresDB.db.QueryRowContext(ctx, "SELECT count(*) FROM metamorph.transactions;").Scan(&numberOfRemainingTxs)
Expand Down
2 changes: 1 addition & 1 deletion metamorph/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type MetamorphStore interface {
Del(ctx context.Context, key []byte) error

SetUnlocked(ctx context.Context, hashes []*chainhash.Hash) error
SetLocked(ctx context.Context, limit int64) error
SetLocked(ctx context.Context, since time.Time, limit int64) error
IncrementRetries(ctx context.Context, hash *chainhash.Hash) error
SetUnlockedByName(ctx context.Context, lockedBy string) (int64, error)
GetUnmined(ctx context.Context, since time.Time, limit int64, offset int64) ([]*StoreData, error)
Expand Down

0 comments on commit ad8c94d

Please sign in to comment.