Skip to content

Commit

Permalink
Merge pull request #493 from bitcoin-sv/refactor/bulk-set
Browse files Browse the repository at this point in the history
Refactor/bulk set
  • Loading branch information
boecklim authored Jul 3, 2024
2 parents 1ebb72f + 7470c4e commit 9dafb48
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 5 deletions.
7 changes: 4 additions & 3 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,16 @@ func (p *Processor) StartProcessSubmittedTxs() {
if submittedTx == nil {
continue
}

now := p.now()
sReq := &store.StoreData{
Hash: PtrTo(chainhash.DoubleHashH(submittedTx.GetRawTx())),
Status: metamorph_api.Status_STORED,
CallbackUrl: submittedTx.GetCallbackUrl(),
CallbackToken: submittedTx.GetCallbackToken(),
FullStatusUpdates: submittedTx.GetFullStatusUpdates(),
RawTx: submittedTx.GetRawTx(),
StoredAt: now,
LastSubmittedAt: now,
}

reqs = append(reqs, sReq)
Expand Down Expand Up @@ -626,10 +628,9 @@ func (p *Processor) ProcessTransaction(req *ProcessorRequest) {
}

func (p *Processor) ProcessTransactions(sReq []*store.StoreData) {
ctx := context.Background()

// store in database
err := p.store.SetBulk(ctx, sReq)
err := p.store.SetBulk(p.ctx, sReq)
if err != nil {
p.logger.Error("Failed to bulk store txs", slog.Int("number", len(sReq)), slog.String("err", err.Error()))
return
Expand Down
5 changes: 3 additions & 2 deletions internal/metamorph/store/postgresql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (p *PostgreSQL) Set(ctx context.Context, value *store.StoreData) error {
return nil
}

// SetBulk bulk inserts records into the transactions table
// SetBulk bulk inserts records into the transactions table. If a record with the same hash already exists the field last_submitted_at will be overwritten with NOW()
func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error {
storedAt := make([]time.Time, len(data))
hashes := make([][]byte, len(data))
Expand Down Expand Up @@ -310,7 +310,7 @@ func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error
,last_submitted_at
)
SELECT * FROM UNNEST($1::TIMESTAMPTZ[], $2::BYTEA[], $3::INT[], $4::TEXT[], $5::TEXT[], $6::BOOL[], $7::BYTEA[], $8::TEXT[], $9::TIMESTAMPTZ[])
ON CONFLICT (hash) DO NOTHING
ON CONFLICT (hash) DO UPDATE SET last_submitted_at = $10
`
_, err := p.db.ExecContext(ctx, q,
pq.Array(storedAt),
Expand All @@ -322,6 +322,7 @@ func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error
pq.Array(rawTxs),
pq.Array(lockedBy),
pq.Array(lastSubmittedAt),
p.now(),
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions internal/metamorph/store/postgresql/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ func TestPostgresDB(t *testing.T) {
require.NotEqual(t, data[2], data2)
require.Equal(t, metamorph_api.Status_SENT_TO_NETWORK, data2.Status)
require.Equal(t, "metamorph-3", data2.LockedBy)
require.Equal(t, now, data2.LastSubmittedAt)
})

t.Run("get unmined", func(t *testing.T) {
Expand Down

0 comments on commit 9dafb48

Please sign in to comment.