From 03db169cf5c2b5cf395bf007200b824bb85883cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Lewandowski?= <35259896+pawellewandowski98@users.noreply.github.com> Date: Fri, 9 Aug 2024 15:53:19 +0200 Subject: [PATCH] ARCO-149: change callback column to allow saving multiple parameters (#539) * ARCO-149: refactor callback column to allow saving multiple parameters * ARCO-149: fix unkeyed fields * ARCO-149: remove unnecessary comments * ARCO-149: refactor migrations and postgres methods * ARCO-149: remove goroutine from callback * ARCO-149: change direct comparisions to sql not null * ARCO-149: check callback url and token length instead of comparing to null * ARCO-149: revert defer func changes --- internal/metamorph/callbacker.go | 24 ++++-- internal/metamorph/processor.go | 11 ++- internal/metamorph/processor_test.go | 15 ++-- internal/metamorph/server.go | 20 +++-- internal/metamorph/server_test.go | 15 ++-- .../000016_multiple_callbacks.down.sql | 14 +++ .../000016_multiple_callbacks.up.sql | 17 ++++ .../metamorph/store/postgresql/postgres.go | 85 ++++++++++--------- .../store/postgresql/postgres_helpers.go | 39 ++++++--- .../store/postgresql/postgres_test.go | 32 +++---- internal/metamorph/store/store.go | 8 +- 11 files changed, 172 insertions(+), 108 deletions(-) create mode 100644 internal/metamorph/store/postgresql/migrations/000016_multiple_callbacks.down.sql create mode 100644 internal/metamorph/store/postgresql/migrations/000016_multiple_callbacks.up.sql diff --git a/internal/metamorph/callbacker.go b/internal/metamorph/callbacker.go index aaa77d4d5..6666757a4 100644 --- a/internal/metamorph/callbacker.go +++ b/internal/metamorph/callbacker.go @@ -124,9 +124,14 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) { status.ExtraInfo = &tx.RejectReason } - for i := 0; i < CallbackTries; i++ { + for _, callback := range tx.Callbacks { + p.sendCallback(logger, tx, callback, status, sleepDuration) + } +} - logger.Debug("Sending callback for transaction", slog.String("hash", tx.Hash.String()), slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("status", statusString), slog.Uint64("block height", tx.BlockHeight), slog.String("block hash", blockHash)) +func (p *Callbacker) sendCallback(logger *slog.Logger, tx *store.StoreData, callback store.StoreCallback, status *Callback, sleepDuration int) { + for i := 0; i < CallbackTries; i++ { + logger.Debug("Sending callback for transaction", slog.String("hash", tx.Hash.String()), slog.String("url", callback.CallbackURL), slog.String("token", callback.CallbackToken), slog.String("status", *status.TxStatus), slog.Uint64("block height", tx.BlockHeight), slog.String("block hash", *status.BlockHash)) statusBytes, err := json.Marshal(status) if err != nil { @@ -135,22 +140,23 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) { } var request *http.Request - request, err = http.NewRequest("POST", tx.CallbackUrl, bytes.NewBuffer(statusBytes)) + request, err = http.NewRequest("POST", callback.CallbackURL, bytes.NewBuffer(statusBytes)) if err != nil { - logger.Error("Couldn't marshal status", slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("err", errors.Join(err, fmt.Errorf("failed to post callback for transaction id %s", tx.Hash)).Error())) + logger.Error("Couldn't marshal status", slog.String("url", callback.CallbackURL), slog.String("token", callback.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("err", errors.Join(err, fmt.Errorf("failed to post callback for transaction id %s", tx.Hash)).Error())) return } request.Header.Set("Content-Type", "application/json; charset=UTF-8") - if tx.CallbackToken != "" { - request.Header.Set("Authorization", "Bearer "+tx.CallbackToken) + if callback.CallbackToken != "" { + request.Header.Set("Authorization", "Bearer "+callback.CallbackToken) } var response *http.Response response, err = p.httpClient.Do(request) if err != nil { - logger.Debug("Couldn't send transaction callback", slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("err", err.Error())) + logger.Debug("Couldn't send transaction callback", slog.String("url", callback.CallbackURL), slog.String("token", callback.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("err", err.Error())) continue } + defer response.Body.Close() // if callback was sent successfully we stop here @@ -168,7 +174,7 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) { return } - logger.Debug("Callback response status code not ok", slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.Int("status", response.StatusCode)) + logger.Debug("Callback response status code not ok", slog.String("url", callback.CallbackURL), slog.String("token", callback.CallbackToken), slog.String("hash", tx.Hash.String()), slog.Int("status", response.StatusCode)) // sleep before trying again time.Sleep(time.Duration(sleepDuration) * time.Second) @@ -177,7 +183,7 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) { } p.callbackerStats.callbackFailedCount.Inc() - logger.Warn("Couldn't send transaction callback after tries", slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.Int("retries", CallbackTries)) + logger.Warn("Couldn't send transaction callback after tries", slog.String("url", callback.CallbackURL), slog.String("token", callback.CallbackToken), slog.String("hash", tx.Hash.String()), slog.Int("retries", CallbackTries)) } func (p *Callbacker) Shutdown(logger *slog.Logger) { diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index 832f62063..34fe02bf3 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -284,7 +284,7 @@ func (p *Processor) updateMined(txsBlocks []*blocktx_api.TransactionBlock) { } for _, data := range updatedData { - if data.CallbackUrl != "" { + if len(data.Callbacks) > 0 { go p.callbackSender.SendCallback(p.logger, data) } } @@ -311,11 +311,14 @@ func (p *Processor) StartProcessSubmittedTxs() { continue } now := p.now() + callback := store.StoreCallback{ + CallbackURL: submittedTx.GetCallbackUrl(), + CallbackToken: submittedTx.GetCallbackToken(), + } sReq := &store.StoreData{ Hash: PtrTo(chainhash.DoubleHashH(submittedTx.GetRawTx())), Status: metamorph_api.Status_STORED, - CallbackUrl: submittedTx.GetCallbackUrl(), - CallbackToken: submittedTx.GetCallbackToken(), + Callbacks: []store.StoreCallback{callback}, FullStatusUpdates: submittedTx.GetFullStatusUpdates(), RawTx: submittedTx.GetRawTx(), StoredAt: now, @@ -429,7 +432,7 @@ func (p *Processor) statusUpdateWithCallback(statusUpdates, doubleSpendUpdates [ sendCallback = data.Status >= metamorph_api.Status_REJECTED } - if sendCallback && data.CallbackUrl != "" { + if sendCallback && len(data.Callbacks) > 0 { go p.callbackSender.SendCallback(p.logger, data) } } diff --git a/internal/metamorph/processor_test.go b/internal/metamorph/processor_test.go index 7962264e6..4cd294040 100644 --- a/internal/metamorph/processor_test.go +++ b/internal/metamorph/processor_test.go @@ -363,7 +363,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { Hash: testdata.TX1Hash, Status: metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL, FullStatusUpdates: true, - CallbackUrl: "http://callback.com", + Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}}, }, }, { @@ -372,7 +372,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { Status: metamorph_api.Status_SEEN_ON_NETWORK, RejectReason: "", FullStatusUpdates: true, - CallbackUrl: "http://callback.com", + Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}}, }, }, { @@ -380,7 +380,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { Hash: testdata.TX6Hash, Status: metamorph_api.Status_DOUBLE_SPEND_ATTEMPTED, FullStatusUpdates: true, - CallbackUrl: "http://callback.com", + Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}}, CompetingTxs: []string{"1234"}, }, }, @@ -424,7 +424,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { { { Hash: testdata.TX1Hash, - CallbackUrl: "http://callback.com", + Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}}, FullStatusUpdates: true, Status: metamorph_api.Status_SEEN_ON_NETWORK, }, @@ -432,7 +432,7 @@ func TestStartSendStatusForTransaction(t *testing.T) { { { Hash: testdata.TX2Hash, - CallbackUrl: "http://callback.com", + Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}}, FullStatusUpdates: true, Status: metamorph_api.Status_DOUBLE_SPEND_ATTEMPTED, CompetingTxs: []string{"1234", "different_competing_tx"}, @@ -827,7 +827,10 @@ func TestStartProcessMinedCallbacks(t *testing.T) { UpdateMinedFunc: func(ctx context.Context, txsBlocks []*blocktx_api.TransactionBlock) ([]*store.StoreData, error) { require.Len(t, txsBlocks, tc.expectedTxsBlocks) - return []*store.StoreData{{CallbackUrl: "http://callback.com"}, {CallbackUrl: "http://callback.com"}, {}}, tc.updateMinedErr + return []*store.StoreData{ + {Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}}}, + {Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}}}, + {}}, tc.updateMinedErr }, SetUnlockedByNameFunc: func(ctx context.Context, lockedBy string) (int64, error) { return 0, nil }, } diff --git a/internal/metamorph/server.go b/internal/metamorph/server.go index e434aabaf..0d2f0e888 100644 --- a/internal/metamorph/server.go +++ b/internal/metamorph/server.go @@ -174,10 +174,12 @@ func (s *Server) PutTransaction(ctx context.Context, req *metamorph_api.Transact // Convert gRPC req to store.StoreData struct... sReq := &store.StoreData{ - Hash: hash, - Status: statusReceived, - CallbackUrl: req.GetCallbackUrl(), - CallbackToken: req.GetCallbackToken(), + Hash: hash, + Status: statusReceived, + Callbacks: []store.StoreCallback{{ + CallbackURL: req.GetCallbackUrl(), + CallbackToken: req.GetCallbackToken(), + }}, FullStatusUpdates: req.GetFullStatusUpdates(), RawTx: req.GetRawTx(), } @@ -208,10 +210,12 @@ func (s *Server) PutTransactions(ctx context.Context, req *metamorph_api.Transac // Convert gRPC req to store.StoreData struct... sReq := &store.StoreData{ - Hash: hash, - Status: statusReceived, - CallbackUrl: txReq.GetCallbackUrl(), - CallbackToken: txReq.GetCallbackToken(), + Hash: hash, + Status: statusReceived, + Callbacks: []store.StoreCallback{{ + CallbackURL: txReq.GetCallbackUrl(), + CallbackToken: txReq.GetCallbackToken(), + }}, FullStatusUpdates: txReq.GetFullStatusUpdates(), RawTx: txReq.GetRawTx(), } diff --git a/internal/metamorph/server_test.go b/internal/metamorph/server_test.go index 1b7a24dc1..ca7f99fbe 100644 --- a/internal/metamorph/server_test.go +++ b/internal/metamorph/server_test.go @@ -227,14 +227,13 @@ func TestServer_GetTransactionStatus(t *testing.T) { metamorphStore := &storeMocks.MetamorphStoreMock{ GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { data := &store.StoreData{ - StoredAt: testdata.Time, - AnnouncedAt: testdata.Time.Add(1 * time.Second), - MinedAt: testdata.Time.Add(2 * time.Second), - Hash: testdata.TX1Hash, - Status: tt.status, - CallbackUrl: "https://test.com", - CallbackToken: "token", - MerklePath: "00000", + StoredAt: testdata.Time, + AnnouncedAt: testdata.Time.Add(1 * time.Second), + MinedAt: testdata.Time.Add(2 * time.Second), + Hash: testdata.TX1Hash, + Status: tt.status, + Callbacks: []store.StoreCallback{{CallbackURL: "https://test.com", CallbackToken: "token"}}, + MerklePath: "00000", } return data, tt.getErr }, diff --git a/internal/metamorph/store/postgresql/migrations/000016_multiple_callbacks.down.sql b/internal/metamorph/store/postgresql/migrations/000016_multiple_callbacks.down.sql new file mode 100644 index 000000000..0bfe4fbdf --- /dev/null +++ b/internal/metamorph/store/postgresql/migrations/000016_multiple_callbacks.down.sql @@ -0,0 +1,14 @@ +-- Step 1: Add the old 'callback_url' and 'callback_token' columns back +ALTER TABLE metamorph.transactions + ADD COLUMN callback_url TEXT, + ADD COLUMN callback_token TEXT; + +-- Step 2: Populate 'callback_url' and 'callback_token' with the first object in the 'callbacks' JSON array +UPDATE metamorph.transactions +SET callback_url = (callbacks->0->>'callback_url'), + callback_token = (callbacks->0->>'callback_token') +WHERE jsonb_array_length(callbacks) > 0; + +-- Step 3: Drop the new 'callbacks' column +ALTER TABLE metamorph.transactions + DROP COLUMN callbacks; diff --git a/internal/metamorph/store/postgresql/migrations/000016_multiple_callbacks.up.sql b/internal/metamorph/store/postgresql/migrations/000016_multiple_callbacks.up.sql new file mode 100644 index 000000000..2dedbdcb9 --- /dev/null +++ b/internal/metamorph/store/postgresql/migrations/000016_multiple_callbacks.up.sql @@ -0,0 +1,17 @@ +-- Step 1: Add the new 'callback' column +ALTER TABLE metamorph.transactions + ADD COLUMN callbacks JSONB; + +-- Step 2: Populate the 'callback' column with data from 'callback_url' and 'callback_token' +UPDATE metamorph.transactions +SET callbacks = json_build_array( + json_build_object( + 'callback_url', callback_url, + 'callback_token', callback_token + ) +)WHERE LENGTH(callback_url) > 0 OR LENGTH(callback_token) > 0; + +-- Step 3: Drop the old 'callback_url' and 'callback_token' columns +ALTER TABLE metamorph.transactions +DROP COLUMN callback_url, +DROP COLUMN callback_token; \ No newline at end of file diff --git a/internal/metamorph/store/postgresql/postgres.go b/internal/metamorph/store/postgresql/postgres.go index 6529fa405..9c94df7df 100644 --- a/internal/metamorph/store/postgresql/postgres.go +++ b/internal/metamorph/store/postgresql/postgres.go @@ -80,8 +80,7 @@ func (p *PostgreSQL) Get(ctx context.Context, hash []byte) (*store.StoreData, er ,status ,block_height ,block_hash - ,callback_url - ,callback_token + ,callbacks ,full_status_updates ,reject_reason ,competing_txs @@ -98,8 +97,7 @@ func (p *PostgreSQL) Get(ctx context.Context, hash []byte) (*store.StoreData, er var status sql.NullInt32 var blockHeight sql.NullInt64 var blockHash []byte - var callbackUrl sql.NullString - var callbackToken sql.NullString + var callbacksData []byte var fullStatusUpdates bool var rejectReason sql.NullString var competingTxs string @@ -116,8 +114,7 @@ func (p *PostgreSQL) Get(ctx context.Context, hash []byte) (*store.StoreData, er &status, &blockHeight, &blockHash, - &callbackUrl, - &callbackToken, + &callbacksData, &fullStatusUpdates, &rejectReason, &competingTxs, @@ -168,12 +165,12 @@ func (p *PostgreSQL) Get(ctx context.Context, hash []byte) (*store.StoreData, er data.BlockHeight = uint64(blockHeight.Int64) } - if callbackUrl.Valid { - data.CallbackUrl = callbackUrl.String - } - - if callbackToken.Valid { - data.CallbackToken = callbackToken.String + if len(callbacksData) > 0 { + callbacks, err := readCallbacksFromDB(callbacksData) + if err != nil { + return nil, err + } + data.Callbacks = callbacks } data.FullStatusUpdates = fullStatusUpdates @@ -242,8 +239,7 @@ func (p *PostgreSQL) GetMany(ctx context.Context, keys [][]byte) ([]*store.Store ,status ,block_height ,block_hash - ,callback_url - ,callback_token + ,callbacks ,full_status_updates ,reject_reason ,competing_txs @@ -283,8 +279,7 @@ func (p *PostgreSQL) Set(ctx context.Context, value *store.StoreData) error { ,status ,block_height ,block_hash - ,callback_url - ,callback_token + ,callbacks ,full_status_updates ,reject_reason ,competing_txs @@ -306,8 +301,7 @@ func (p *PostgreSQL) Set(ctx context.Context, value *store.StoreData) error { ,$12 ,$13 ,$14 - ,$15 - ) ON CONFLICT (hash) DO UPDATE SET last_submitted_at=$15` + ) ON CONFLICT (hash) DO UPDATE SET last_submitted_at=$14` var txHash []byte var blockHash []byte @@ -325,7 +319,12 @@ func (p *PostgreSQL) Set(ctx context.Context, value *store.StoreData) error { value.StoredAt = p.now() } - _, err := p.db.ExecContext(ctx, q, + callbacksData, err := prepareCallbacksForSaving(value.Callbacks) + if err != nil { + return err + } + + _, err = p.db.ExecContext(ctx, q, value.StoredAt, value.AnnouncedAt, value.MinedAt, @@ -333,8 +332,7 @@ func (p *PostgreSQL) Set(ctx context.Context, value *store.StoreData) error { value.Status, value.BlockHeight, blockHash, - value.CallbackUrl, - value.CallbackToken, + callbacksData, value.FullStatusUpdates, value.RejectReason, strings.Join(value.CompetingTxs, ","), @@ -354,8 +352,7 @@ func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error storedAt := make([]time.Time, len(data)) hashes := make([][]byte, len(data)) statuses := make([]int, len(data)) - callbackURL := make([]string, len(data)) - callbackToken := make([]string, len(data)) + callbacks := make([]string, len(data)) fullStatusUpdate := make([]bool, len(data)) rawTxs := make([][]byte, len(data)) lockedBy := make([]string, len(data)) @@ -364,34 +361,45 @@ func (p *PostgreSQL) SetBulk(ctx context.Context, data []*store.StoreData) error storedAt[i] = txData.StoredAt hashes[i] = txData.Hash[:] statuses[i] = int(txData.Status) - callbackURL[i] = txData.CallbackUrl - callbackToken[i] = txData.CallbackToken fullStatusUpdate[i] = txData.FullStatusUpdates rawTxs[i] = txData.RawTx lockedBy[i] = p.hostname lastSubmittedAt[i] = txData.LastSubmittedAt + + callbacksData, err := prepareCallbacksForSaving(txData.Callbacks) + if err != nil { + return err + } + callbacks[i] = string(callbacksData) } q := `INSERT INTO metamorph.transactions ( stored_at ,hash ,status - ,callback_url - ,callback_token + ,callbacks ,full_status_updates ,raw_tx ,locked_by ,last_submitted_at ) - SELECT * FROM UNNEST($1::TIMESTAMPTZ[], $2::BYTEA[], $3::INT[], $4::TEXT[], $5::TEXT[], $6::BOOL[], $7::BYTEA[], $8::TEXT[], $9::TIMESTAMPTZ[]) - ON CONFLICT (hash) DO UPDATE SET last_submitted_at = $10 + SELECT + UNNEST($1::TIMESTAMPTZ[]), + UNNEST($2::BYTEA[]), + UNNEST($3::INT[]), + UNNEST($4::TEXT[])::JSONB, + UNNEST($5::BOOL[]), + UNNEST($6::BYTEA[]), + UNNEST($7::TEXT[]), + UNNEST($8::TIMESTAMPTZ[]) + ON CONFLICT (hash) DO UPDATE SET last_submitted_at = $9 ` + _, err := p.db.ExecContext(ctx, q, pq.Array(storedAt), pq.Array(hashes), pq.Array(statuses), - pq.Array(callbackURL), - pq.Array(callbackToken), + pq.Array(callbacks), pq.Array(fullStatusUpdate), pq.Array(rawTxs), pq.Array(lockedBy), @@ -435,8 +443,7 @@ func (p *PostgreSQL) GetUnmined(ctx context.Context, since time.Time, limit int6 ,status ,block_height ,block_hash - ,callback_url - ,callback_token + ,callbacks ,full_status_updates ,reject_reason ,competing_txs @@ -503,8 +510,7 @@ func (p *PostgreSQL) GetSeenOnNetwork(ctx context.Context, since time.Time, unti ,status ,block_height ,block_hash - ,callback_url - ,callback_token + ,callbacks ,full_status_updates ,reject_reason ,competing_txs @@ -573,8 +579,7 @@ func (p *PostgreSQL) UpdateStatusBulk(ctx context.Context, updates []store.Updat ,metamorph.transactions.status ,metamorph.transactions.block_height ,metamorph.transactions.block_hash - ,metamorph.transactions.callback_url - ,metamorph.transactions.callback_token + ,metamorph.transactions.callbacks ,metamorph.transactions.full_status_updates ,metamorph.transactions.reject_reason ,metamorph.transactions.competing_txs @@ -645,8 +650,7 @@ func (p *PostgreSQL) UpdateDoubleSpend(ctx context.Context, updates []store.Upda ,metamorph.transactions.status ,metamorph.transactions.block_height ,metamorph.transactions.block_hash - ,metamorph.transactions.callback_url - ,metamorph.transactions.callback_token + ,metamorph.transactions.callbacks ,metamorph.transactions.full_status_updates ,metamorph.transactions.reject_reason ,metamorph.transactions.competing_txs @@ -765,8 +769,7 @@ func (p *PostgreSQL) UpdateMined(ctx context.Context, txsBlocks []*blocktx_api.T ,t.status ,t.block_height ,t.block_hash - ,t.callback_url - ,t.callback_token + ,t.callbacks ,t.full_status_updates ,t.reject_reason ,t.competing_txs diff --git a/internal/metamorph/store/postgresql/postgres_helpers.go b/internal/metamorph/store/postgresql/postgres_helpers.go index 5ec91cc27..7c52080df 100644 --- a/internal/metamorph/store/postgresql/postgres_helpers.go +++ b/internal/metamorph/store/postgresql/postgres_helpers.go @@ -3,6 +3,7 @@ package postgresql import ( "context" "database/sql" + "encoding/json" "strings" "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" @@ -50,8 +51,7 @@ func getStoreDataFromRows(rows *sql.Rows) ([]*store.StoreData, error) { var blockHeight sql.NullInt64 var blockHash []byte - var callbackUrl sql.NullString - var callbackToken sql.NullString + var callbacksData []byte var rejectReason sql.NullString var competingTxs string var merklePath sql.NullString @@ -65,8 +65,7 @@ func getStoreDataFromRows(rows *sql.Rows) ([]*store.StoreData, error) { &status, &blockHeight, &blockHash, - &callbackUrl, - &callbackToken, + &callbacksData, &data.FullStatusUpdates, &rejectReason, &competingTxs, @@ -109,12 +108,12 @@ func getStoreDataFromRows(rows *sql.Rows) ([]*store.StoreData, error) { data.BlockHeight = uint64(blockHeight.Int64) } - if callbackUrl.Valid { - data.CallbackUrl = callbackUrl.String - } - - if callbackToken.Valid { - data.CallbackToken = callbackToken.String + if len(callbacksData) > 0 { + callbacks, err := readCallbacksFromDB(callbacksData) + if err != nil { + return nil, err + } + data.Callbacks = callbacks } if rejectReason.Valid { @@ -183,8 +182,7 @@ func updateDoubleSpendRejected(ctx context.Context, rows *sql.Rows, tx *sql.Tx) ,t.status ,t.block_height ,t.block_hash - ,t.callback_url - ,t.callback_token + ,t.callbacks ,t.full_status_updates ,t.reject_reason ,t.competing_txs @@ -226,3 +224,20 @@ func updateDoubleSpendRejected(ctx context.Context, rows *sql.Rows, tx *sql.Tx) return res } + +func prepareCallbacksForSaving(callbacks []store.StoreCallback) ([]byte, error) { + callbacksBytes, err := json.Marshal(callbacks) + if err != nil { + return nil, err + } + return callbacksBytes, nil +} + +func readCallbacksFromDB(callbacks []byte) ([]store.StoreCallback, error) { + var callbacksData []store.StoreCallback + err := json.Unmarshal(callbacks, &callbacksData) + if err != nil { + return nil, err + } + return callbacksData, nil +} diff --git a/internal/metamorph/store/postgresql/postgres_test.go b/internal/metamorph/store/postgresql/postgres_test.go index fb968ec2e..32e0e9bb5 100644 --- a/internal/metamorph/store/postgresql/postgres_test.go +++ b/internal/metamorph/store/postgresql/postgres_test.go @@ -175,18 +175,17 @@ func TestPostgresDB(t *testing.T) { now := time.Date(2023, 10, 1, 14, 25, 0, 0, time.UTC) minedHash := testdata.TX1Hash minedData := &store.StoreData{ - RawTx: make([]byte, 0), - StoredAt: now, - AnnouncedAt: time.Date(2023, 10, 1, 12, 5, 0, 0, time.UTC), - MinedAt: time.Date(2023, 10, 1, 12, 10, 0, 0, time.UTC), - Hash: minedHash, - Status: metamorph_api.Status_MINED, - BlockHeight: 100, - BlockHash: testdata.Block1Hash, - CallbackUrl: "http://callback.example.com", - CallbackToken: "12345", - RejectReason: "not rejected", - LockedBy: "metamorph-1", + RawTx: make([]byte, 0), + StoredAt: now, + AnnouncedAt: time.Date(2023, 10, 1, 12, 5, 0, 0, time.UTC), + MinedAt: time.Date(2023, 10, 1, 12, 10, 0, 0, time.UTC), + Hash: minedHash, + Status: metamorph_api.Status_MINED, + BlockHeight: 100, + BlockHash: testdata.Block1Hash, + Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.example.com", CallbackToken: "12345"}}, + RejectReason: "not rejected", + LockedBy: "metamorph-1", } unminedHash := testdata.TX1Hash @@ -303,9 +302,8 @@ func TestPostgresDB(t *testing.T) { StoredAt: now, Hash: testdata.TX1Hash, Status: metamorph_api.Status_STORED, - CallbackUrl: "callback.example.com", + Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.example.com", CallbackToken: "1234"}}, FullStatusUpdates: false, - CallbackToken: "1234", LastSubmittedAt: now, LockedBy: "metamorph-1", }, @@ -314,9 +312,8 @@ func TestPostgresDB(t *testing.T) { StoredAt: now, Hash: testdata.TX6Hash, Status: metamorph_api.Status_STORED, - CallbackUrl: "callback-2.example.com", + Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.example2.com", CallbackToken: "5678"}}, FullStatusUpdates: true, - CallbackToken: "5678", LastSubmittedAt: now, LockedBy: "metamorph-1", }, @@ -325,9 +322,8 @@ func TestPostgresDB(t *testing.T) { StoredAt: now, Hash: hash2, Status: metamorph_api.Status_STORED, - CallbackUrl: "callback-3.example.com", + Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.example3.com", CallbackToken: "5678"}}, FullStatusUpdates: true, - CallbackToken: "5678", LastSubmittedAt: now, LockedBy: "metamorph-1", }, diff --git a/internal/metamorph/store/store.go b/internal/metamorph/store/store.go index bcbaa7cff..6060b1a35 100644 --- a/internal/metamorph/store/store.go +++ b/internal/metamorph/store/store.go @@ -21,9 +21,8 @@ type StoreData struct { Status metamorph_api.Status BlockHeight uint64 BlockHash *chainhash.Hash - CallbackUrl string + Callbacks []StoreCallback FullStatusUpdates bool - CallbackToken string RejectReason string CompetingTxs []string LockedBy string @@ -33,6 +32,11 @@ type StoreData struct { Retries int } +type StoreCallback struct { + CallbackURL string `json:"callback_url"` + CallbackToken string `json:"callback_token"` +} + type Stats struct { StatusStored int64 StatusAnnouncedToNetwork int64