Skip to content

Commit

Permalink
ARCO-149: change callback column to allow saving multiple parameters (#…
Browse files Browse the repository at this point in the history
…539)

* ARCO-149: refactor callback column to allow saving multiple parameters

* ARCO-149: fix unkeyed fields

* ARCO-149: remove unnecessary comments

* ARCO-149: refactor migrations and postgres methods

* ARCO-149: remove goroutine from callback

* ARCO-149: change direct comparisions to sql not null

* ARCO-149: check callback url and token length instead of comparing to null

* ARCO-149: revert defer func changes
pawellewandowski98 authored Aug 9, 2024
1 parent ae67cee commit 03db169
Showing 11 changed files with 172 additions and 108 deletions.
24 changes: 15 additions & 9 deletions internal/metamorph/callbacker.go
Original file line number Diff line number Diff line change
@@ -124,9 +124,14 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) {
status.ExtraInfo = &tx.RejectReason
}

for i := 0; i < CallbackTries; i++ {
for _, callback := range tx.Callbacks {
p.sendCallback(logger, tx, callback, status, sleepDuration)
}
}

logger.Debug("Sending callback for transaction", slog.String("hash", tx.Hash.String()), slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("status", statusString), slog.Uint64("block height", tx.BlockHeight), slog.String("block hash", blockHash))
func (p *Callbacker) sendCallback(logger *slog.Logger, tx *store.StoreData, callback store.StoreCallback, status *Callback, sleepDuration int) {
for i := 0; i < CallbackTries; i++ {
logger.Debug("Sending callback for transaction", slog.String("hash", tx.Hash.String()), slog.String("url", callback.CallbackURL), slog.String("token", callback.CallbackToken), slog.String("status", *status.TxStatus), slog.Uint64("block height", tx.BlockHeight), slog.String("block hash", *status.BlockHash))

statusBytes, err := json.Marshal(status)
if err != nil {
@@ -135,22 +140,23 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) {
}

var request *http.Request
request, err = http.NewRequest("POST", tx.CallbackUrl, bytes.NewBuffer(statusBytes))
request, err = http.NewRequest("POST", callback.CallbackURL, bytes.NewBuffer(statusBytes))
if err != nil {
logger.Error("Couldn't marshal status", slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("err", errors.Join(err, fmt.Errorf("failed to post callback for transaction id %s", tx.Hash)).Error()))
logger.Error("Couldn't marshal status", slog.String("url", callback.CallbackURL), slog.String("token", callback.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("err", errors.Join(err, fmt.Errorf("failed to post callback for transaction id %s", tx.Hash)).Error()))
return
}
request.Header.Set("Content-Type", "application/json; charset=UTF-8")
if tx.CallbackToken != "" {
request.Header.Set("Authorization", "Bearer "+tx.CallbackToken)
if callback.CallbackToken != "" {
request.Header.Set("Authorization", "Bearer "+callback.CallbackToken)
}

var response *http.Response
response, err = p.httpClient.Do(request)
if err != nil {
logger.Debug("Couldn't send transaction callback", slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("err", err.Error()))
logger.Debug("Couldn't send transaction callback", slog.String("url", callback.CallbackURL), slog.String("token", callback.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("err", err.Error()))
continue
}

defer response.Body.Close()

// if callback was sent successfully we stop here
@@ -168,7 +174,7 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) {
return
}

logger.Debug("Callback response status code not ok", slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.Int("status", response.StatusCode))
logger.Debug("Callback response status code not ok", slog.String("url", callback.CallbackURL), slog.String("token", callback.CallbackToken), slog.String("hash", tx.Hash.String()), slog.Int("status", response.StatusCode))

// sleep before trying again
time.Sleep(time.Duration(sleepDuration) * time.Second)
@@ -177,7 +183,7 @@ func (p *Callbacker) SendCallback(logger *slog.Logger, tx *store.StoreData) {
}

p.callbackerStats.callbackFailedCount.Inc()
logger.Warn("Couldn't send transaction callback after tries", slog.String("url", tx.CallbackUrl), slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.Int("retries", CallbackTries))
logger.Warn("Couldn't send transaction callback after tries", slog.String("url", callback.CallbackURL), slog.String("token", callback.CallbackToken), slog.String("hash", tx.Hash.String()), slog.Int("retries", CallbackTries))
}

func (p *Callbacker) Shutdown(logger *slog.Logger) {
11 changes: 7 additions & 4 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
@@ -284,7 +284,7 @@ func (p *Processor) updateMined(txsBlocks []*blocktx_api.TransactionBlock) {
}

for _, data := range updatedData {
if data.CallbackUrl != "" {
if len(data.Callbacks) > 0 {
go p.callbackSender.SendCallback(p.logger, data)
}
}
@@ -311,11 +311,14 @@ func (p *Processor) StartProcessSubmittedTxs() {
continue
}
now := p.now()
callback := store.StoreCallback{
CallbackURL: submittedTx.GetCallbackUrl(),
CallbackToken: submittedTx.GetCallbackToken(),
}
sReq := &store.StoreData{
Hash: PtrTo(chainhash.DoubleHashH(submittedTx.GetRawTx())),
Status: metamorph_api.Status_STORED,
CallbackUrl: submittedTx.GetCallbackUrl(),
CallbackToken: submittedTx.GetCallbackToken(),
Callbacks: []store.StoreCallback{callback},
FullStatusUpdates: submittedTx.GetFullStatusUpdates(),
RawTx: submittedTx.GetRawTx(),
StoredAt: now,
@@ -429,7 +432,7 @@ func (p *Processor) statusUpdateWithCallback(statusUpdates, doubleSpendUpdates [
sendCallback = data.Status >= metamorph_api.Status_REJECTED
}

if sendCallback && data.CallbackUrl != "" {
if sendCallback && len(data.Callbacks) > 0 {
go p.callbackSender.SendCallback(p.logger, data)
}
}
15 changes: 9 additions & 6 deletions internal/metamorph/processor_test.go
Original file line number Diff line number Diff line change
@@ -363,7 +363,7 @@ func TestStartSendStatusForTransaction(t *testing.T) {
Hash: testdata.TX1Hash,
Status: metamorph_api.Status_SEEN_IN_ORPHAN_MEMPOOL,
FullStatusUpdates: true,
CallbackUrl: "http://callback.com",
Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}},
},
},
{
@@ -372,15 +372,15 @@ func TestStartSendStatusForTransaction(t *testing.T) {
Status: metamorph_api.Status_SEEN_ON_NETWORK,
RejectReason: "",
FullStatusUpdates: true,
CallbackUrl: "http://callback.com",
Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}},
},
},
{
{
Hash: testdata.TX6Hash,
Status: metamorph_api.Status_DOUBLE_SPEND_ATTEMPTED,
FullStatusUpdates: true,
CallbackUrl: "http://callback.com",
Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}},
CompetingTxs: []string{"1234"},
},
},
@@ -424,15 +424,15 @@ func TestStartSendStatusForTransaction(t *testing.T) {
{
{
Hash: testdata.TX1Hash,
CallbackUrl: "http://callback.com",
Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}},
FullStatusUpdates: true,
Status: metamorph_api.Status_SEEN_ON_NETWORK,
},
},
{
{
Hash: testdata.TX2Hash,
CallbackUrl: "http://callback.com",
Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}},
FullStatusUpdates: true,
Status: metamorph_api.Status_DOUBLE_SPEND_ATTEMPTED,
CompetingTxs: []string{"1234", "different_competing_tx"},
@@ -827,7 +827,10 @@ func TestStartProcessMinedCallbacks(t *testing.T) {
UpdateMinedFunc: func(ctx context.Context, txsBlocks []*blocktx_api.TransactionBlock) ([]*store.StoreData, error) {
require.Len(t, txsBlocks, tc.expectedTxsBlocks)

return []*store.StoreData{{CallbackUrl: "http://callback.com"}, {CallbackUrl: "http://callback.com"}, {}}, tc.updateMinedErr
return []*store.StoreData{
{Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}}},
{Callbacks: []store.StoreCallback{{CallbackURL: "http://callback.com"}}},
{}}, tc.updateMinedErr
},
SetUnlockedByNameFunc: func(ctx context.Context, lockedBy string) (int64, error) { return 0, nil },
}
20 changes: 12 additions & 8 deletions internal/metamorph/server.go
Original file line number Diff line number Diff line change
@@ -174,10 +174,12 @@ func (s *Server) PutTransaction(ctx context.Context, req *metamorph_api.Transact

// Convert gRPC req to store.StoreData struct...
sReq := &store.StoreData{
Hash: hash,
Status: statusReceived,
CallbackUrl: req.GetCallbackUrl(),
CallbackToken: req.GetCallbackToken(),
Hash: hash,
Status: statusReceived,
Callbacks: []store.StoreCallback{{
CallbackURL: req.GetCallbackUrl(),
CallbackToken: req.GetCallbackToken(),
}},
FullStatusUpdates: req.GetFullStatusUpdates(),
RawTx: req.GetRawTx(),
}
@@ -208,10 +210,12 @@ func (s *Server) PutTransactions(ctx context.Context, req *metamorph_api.Transac

// Convert gRPC req to store.StoreData struct...
sReq := &store.StoreData{
Hash: hash,
Status: statusReceived,
CallbackUrl: txReq.GetCallbackUrl(),
CallbackToken: txReq.GetCallbackToken(),
Hash: hash,
Status: statusReceived,
Callbacks: []store.StoreCallback{{
CallbackURL: txReq.GetCallbackUrl(),
CallbackToken: txReq.GetCallbackToken(),
}},
FullStatusUpdates: txReq.GetFullStatusUpdates(),
RawTx: txReq.GetRawTx(),
}
15 changes: 7 additions & 8 deletions internal/metamorph/server_test.go
Original file line number Diff line number Diff line change
@@ -227,14 +227,13 @@ func TestServer_GetTransactionStatus(t *testing.T) {
metamorphStore := &storeMocks.MetamorphStoreMock{
GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) {
data := &store.StoreData{
StoredAt: testdata.Time,
AnnouncedAt: testdata.Time.Add(1 * time.Second),
MinedAt: testdata.Time.Add(2 * time.Second),
Hash: testdata.TX1Hash,
Status: tt.status,
CallbackUrl: "https://test.com",
CallbackToken: "token",
MerklePath: "00000",
StoredAt: testdata.Time,
AnnouncedAt: testdata.Time.Add(1 * time.Second),
MinedAt: testdata.Time.Add(2 * time.Second),
Hash: testdata.TX1Hash,
Status: tt.status,
Callbacks: []store.StoreCallback{{CallbackURL: "https://test.com", CallbackToken: "token"}},
MerklePath: "00000",
}
return data, tt.getErr
},
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
-- Step 1: Add the old 'callback_url' and 'callback_token' columns back
ALTER TABLE metamorph.transactions
ADD COLUMN callback_url TEXT,
ADD COLUMN callback_token TEXT;

-- Step 2: Populate 'callback_url' and 'callback_token' with the first object in the 'callbacks' JSON array
UPDATE metamorph.transactions
SET callback_url = (callbacks->0->>'callback_url'),
callback_token = (callbacks->0->>'callback_token')
WHERE jsonb_array_length(callbacks) > 0;

-- Step 3: Drop the new 'callbacks' column
ALTER TABLE metamorph.transactions
DROP COLUMN callbacks;
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- Step 1: Add the new 'callback' column
ALTER TABLE metamorph.transactions
ADD COLUMN callbacks JSONB;

-- Step 2: Populate the 'callback' column with data from 'callback_url' and 'callback_token'
UPDATE metamorph.transactions
SET callbacks = json_build_array(
json_build_object(
'callback_url', callback_url,
'callback_token', callback_token
)
)WHERE LENGTH(callback_url) > 0 OR LENGTH(callback_token) > 0;

-- Step 3: Drop the old 'callback_url' and 'callback_token' columns
ALTER TABLE metamorph.transactions
DROP COLUMN callback_url,
DROP COLUMN callback_token;
Loading

0 comments on commit 03db169

Please sign in to comment.