diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 62d531f3e..3ddb4e909 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -236,7 +236,7 @@ func (p *Processor) StartProcessSubmittedTxs() { if submittedTx == nil { continue } - + now := p.now() sReq := &store.StoreData{ Hash: PtrTo(chainhash.DoubleHashH(submittedTx.GetRawTx())), Status: metamorph_api.Status_STORED, @@ -244,6 +244,8 @@ func (p *Processor) StartProcessSubmittedTxs() { CallbackToken: submittedTx.GetCallbackToken(), FullStatusUpdates: submittedTx.GetFullStatusUpdates(), RawTx: submittedTx.GetRawTx(), + StoredAt: now, + LastSubmittedAt: now, } reqs = append(reqs, sReq) @@ -626,10 +628,9 @@ func (p *Processor) ProcessTransaction(req *ProcessorRequest) { } func (p *Processor) ProcessTransactions(sReq []*store.StoreData) { - ctx := context.Background() // store in database - err := p.store.SetBulk(ctx, sReq) + err := p.store.SetBulk(p.ctx, sReq) if err != nil { p.logger.Error("Failed to bulk store txs", slog.Int("number", len(sReq)), slog.String("err", err.Error())) return diff --git a/internal/metamorph/store/postgresql/postgres.go b/internal/metamorph/store/postgresql/postgres.go index 6cecb5c5e..e377f0e9b 100644 --- a/internal/metamorph/store/postgresql/postgres.go +++ b/internal/metamorph/store/postgresql/postgres.go @@ -275,7 +275,7 @@ func (p *PostgreSQL) Set(ctx context.Context, value *store.StoreData) error { return nil } -// SetBulk bulk inserts records into the transactions table +// SetBulk bulk inserts records into the transactions table. If a record with the same hash already exists the field last_submitted_at will be overwritten with NOW() func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error { storedAt := make([]time.Time, len(data)) hashes := make([][]byte, len(data)) @@ -310,7 +310,7 @@ func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error ,last_submitted_at ) SELECT * FROM UNNEST($1::TIMESTAMPTZ[], $2::BYTEA[], $3::INT[], $4::TEXT[], $5::TEXT[], $6::BOOL[], $7::BYTEA[], $8::TEXT[], $9::TIMESTAMPTZ[]) - ON CONFLICT (hash) DO NOTHING + ON CONFLICT (hash) DO UPDATE SET last_submitted_at = $10 ` _, err := p.db.ExecContext(ctx, q, pq.Array(storedAt), @@ -322,6 +322,7 @@ func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error pq.Array(rawTxs), pq.Array(lockedBy), pq.Array(lastSubmittedAt), + p.now(), ) if err != nil { return err diff --git a/internal/metamorph/store/postgresql/postgres_test.go b/internal/metamorph/store/postgresql/postgres_test.go index a490b9be5..6b23c9fcc 100644 --- a/internal/metamorph/store/postgresql/postgres_test.go +++ b/internal/metamorph/store/postgresql/postgres_test.go @@ -296,6 +296,7 @@ func TestPostgresDB(t *testing.T) { require.NotEqual(t, data[2], data2) require.Equal(t, metamorph_api.Status_SENT_TO_NETWORK, data2.Status) require.Equal(t, "metamorph-3", data2.LockedBy) + require.Equal(t, now, data2.LastSubmittedAt) }) t.Run("get unmined", func(t *testing.T) {