Skip to content

Commit

Permalink
ARCO-194: update status history in transactions (#581)
Browse files Browse the repository at this point in the history
* ARCO-193: remove unused columns add status history and last modified

* ARCO-193: remove unnecessary comments

* ARCO-194: update status history

* ARCO-194: fix tests error

* ARCO-194: fix e2e tests

* ARCO-194: fix PR comments

* ARCO-194: fix unit tests

* ARCO-194: replace LastModified with actual time in tests
  • Loading branch information
pawellewandowski98 authored Sep 9, 2024
1 parent b51638e commit 21cc7a5
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 55 deletions.
28 changes: 14 additions & 14 deletions internal/metamorph/metamorph_api/metamorph_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions internal/metamorph/metamorph_api/metamorph_api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ message Transaction {
message TransactionStatus {
bool timed_out = 1;
google.protobuf.Timestamp stored_at = 2;
string txid = 5;
Status status = 6;
string reject_reason = 7;
repeated string competingTxs = 8;
uint64 block_height = 9;
string block_hash = 10;
string merkle_path = 11;
string txid = 3;
Status status = 4;
string reject_reason = 5;
repeated string competingTxs = 6;
uint64 block_height = 7;
string block_hash = 8;
string merkle_path = 9;
}

// swagger:model TransactionStatuses
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Down Migration: Remove default constraint from 'status_history' column in 'metamorph.transactions' table
-- This migration removes the default value constraint from the 'status_history' column, so new records will require an explicit value or default to NULL.

ALTER TABLE metamorph.transactions
ALTER COLUMN status_history DROP DEFAULT;
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Up Migration: Add default constraint to 'status_history' column in 'metamorph.transactions' table
-- This migration sets the default value of the 'status_history' column to an empty JSON array ([]) for future records.

ALTER TABLE metamorph.transactions
ALTER COLUMN status_history SET DEFAULT '[]'::JSONB;
52 changes: 35 additions & 17 deletions internal/metamorph/store/postgresql/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,7 @@ func (p *PostgreSQL) Get(ctx context.Context, hash []byte) (*store.StoreData, er
data.LastSubmittedAt = lastSubmittedAt.UTC()

if lastModified.Valid {
lastModifiedUTC := lastModified.Time.UTC()
data.LastModified = &lastModifiedUTC
data.LastModified = lastModified.Time.UTC()
}

if status.Valid {
Expand Down Expand Up @@ -322,6 +321,9 @@ func (p *PostgreSQL) Set(ctx context.Context, value *store.StoreData) error {
return err
}

if value.StatusHistory == nil {
value.StatusHistory = make([]*store.StoreStatus, 0)
}
statusHistoryData, err := prepareStructForSaving(value.StatusHistory)
if err != nil {
return err
Expand All @@ -341,7 +343,7 @@ func (p *PostgreSQL) Set(ctx context.Context, value *store.StoreData) error {
p.hostname,
value.LastSubmittedAt,
statusHistoryData,
value.LastModified,
p.now(),
)
if err != nil {
return err
Expand All @@ -361,7 +363,7 @@ func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error
rawTxs := make([][]byte, len(data))
lockedBy := make([]string, len(data))
lastSubmittedAt := make([]time.Time, len(data))
lastModified := make([]*time.Time, len(data))

for i, txData := range data {
storedAt[i] = txData.StoredAt
hashes[i] = txData.Hash[:]
Expand All @@ -370,14 +372,16 @@ func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error
rawTxs[i] = txData.RawTx
lockedBy[i] = p.hostname
lastSubmittedAt[i] = txData.LastSubmittedAt
lastModified[i] = txData.LastModified

callbacksData, err := prepareStructForSaving(txData.Callbacks)
if err != nil {
return err
}
callbacks[i] = string(callbacksData)

if txData.StatusHistory == nil {
txData.StatusHistory = make([]*store.StoreStatus, 0)
}
statusHistoryData, err := prepareStructForSaving(txData.StatusHistory)
if err != nil {
return err
Expand Down Expand Up @@ -407,8 +411,8 @@ func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error
UNNEST($7::TEXT[]),
UNNEST($8::TIMESTAMPTZ[]),
UNNEST($9::TEXT[])::JSONB,
UNNEST($10::TIMESTAMPTZ[])
ON CONFLICT (hash) DO UPDATE SET last_submitted_at = $11
$10
ON CONFLICT (hash) DO UPDATE SET last_submitted_at = $10, callbacks=EXCLUDED.callbacks;
`

_, err := p.db.ExecContext(ctx, q,
Expand All @@ -421,7 +425,6 @@ func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error
pq.Array(lockedBy),
pq.Array(lastSubmittedAt),
pq.Array(statusHistory),
pq.Array(lastModified),
p.now(),
)
if err != nil {
Expand Down Expand Up @@ -580,12 +583,17 @@ func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.Updat
UPDATE metamorph.transactions
SET
status=bulk_query.status,
reject_reason=bulk_query.reject_reason
reject_reason=bulk_query.reject_reason,
last_modified=$1,
status_history=status_history || json_build_object(
'status', metamorph.transactions.status,
'timestamp', last_modified
)::JSONB
FROM
(
SELECT *
FROM
UNNEST($1::BYTEA[], $2::INT[], $3::TEXT[])
UNNEST($2::BYTEA[], $3::INT[], $4::TEXT[])
AS t(hash, status, reject_reason)
) AS bulk_query
WHERE metamorph.transactions.hash=bulk_query.hash
Expand Down Expand Up @@ -621,7 +629,7 @@ func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.Updat
return nil, err
}

rows, err := tx.QueryContext(ctx, qBulk, pq.Array(txHashes), pq.Array(statuses), pq.Array(rejectReasons))
rows, err := tx.QueryContext(ctx, qBulk, p.now(), pq.Array(txHashes), pq.Array(statuses), pq.Array(rejectReasons))
if err != nil {
if rollBackErr := tx.Rollback(); rollBackErr != nil {
return nil, errors.Join(err, fmt.Errorf("failed to rollback: %v", rollBackErr))
Expand Down Expand Up @@ -652,10 +660,15 @@ func (p *PostgreSQL) UpdateDoubleSpend(ctx context.Context, updates []store.Upda
SET
status=bulk_query.status,
reject_reason=bulk_query.reject_reason,
competing_txs=bulk_query.competing_txs
competing_txs=bulk_query.competing_txs,
last_modified=$1,
status_history=status_history || json_build_object(
'status', metamorph.transactions.status,
'timestamp', last_modified
)::JSONB
FROM
(
SELECT * FROM UNNEST($1::BYTEA[], $2::INT[], $3::TEXT[], $4::TEXT[])
SELECT * FROM UNNEST($2::BYTEA[], $3::INT[], $4::TEXT[], $5::TEXT[])
AS t(hash, status, reject_reason, competing_txs)
) AS bulk_query
WHERE metamorph.transactions.hash=bulk_query.hash
Expand Down Expand Up @@ -722,7 +735,7 @@ func (p *PostgreSQL) UpdateDoubleSpend(ctx context.Context, updates []store.Upda
}
}

rows, err = tx.QueryContext(ctx, qBulk, pq.Array(txHashes), pq.Array(statuses), pq.Array(rejectReasons), pq.Array(competingTxs))
rows, err = tx.QueryContext(ctx, qBulk, p.now(), pq.Array(txHashes), pq.Array(statuses), pq.Array(rejectReasons), pq.Array(competingTxs))
if err != nil {
if rollbackErr := tx.Rollback(); rollbackErr != nil {
return nil, errors.Join(err, fmt.Errorf("failed to rollback: %v", rollbackErr))
Expand Down Expand Up @@ -770,12 +783,17 @@ func (p *PostgreSQL) UpdateMined(ctx context.Context, txsBlocks []*blocktx_api.T
status=$1,
block_hash=bulk_query.block_hash,
block_height=bulk_query.block_height,
merkle_path=bulk_query.merkle_path
merkle_path=bulk_query.merkle_path,
last_modified=$2,
status_history=status_history || json_build_object(
'status', status,
'timestamp', last_modified
)::JSONB
FROM
(
SELECT *
FROM
UNNEST($2::BYTEA[], $3::BYTEA[], $4::BIGINT[], $5::TEXT[])
UNNEST($3::BYTEA[], $4::BYTEA[], $5::BIGINT[], $6::TEXT[])
AS t(hash, block_hash, block_height, merkle_path)
) AS bulk_query
WHERE
Expand Down Expand Up @@ -813,7 +831,7 @@ func (p *PostgreSQL) UpdateMined(ctx context.Context, txsBlocks []*blocktx_api.T

rejectedResponses := updateDoubleSpendRejected(ctx, rows, tx)

rows, err = tx.QueryContext(ctx, qBulkUpdate, metamorph_api.Status_MINED, pq.Array(txHashes), pq.Array(blockHashes), pq.Array(blockHeights), pq.Array(merklePaths))
rows, err = tx.QueryContext(ctx, qBulkUpdate, metamorph_api.Status_MINED, p.now(), pq.Array(txHashes), pq.Array(blockHashes), pq.Array(blockHeights), pq.Array(merklePaths))
if err != nil {
if rollBackErr := tx.Rollback(); rollBackErr != nil {
return nil, errors.Join(err, fmt.Errorf("failed to rollback: %v", rollBackErr))
Expand Down
13 changes: 10 additions & 3 deletions internal/metamorph/store/postgresql/postgres_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func getStoreDataFromRows(rows *sql.Rows) ([]*store.StoreData, error) {
}

if lastModified.Valid {
data.LastModified = &lastModified.Time
data.LastModified = lastModified.Time.UTC()
}

data.RejectReason = rejectReason.String
Expand Down Expand Up @@ -234,11 +234,18 @@ func readCallbacksFromDB(callbacks []byte) ([]store.StoreCallback, error) {
return callbacksData, nil
}

func readStatusHistoryFromDB(statusHistory []byte) ([]store.StoreStatus, error) {
var statusHistoryData []store.StoreStatus
func readStatusHistoryFromDB(statusHistory []byte) ([]*store.StoreStatus, error) {
var statusHistoryData []*store.StoreStatus
err := json.Unmarshal(statusHistory, &statusHistoryData)
if err != nil {
return nil, err
}

for _, status := range statusHistoryData {
if status != nil {
status.Timestamp = status.Timestamp.UTC()
}
}

return statusHistoryData, nil
}
Loading

0 comments on commit 21cc7a5

Please sign in to comment.