Skip to content

Commit

Permalink
ARCO-184: minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Sep 2, 2024
1 parent 0fc2285 commit fb4e1d9
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 41 deletions.
25 changes: 13 additions & 12 deletions internal/callbacker/store/postgresql/internal/tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ func CallbackRecordEqual(a, b *store.CallbackData) bool {
func ReadAllCallbacks(t *testing.T, db *sql.DB) []*store.CallbackData {
t.Helper()

r, err := db.Query(`SELECT url
,token
,tx_id
,tx_status
,extra_info
,merkle_path
,block_hash
,block_height
,timestamp
,competing_txs
FROM callbacker.callbacks`,
r, err := db.Query(
`SELECT url
,token
,tx_id
,tx_status
,extra_info
,merkle_path
,block_hash
,block_height
,timestamp
,competing_txs
FROM callbacker.callbacks`,
)

if err != nil {
Expand All @@ -44,7 +45,7 @@ func ReadAllCallbacks(t *testing.T, db *sql.DB) []*store.CallbackData {
var bheight sql.NullInt64
var competingTxs sql.NullString

r.Scan(&c.Url, &c.Token, &c.TxID, &c.TxStatus, &ei, &mp, &bh, &bheight, &c.Timestamp, &competingTxs)
_ = r.Scan(&c.Url, &c.Token, &c.TxID, &c.TxStatus, &ei, &mp, &bh, &bheight, &c.Timestamp, &competingTxs)

if ei.Valid {
c.ExtraInfo = &ei.String
Expand Down
50 changes: 27 additions & 23 deletions internal/callbacker/store/postgresql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package postgresql
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"time"
Expand Down Expand Up @@ -31,7 +32,6 @@ func New(dbInfo string, idleConns int, maxOpenConns int) (*PostgreSQL, error) {
return &PostgreSQL{db: db}, nil
}

// It closes the connection to the underlying database.
func (p *PostgreSQL) Close() error {
return p.db.Close()
}
Expand Down Expand Up @@ -108,8 +108,7 @@ func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) er
,UNNEST($7::TEXT[])
,UNNEST($8::BIGINT[])
,UNNEST($9::TIMESTAMPTZ[])
,UNNEST($10::TEXT[])
,UNNEST($10::TEXT[])
ON CONFLICT DO NOTHING`

_, err := p.db.ExecContext(ctx, query,
Expand All @@ -128,31 +127,37 @@ func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) er
return err
}

func (p *PostgreSQL) PopMany(ctx context.Context, limit int) ([]*store.CallbackData, error) {
func (p *PostgreSQL) PopMany(ctx context.Context, limit int) (res []*store.CallbackData, err error) {
tx, err := p.db.Begin()
if err != nil {
return nil, err
}
defer tx.Rollback()
defer func() {
if err != nil {
if rErr := tx.Rollback(); rErr != nil {
err = errors.Join(err, fmt.Errorf("failed to rollback: %v", rErr))
}
}
}()

const q = `DELETE FROM callbacker.callbacks
WHERE ctid IN (
SELECT ctid FROM callbacker.callbacks
ORDER BY timestamp
LIMIT $1
FOR UPDATE
)
RETURNING
url
,token
,tx_id
,tx_status
,extra_info
,merkle_path
,block_hash
,block_height
,competing_txs
,timestamp`
WHERE ctid IN (
SELECT ctid FROM callbacker.callbacks
ORDER BY timestamp
LIMIT $1
FOR UPDATE
)
RETURNING
url
,token
,tx_id
,tx_status
,extra_info
,merkle_path
,block_hash
,block_height
,competing_txs
,timestamp`

rows, err := tx.QueryContext(ctx, q, limit)
if err != nil {
Expand Down Expand Up @@ -219,7 +224,6 @@ func (p *PostgreSQL) PopMany(ctx context.Context, limit int) ([]*store.CallbackD
return records, nil
}

// ptrTo returns a pointer to the given value.
func ptrTo[T any](v T) *T {
return &v
}
10 changes: 4 additions & 6 deletions internal/callbacker/store/postgresql/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ func TestPostgresDBt(t *testing.T) {
}
}
require.NotEmpty(t, uniqueRecords)
fmt.Println(uniqueRecords)

// read all from db
dbCallbacks := tutils.ReadAllCallbacks(t, postgresDB.db)
Expand All @@ -225,13 +224,14 @@ func TestPostgresDBt(t *testing.T) {
}

if tutils.CallbackRecordEqual(ur, c) {
// remove
// remove if found
delete(uniqueRecords, ur)
break
}
}
}

// uniqueRecords map should be empty if all entries have been visited
require.Empty(t, uniqueRecords)
})

Expand Down Expand Up @@ -284,15 +284,13 @@ func TestPostgresDBt(t *testing.T) {

}

func pruneTables(t *testing.T, db *sql.DB) error {
func pruneTables(t *testing.T, db *sql.DB) {
t.Helper()

_, err := db.Exec("TRUNCATE TABLE callbacker.callbacks;")
if err != nil {
return err
t.Fatal(err)
}

return nil
}

func loadFixtures(t *testing.T, db *sql.DB, path string) {
Expand Down

0 comments on commit fb4e1d9

Please sign in to comment.