From 9acb62b8bcd29a5874545509dd94bdc945cacd2c Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 08:50:25 +0100 Subject: [PATCH 01/22] implement callbacker --- metamorph/callbacker.go | 94 ++++++++++++++++++++++++++++ metamorph/processor.go | 31 ++++----- metamorph/store/Interface.go | 1 + metamorph/store/badger/badger.go | 30 +++++++++ metamorph/store/dynamodb/dynamodb.go | 29 +++++++++ metamorph/store/sql/sql.go | 31 +++++++++ 6 files changed, 201 insertions(+), 15 deletions(-) create mode 100644 metamorph/callbacker.go diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go new file mode 100644 index 000000000..243ecbf35 --- /dev/null +++ b/metamorph/callbacker.go @@ -0,0 +1,94 @@ +package metamorph + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "net/http" + "time" + + "github.com/bitcoin-sv/arc/api" + "github.com/bitcoin-sv/arc/metamorph/metamorph_api" + "github.com/bitcoin-sv/arc/metamorph/store" + "github.com/ordishs/go-utils" +) + +const ( + CallbackTries = 5 + CallbackInterval = 30 +) + +func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreData) { + sleepDuration := CallbackInterval + for i := 1; i < CallbackTries; i++ { + statusString := metamorph_api.Status(tx.Status).String() + blockHash := "" + if tx.BlockHash != nil { + blockHash = utils.ReverseAndHexEncodeSlice(tx.BlockHash.CloneBytes()) + } + + logger.Info("sending callback for transaction", slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("url", tx.CallbackUrl), slog.Uint64("block height", tx.BlockHeight), slog.String("block hash", blockHash)) + + status := &api.TransactionStatus{ + BlockHash: &blockHash, + BlockHeight: &tx.BlockHeight, + TxStatus: &statusString, + Txid: tx.Hash.String(), + Timestamp: time.Now(), + } + statusBytes, err := json.Marshal(status) + if err != nil { + logger.Error("Couldn't marshal status - ", err) + return + } + statusBuffer := bytes.NewBuffer(statusBytes) + + var request *http.Request + request, err = http.NewRequest("POST", tx.CallbackUrl, statusBuffer) + if err != nil { + logger.Error("Couldn't marshal status - ", errors.Join(err, fmt.Errorf("failed to post callback for transaction id %s", tx.Hash))) + return + } + request.Header.Set("Content-Type", "application/json; charset=UTF-8") + if tx.CallbackToken != "" { + request.Header.Set("Authorization", "Bearer "+tx.CallbackToken) + } + + // default http client + httpClient := http.Client{} + httpClient.Timeout = 5 * time.Second + + var response *http.Response + response, err = httpClient.Do(request) + if err != nil { + logger.Error("Couldn't send transaction info through callback url - ", err) + return + } + defer response.Body.Close() + + // if callback was sent successfully we stop here + if response.StatusCode == http.StatusOK { + err = s.RemoveCallbacker(context.Background(), tx.Hash) + if err != nil { + logger.Info("Couldn't update/remove callback url - ", err) + continue + } + return + } + + // sleep before trying again + time.Sleep(time.Duration(sleepDuration) * time.Second) + // increase intervals on each failure + sleepDuration *= sleepDuration + } + + err := s.RemoveCallbacker(context.Background(), tx.Hash) + if err != nil { + logger.Error("Couldn't update/remove callback url - ", err) + return + } + logger.Error("Couldn't send transaction info through callback url after ", CallbackTries, " tries") +} diff --git a/metamorph/processor.go b/metamorph/processor.go index 1b42339e9..365b9bfef 100644 --- a/metamorph/processor.go +++ b/metamorph/processor.go @@ -387,21 +387,22 @@ func (p *Processor) SendStatusMinedForTransaction(hash *chainhash.Hash, blockHas resp.Close() p.ProcessorResponseMap.Delete(hash) - if p.cbChannel != nil { - data, _ := p.store.Get(spanCtx, hash[:]) - - if data != nil && data.CallbackUrl != "" { - p.cbChannel <- &callbacker_api.Callback{ - Hash: data.Hash[:], - Url: data.CallbackUrl, - Token: data.CallbackToken, - Status: int32(data.Status), - BlockHash: data.BlockHash[:], - BlockHeight: data.BlockHeight, - } - } - } - + // if p.cbChannel != nil { + // data, _ := p.store.Get(spanCtx, hash[:]) + + // if data != nil && data.CallbackUrl != "" { + // p.cbChannel <- &callbacker_api.Callback{ + // Hash: data.Hash[:], + // Url: data.CallbackUrl, + // Token: data.CallbackToken, + // Status: int32(data.Status), + // BlockHash: data.BlockHash[:], + // BlockHeight: data.BlockHeight, + // } + // } + // } + data, _ := p.store.Get(spanCtx, hash[:]) + SendCallback(p.logger, p.store, data) }, }) diff --git a/metamorph/store/Interface.go b/metamorph/store/Interface.go index cce2056e4..7759699e0 100644 --- a/metamorph/store/Interface.go +++ b/metamorph/store/Interface.go @@ -213,6 +213,7 @@ type MetamorphStore interface { SetUnlockedByName(ctx context.Context, lockedBy string) (int, error) GetUnmined(_ context.Context, callback func(s *StoreData)) error UpdateStatus(ctx context.Context, hash *chainhash.Hash, status metamorph_api.Status, rejectReason string) error + RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error UpdateMined(ctx context.Context, hash *chainhash.Hash, blockHash *chainhash.Hash, blockHeight uint64) error Close(ctx context.Context) error GetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) (*time.Time, error) diff --git a/metamorph/store/badger/badger.go b/metamorph/store/badger/badger.go index 8592e3154..a06299823 100644 --- a/metamorph/store/badger/badger.go +++ b/metamorph/store/badger/badger.go @@ -213,6 +213,36 @@ func (s *Badger) UpdateStatus(ctx context.Context, hash *chainhash.Hash, status return nil } +func (s *Badger) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + start := gocore.CurrentNanos() + defer func() { + gocore.NewStat("mtm_store_badger").NewStat("RemoveCallbacker").AddTime(start) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "badger:RemoveCallbacker") + defer span.Finish() + + // we need a lock here since we are doing 2 operations that need to be atomic + s.mu.Lock() + defer s.mu.Unlock() + + tx, err := s.Get(ctx, hash[:]) + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return err + } + + tx.CallbackUrl = "" + + if err = s.Set(ctx, hash[:], tx); err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return fmt.Errorf("failed to update data: %w", err) + } + + return nil +} + // UpdateMined updates the transaction to mined func (s *Badger) UpdateMined(ctx context.Context, hash *chainhash.Hash, blockHash *chainhash.Hash, blockHeight uint64) error { start := gocore.CurrentNanos() diff --git a/metamorph/store/dynamodb/dynamodb.go b/metamorph/store/dynamodb/dynamodb.go index e76601ff5..a22c3c0c5 100644 --- a/metamorph/store/dynamodb/dynamodb.go +++ b/metamorph/store/dynamodb/dynamodb.go @@ -511,6 +511,35 @@ func (ddb *DynamoDB) UpdateStatus(ctx context.Context, hash *chainhash.Hash, sta return nil } +func (ddb *DynamoDB) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + // setup log and tracing + startNanos := ddb.now().UnixNano() + defer func() { + gocore.NewStat("mtm_store_sql").NewStat("RemoveCallbacker").AddTime(startNanos) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "sql:RemoveCallbacker") + defer span.Finish() + + updateExpression := "SET callback_url = ''" + + // update tx + _, err := ddb.client.UpdateItem(ctx, &dynamodb.UpdateItemInput{ + TableName: aws.String(ddb.transactionsTableName), + Key: map[string]types.AttributeValue{ + "tx_hash": &types.AttributeValueMemberB{Value: hash.CloneBytes()}, + }, + UpdateExpression: aws.String(updateExpression), + }) + + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return err + } + + return nil +} + func (ddb *DynamoDB) UpdateMined(ctx context.Context, hash *chainhash.Hash, blockHash *chainhash.Hash, blockHeight uint64) error { // setup log and tracing startNanos := ddb.now().UnixNano() diff --git a/metamorph/store/sql/sql.go b/metamorph/store/sql/sql.go index 37927656b..7c99b904e 100644 --- a/metamorph/store/sql/sql.go +++ b/metamorph/store/sql/sql.go @@ -499,6 +499,37 @@ func (s *SQL) GetUnmined(ctx context.Context, callback func(s *store.StoreData)) return nil } +func (s *SQL) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + startNanos := time.Now().UnixNano() + defer func() { + gocore.NewStat("mtm_store_sql").NewStat("RemoveCallbacker").AddTime(startNanos) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "sql:RemoveCallbacker") + defer span.Finish() + + q := `UPDATE transactions SET callback_url = '' WHERE hash = $3;` + + result, err := s.db.ExecContext(ctx, q, hash[:]) + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return err + } + + var n int64 + n, err = result.RowsAffected() + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return err + } + if n == 0 { + return store.ErrNotFound + } + + return nil +} + func (s *SQL) UpdateStatus(ctx context.Context, hash *chainhash.Hash, status metamorph_api.Status, rejectReason string) error { startNanos := time.Now().UnixNano() defer func() { From 2f5f7f35d38e49a120eaa7477f9560319280543b Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 11:50:44 +0100 Subject: [PATCH 02/22] increase callback interval --- metamorph/callbacker.go | 8 +++++--- metamorph/processor.go | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go index 243ecbf35..874621fcf 100644 --- a/metamorph/callbacker.go +++ b/metamorph/callbacker.go @@ -65,7 +65,7 @@ func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreDa response, err = httpClient.Do(request) if err != nil { logger.Error("Couldn't send transaction info through callback url - ", err) - return + continue } defer response.Body.Close() @@ -73,16 +73,18 @@ func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreDa if response.StatusCode == http.StatusOK { err = s.RemoveCallbacker(context.Background(), tx.Hash) if err != nil { - logger.Info("Couldn't update/remove callback url - ", err) + logger.Error("Couldn't update/remove callback url - ", err) continue } return + } else { + logger.Error("callback response status code not ok - ", response.StatusCode) } // sleep before trying again time.Sleep(time.Duration(sleepDuration) * time.Second) // increase intervals on each failure - sleepDuration *= sleepDuration + sleepDuration *= 2 } err := s.RemoveCallbacker(context.Background(), tx.Hash) diff --git a/metamorph/processor.go b/metamorph/processor.go index 365b9bfef..0e308d5f8 100644 --- a/metamorph/processor.go +++ b/metamorph/processor.go @@ -402,7 +402,7 @@ func (p *Processor) SendStatusMinedForTransaction(hash *chainhash.Hash, blockHas // } // } data, _ := p.store.Get(spanCtx, hash[:]) - SendCallback(p.logger, p.store, data) + go SendCallback(p.logger, p.store, data) }, }) From b18c77ac80189d5b191fb7c1a27a9e7d2b03224c Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 12:10:57 +0100 Subject: [PATCH 03/22] regenerate mock --- metamorph/mocks/store_mock.go | 50 +++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/metamorph/mocks/store_mock.go b/metamorph/mocks/store_mock.go index 0b3e00f53..64b8bbd0b 100644 --- a/metamorph/mocks/store_mock.go +++ b/metamorph/mocks/store_mock.go @@ -40,6 +40,9 @@ var _ store.MetamorphStore = &MetamorphStoreMock{} // IsCentralisedFunc: func() bool { // panic("mock out the IsCentralised method") // }, +// RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { +// panic("mock out the RemoveCallbacker method") +// }, // SetFunc: func(ctx context.Context, key []byte, value *store.StoreData) error { // panic("mock out the Set method") // }, @@ -83,6 +86,9 @@ type MetamorphStoreMock struct { // IsCentralisedFunc mocks the IsCentralised method. IsCentralisedFunc func() bool + // RemoveCallbackerFunc mocks the RemoveCallbacker method. + RemoveCallbackerFunc func(ctx context.Context, hash *chainhash.Hash) error + // SetFunc mocks the Set method. SetFunc func(ctx context.Context, key []byte, value *store.StoreData) error @@ -139,6 +145,13 @@ type MetamorphStoreMock struct { // IsCentralised holds details about calls to the IsCentralised method. IsCentralised []struct { } + // RemoveCallbacker holds details about calls to the RemoveCallbacker method. + RemoveCallbacker []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Hash is the hash argument value. + Hash *chainhash.Hash + } // Set holds details about calls to the Set method. Set []struct { // Ctx is the ctx argument value. @@ -198,6 +211,7 @@ type MetamorphStoreMock struct { lockGetBlockProcessed sync.RWMutex lockGetUnmined sync.RWMutex lockIsCentralised sync.RWMutex + lockRemoveCallbacker sync.RWMutex lockSet sync.RWMutex lockSetBlockProcessed sync.RWMutex lockSetUnlocked sync.RWMutex @@ -409,6 +423,42 @@ func (mock *MetamorphStoreMock) IsCentralisedCalls() []struct { return calls } +// RemoveCallbacker calls RemoveCallbackerFunc. +func (mock *MetamorphStoreMock) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + if mock.RemoveCallbackerFunc == nil { + panic("MetamorphStoreMock.RemoveCallbackerFunc: method is nil but MetamorphStore.RemoveCallbacker was just called") + } + callInfo := struct { + Ctx context.Context + Hash *chainhash.Hash + }{ + Ctx: ctx, + Hash: hash, + } + mock.lockRemoveCallbacker.Lock() + mock.calls.RemoveCallbacker = append(mock.calls.RemoveCallbacker, callInfo) + mock.lockRemoveCallbacker.Unlock() + return mock.RemoveCallbackerFunc(ctx, hash) +} + +// RemoveCallbackerCalls gets all the calls that were made to RemoveCallbacker. +// Check the length with: +// +// len(mockedMetamorphStore.RemoveCallbackerCalls()) +func (mock *MetamorphStoreMock) RemoveCallbackerCalls() []struct { + Ctx context.Context + Hash *chainhash.Hash +} { + var calls []struct { + Ctx context.Context + Hash *chainhash.Hash + } + mock.lockRemoveCallbacker.RLock() + calls = mock.calls.RemoveCallbacker + mock.lockRemoveCallbacker.RUnlock() + return calls +} + // Set calls SetFunc. func (mock *MetamorphStoreMock) Set(ctx context.Context, key []byte, value *store.StoreData) error { if mock.SetFunc == nil { From a2eb9729199d75569972a97a73286bcb64a6f387 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 12:13:59 +0100 Subject: [PATCH 04/22] fix style --- metamorph/callbacker.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go index 874621fcf..8a72bc5df 100644 --- a/metamorph/callbacker.go +++ b/metamorph/callbacker.go @@ -44,10 +44,9 @@ func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreDa logger.Error("Couldn't marshal status - ", err) return } - statusBuffer := bytes.NewBuffer(statusBytes) var request *http.Request - request, err = http.NewRequest("POST", tx.CallbackUrl, statusBuffer) + request, err = http.NewRequest("POST", tx.CallbackUrl, bytes.NewBuffer(statusBytes)) if err != nil { logger.Error("Couldn't marshal status - ", errors.Join(err, fmt.Errorf("failed to post callback for transaction id %s", tx.Hash))) return @@ -58,8 +57,9 @@ func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreDa } // default http client - httpClient := http.Client{} - httpClient.Timeout = 5 * time.Second + httpClient := http.Client{ + Timeout: 5 * time.Second, + } var response *http.Response response, err = httpClient.Do(request) From b44af61ce3f4355d215d5161575ba9bee328d8cb Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 12:29:24 +0100 Subject: [PATCH 05/22] fix log parsing --- metamorph/callbacker.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go index 8a72bc5df..02ff0bb1f 100644 --- a/metamorph/callbacker.go +++ b/metamorph/callbacker.go @@ -8,6 +8,7 @@ import ( "fmt" "log/slog" "net/http" + "strconv" "time" "github.com/bitcoin-sv/arc/api" @@ -78,7 +79,7 @@ func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreDa } return } else { - logger.Error("callback response status code not ok - ", response.StatusCode) + logger.Error("callback response status code not ok - ", slog.String("status", strconv.Itoa(response.StatusCode))) } // sleep before trying again @@ -92,5 +93,5 @@ func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreDa logger.Error("Couldn't update/remove callback url - ", err) return } - logger.Error("Couldn't send transaction info through callback url after ", CallbackTries, " tries") + logger.Error("Couldn't send transaction info through callback url after ", slog.String("status", strconv.Itoa(CallbackTries)), " tries") } From fa080be04009493f13edd027007a796b657f25c3 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 13:09:01 +0100 Subject: [PATCH 06/22] fix log parsing --- metamorph/callbacker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go index 02ff0bb1f..0bee8381f 100644 --- a/metamorph/callbacker.go +++ b/metamorph/callbacker.go @@ -93,5 +93,5 @@ func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreDa logger.Error("Couldn't update/remove callback url - ", err) return } - logger.Error("Couldn't send transaction info through callback url after ", slog.String("status", strconv.Itoa(CallbackTries)), " tries") + logger.Error("Couldn't send transaction info through callback url after tries: ", slog.String("status", strconv.Itoa(CallbackTries))) } From 1ec8963ef49430948f2d4210ba3fdde0af4fcd24 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 13:53:06 +0100 Subject: [PATCH 07/22] implement new mock func --- metamorph/server_test.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/metamorph/server_test.go b/metamorph/server_test.go index c0ece2fc4..e1d6a6820 100644 --- a/metamorph/server_test.go +++ b/metamorph/server_test.go @@ -361,6 +361,9 @@ func TestServer_GetTransactionStatus(t *testing.T) { } return data, tt.getErr }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } server := NewServer(metamorphStore, nil, client, source) @@ -622,6 +625,9 @@ func TestPutTransactions(t *testing.T) { return nil, tc.getErr }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } btc := &ClientIMock{ @@ -698,6 +704,9 @@ func TestSetUnlockedbyName(t *testing.T) { SetUnlockedByNameFunc: func(ctx context.Context, lockedBy string) (int, error) { return tc.recordsAffected, tc.errSetUnlocked }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } server := NewServer(metamorphStore, nil, nil, source) @@ -728,7 +737,12 @@ func TestStartGRPCServer(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - metamorphStore := &MetamorphStoreMock{SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }} + metamorphStore := &MetamorphStoreMock{ + SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, + } btc := &ClientIMock{} From 075f767abaa320d5ad550c041b2b6305a32b0cda Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 14:07:09 +0100 Subject: [PATCH 08/22] implement new mock func --- metamorph/processor_test.go | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/metamorph/processor_test.go b/metamorph/processor_test.go index 2cdd5f11c..9d6e7a6e1 100644 --- a/metamorph/processor_test.go +++ b/metamorph/processor_test.go @@ -35,6 +35,9 @@ import ( func TestNewProcessor(t *testing.T) { mtmStore := &MetamorphStoreMock{ SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } pm := p2p.NewPeerManagerMock() @@ -287,6 +290,9 @@ func TestLoadUnmined(t *testing.T) { IsCentralisedFunc: func() bool { return tc.isCentralised }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } processor, err := NewProcessor(mtmStore, pm, nil, btxMock, @@ -741,6 +747,9 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { return tc.updateMinedErr }, SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } btxMock := &ClientIMock{ GetTransactionBlocksFunc: func(ctx context.Context, transaction *blocktx_api.Transactions) (*blocktx_api.TransactionBlocks, error) { @@ -789,7 +798,12 @@ func TestProcessExpiredTransactions(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - metamorphStore := &MetamorphStoreMock{SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }} + metamorphStore := &MetamorphStoreMock{ + SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, + } pm := p2p.NewPeerManagerMock() processor, err := NewProcessor(metamorphStore, pm, nil, nil, WithProcessExpiredSeenTxsInterval(time.Hour), From 7b1da6dc8b9dd47469caebe0c7779fa6e91331c0 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 14:41:56 +0100 Subject: [PATCH 09/22] remove old test testing channel --- metamorph/callbacker.go | 2 +- metamorph/processor_test.go | 75 ++++++++++++++++++------------------- 2 files changed, 38 insertions(+), 39 deletions(-) diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go index 0bee8381f..84ac532e1 100644 --- a/metamorph/callbacker.go +++ b/metamorph/callbacker.go @@ -24,7 +24,7 @@ const ( func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreData) { sleepDuration := CallbackInterval - for i := 1; i < CallbackTries; i++ { + for i := 0; i < CallbackTries; i++ { statusString := metamorph_api.Status(tx.Status).String() blockHash := "" if tx.BlockHash != nil { diff --git a/metamorph/processor_test.go b/metamorph/processor_test.go index 9d6e7a6e1..927a4fc7d 100644 --- a/metamorph/processor_test.go +++ b/metamorph/processor_test.go @@ -12,7 +12,6 @@ import ( "time" "github.com/bitcoin-sv/arc/blocktx/blocktx_api" - "github.com/bitcoin-sv/arc/callbacker/callbacker_api" . "github.com/bitcoin-sv/arc/metamorph" "github.com/bitcoin-sv/arc/metamorph/metamorph_api" . "github.com/bitcoin-sv/arc/metamorph/mocks" @@ -533,43 +532,43 @@ func TestSendStatusMinedForTransaction(t *testing.T) { assert.Equal(t, metamorph_api.Status_MINED, txStored.Status) }) - t.Run("SendStatusMinedForTransaction callback", func(t *testing.T) { - s, err := metamorphSql.New("sqlite_memory") - require.NoError(t, err) - setStoreTestData(t, s) - - pm := p2p.NewPeerManagerMock() - - var wg sync.WaitGroup - callbackCh := make(chan *callbacker_api.Callback) - wg.Add(1) - go func() { - for cb := range callbackCh { - assert.Equal(t, metamorph_api.Status_MINED, metamorph_api.Status(cb.Status)) - assert.Equal(t, testdata.TX1Hash.CloneBytes(), cb.Hash) - assert.Equal(t, testdata.Block1Hash[:], cb.BlockHash) - assert.Equal(t, uint64(1233), cb.BlockHeight) - assert.Equal(t, "https://test.com", cb.Url) - assert.Equal(t, "token", cb.Token) - wg.Done() - } - }() - - processor, err := NewProcessor(s, pm, callbackCh, nil) - require.NoError(t, err) - // add the tx to the map - processor.ProcessorResponseMap.Set(testdata.TX1Hash, processor_response.NewProcessorResponseWithStatus( - testdata.TX1Hash, - metamorph_api.Status_SEEN_ON_NETWORK, - )) - - ok, sendErr := processor.SendStatusMinedForTransaction(testdata.TX1Hash, testdata.Block1Hash, 1233) - time.Sleep(100 * time.Millisecond) - assert.True(t, ok) - assert.NoError(t, sendErr) - - wg.Wait() - }) + // t.Run("SendStatusMinedForTransaction callback", func(t *testing.T) { + // s, err := metamorphSql.New("sqlite_memory") + // require.NoError(t, err) + // setStoreTestData(t, s) + + // pm := p2p.NewPeerManagerMock() + + // var wg sync.WaitGroup + // callbackCh := make(chan *callbacker_api.Callback) + // wg.Add(1) + // go func() { + // for cb := range callbackCh { + // assert.Equal(t, metamorph_api.Status_MINED, metamorph_api.Status(cb.Status)) + // assert.Equal(t, testdata.TX1Hash.CloneBytes(), cb.Hash) + // assert.Equal(t, testdata.Block1Hash[:], cb.BlockHash) + // assert.Equal(t, uint64(1233), cb.BlockHeight) + // assert.Equal(t, "https://test.com", cb.Url) + // assert.Equal(t, "token", cb.Token) + // wg.Done() + // } + // }() + + // processor, err := NewProcessor(s, pm, callbackCh, nil) + // require.NoError(t, err) + // // add the tx to the map + // processor.ProcessorResponseMap.Set(testdata.TX1Hash, processor_response.NewProcessorResponseWithStatus( + // testdata.TX1Hash, + // metamorph_api.Status_SEEN_ON_NETWORK, + // )) + + // ok, sendErr := processor.SendStatusMinedForTransaction(testdata.TX1Hash, testdata.Block1Hash, 1233) + // time.Sleep(100 * time.Millisecond) + // assert.True(t, ok) + // assert.NoError(t, sendErr) + + // wg.Wait() + // }) t.Run("SendStatusForTransaction known tx - processed", func(t *testing.T) { s, err := metamorphSql.New("sqlite_memory") From 8cbfad978fb893cd764e09acc5fc7b6e2a7cf613 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 15:29:56 +0100 Subject: [PATCH 10/22] fix nil func pointer --- metamorph/server_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/metamorph/server_test.go b/metamorph/server_test.go index e1d6a6820..4fce9227d 100644 --- a/metamorph/server_test.go +++ b/metamorph/server_test.go @@ -807,7 +807,11 @@ func TestCheckUtxos(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - metamorphStore := &MetamorphStoreMock{} + metamorphStore := &MetamorphStoreMock{ + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, + } btc := &ClientIMock{} From fd92b43af1a99b1f308db91caea6e420f6f2fb28 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 15:37:27 +0100 Subject: [PATCH 11/22] add GetFunc implementtaion --- metamorph/processor_test.go | 12 ++++++++++++ metamorph/server_test.go | 9 +++++++++ 2 files changed, 21 insertions(+) diff --git a/metamorph/processor_test.go b/metamorph/processor_test.go index 927a4fc7d..8a135834e 100644 --- a/metamorph/processor_test.go +++ b/metamorph/processor_test.go @@ -33,6 +33,9 @@ import ( func TestNewProcessor(t *testing.T) { mtmStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{}, nil + }, SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { return nil @@ -252,6 +255,9 @@ func TestLoadUnmined(t *testing.T) { }, } mtmStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{}, nil + }, GetUnminedFunc: func(contextMoqParam context.Context, callback func(s *store.StoreData)) error { for _, data := range tc.storedData { callback(data) @@ -735,6 +741,9 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { t.Run(tc.name, func(t *testing.T) { metamorphStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{}, nil + }, UpdateMinedFunc: func(ctx context.Context, hash *chainhash.Hash, blockHash *chainhash.Hash, blockHeight uint64) error { require.Condition(t, func() (success bool) { oneOfHash := hash.IsEqual(testdata.TX1Hash) || hash.IsEqual(testdata.TX2Hash) || hash.IsEqual(testdata.TX3Hash) @@ -798,6 +807,9 @@ func TestProcessExpiredTransactions(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { metamorphStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{}, nil + }, SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { return nil diff --git a/metamorph/server_test.go b/metamorph/server_test.go index 4fce9227d..f9500d60f 100644 --- a/metamorph/server_test.go +++ b/metamorph/server_test.go @@ -701,6 +701,9 @@ func TestSetUnlockedbyName(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { metamorphStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{}, nil + }, SetUnlockedByNameFunc: func(ctx context.Context, lockedBy string) (int, error) { return tc.recordsAffected, tc.errSetUnlocked }, @@ -738,6 +741,9 @@ func TestStartGRPCServer(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { metamorphStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{}, nil + }, SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { return nil @@ -808,6 +814,9 @@ func TestCheckUtxos(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { metamorphStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{}, nil + }, RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { return nil }, From 6178719ebfe97f5a34632c44a9d9a3315f92eb77 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Thu, 7 Dec 2023 15:39:39 +0100 Subject: [PATCH 12/22] add GetFunc implementtaion --- metamorph/processor_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/metamorph/processor_test.go b/metamorph/processor_test.go index 8a135834e..5b7a578fd 100644 --- a/metamorph/processor_test.go +++ b/metamorph/processor_test.go @@ -255,9 +255,6 @@ func TestLoadUnmined(t *testing.T) { }, } mtmStore := &MetamorphStoreMock{ - GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { - return &store.StoreData{}, nil - }, GetUnminedFunc: func(contextMoqParam context.Context, callback func(s *store.StoreData)) error { for _, data := range tc.storedData { callback(data) From 848d467b854f0a93a8955a69f84546d72cd7502f Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Fri, 8 Dec 2023 08:35:12 +0100 Subject: [PATCH 13/22] fix test data --- metamorph/callbacker.go | 2 -- metamorph/processor_test.go | 6 +++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go index 84ac532e1..1092250c8 100644 --- a/metamorph/callbacker.go +++ b/metamorph/callbacker.go @@ -31,8 +31,6 @@ func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreDa blockHash = utils.ReverseAndHexEncodeSlice(tx.BlockHash.CloneBytes()) } - logger.Info("sending callback for transaction", slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("url", tx.CallbackUrl), slog.Uint64("block height", tx.BlockHeight), slog.String("block hash", blockHash)) - status := &api.TransactionStatus{ BlockHash: &blockHash, BlockHeight: &tx.BlockHeight, diff --git a/metamorph/processor_test.go b/metamorph/processor_test.go index 5b7a578fd..dacbc6824 100644 --- a/metamorph/processor_test.go +++ b/metamorph/processor_test.go @@ -34,7 +34,7 @@ import ( func TestNewProcessor(t *testing.T) { mtmStore := &MetamorphStoreMock{ GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { - return &store.StoreData{}, nil + return &store.StoreData{Hash: testdata.TX2Hash}, nil }, SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { @@ -739,7 +739,7 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { metamorphStore := &MetamorphStoreMock{ GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { - return &store.StoreData{}, nil + return &store.StoreData{Hash: testdata.TX2Hash}, nil }, UpdateMinedFunc: func(ctx context.Context, hash *chainhash.Hash, blockHash *chainhash.Hash, blockHeight uint64) error { require.Condition(t, func() (success bool) { @@ -805,7 +805,7 @@ func TestProcessExpiredTransactions(t *testing.T) { t.Run(tc.name, func(t *testing.T) { metamorphStore := &MetamorphStoreMock{ GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { - return &store.StoreData{}, nil + return &store.StoreData{Hash: testdata.TX2Hash}, nil }, SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { From 760d5df05245d76322cd2fb9d07e61d382d2b61b Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Fri, 8 Dec 2023 09:08:11 +0100 Subject: [PATCH 14/22] fix test data --- metamorph/callbacker.go | 4 +++- test/arc_txt_endpoint_test.go | 23 ----------------------- 2 files changed, 3 insertions(+), 24 deletions(-) diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go index 1092250c8..145358622 100644 --- a/metamorph/callbacker.go +++ b/metamorph/callbacker.go @@ -19,7 +19,7 @@ import ( const ( CallbackTries = 5 - CallbackInterval = 30 + CallbackInterval = 5 ) func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreData) { @@ -31,6 +31,8 @@ func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreDa blockHash = utils.ReverseAndHexEncodeSlice(tx.BlockHash.CloneBytes()) } + logger.Info("sending callback for transaction", slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("url", tx.CallbackUrl), slog.Uint64("block height", tx.BlockHeight), slog.String("block hash", blockHash)) + status := &api.TransactionStatus{ BlockHash: &blockHash, BlockHeight: &tx.BlockHeight, diff --git a/test/arc_txt_endpoint_test.go b/test/arc_txt_endpoint_test.go index 942f424b0..10db07905 100644 --- a/test/arc_txt_endpoint_test.go +++ b/test/arc_txt_endpoint_test.go @@ -336,29 +336,6 @@ func TestPostCallbackToken(t *testing.T) { if expectedAuthHeader != req.Header.Get("Authorization") { errChan <- fmt.Errorf("auth header %s not as expected %s", expectedAuthHeader, req.Header.Get("Authorization")) } - - // Let ARC send the callback 2 times. First one fails. - if iterations == 0 { - t.Log("callback received, responding bad request") - - err = respondToCallback(w, false) - if err != nil { - t.Fatalf("Failed to respond to callback: %v", err) - } - - callbackReceivedChan <- &status - - iterations++ - return - } - - t.Log("callback received, responding success") - - err = respondToCallback(w, true) - if err != nil { - t.Fatalf("Failed to respond to callback: %v", err) - } - callbackReceivedChan <- &status }) go func(server *http.Server) { From 64d462cd47de3dc0f8063c4a10e7517ea70f1ebd Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Fri, 8 Dec 2023 10:26:09 +0100 Subject: [PATCH 15/22] fix test data --- test/arc_txt_endpoint_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/test/arc_txt_endpoint_test.go b/test/arc_txt_endpoint_test.go index 10db07905..942f424b0 100644 --- a/test/arc_txt_endpoint_test.go +++ b/test/arc_txt_endpoint_test.go @@ -336,6 +336,29 @@ func TestPostCallbackToken(t *testing.T) { if expectedAuthHeader != req.Header.Get("Authorization") { errChan <- fmt.Errorf("auth header %s not as expected %s", expectedAuthHeader, req.Header.Get("Authorization")) } + + // Let ARC send the callback 2 times. First one fails. + if iterations == 0 { + t.Log("callback received, responding bad request") + + err = respondToCallback(w, false) + if err != nil { + t.Fatalf("Failed to respond to callback: %v", err) + } + + callbackReceivedChan <- &status + + iterations++ + return + } + + t.Log("callback received, responding success") + + err = respondToCallback(w, true) + if err != nil { + t.Fatalf("Failed to respond to callback: %v", err) + } + callbackReceivedChan <- &status }) go func(server *http.Server) { From 70c57c484c307666dbbeb43d4580cec69bc6eef7 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Fri, 8 Dec 2023 11:26:23 +0100 Subject: [PATCH 16/22] remove unnecessary else and check --- metamorph/callbacker.go | 4 ++-- metamorph/store/sql/sql.go | 3 --- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go index 145358622..af2a9843d 100644 --- a/metamorph/callbacker.go +++ b/metamorph/callbacker.go @@ -78,10 +78,10 @@ func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreDa continue } return - } else { - logger.Error("callback response status code not ok - ", slog.String("status", strconv.Itoa(response.StatusCode))) } + logger.Error("callback response status code not ok - ", slog.String("status", strconv.Itoa(response.StatusCode))) + // sleep before trying again time.Sleep(time.Duration(sleepDuration) * time.Second) // increase intervals on each failure diff --git a/metamorph/store/sql/sql.go b/metamorph/store/sql/sql.go index 7c99b904e..286768545 100644 --- a/metamorph/store/sql/sql.go +++ b/metamorph/store/sql/sql.go @@ -523,9 +523,6 @@ func (s *SQL) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error span.LogFields(log.Error(err)) return err } - if n == 0 { - return store.ErrNotFound - } return nil } From 969eda0eca583d127b281884fd5bd8396dc9fbff Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Fri, 8 Dec 2023 11:42:24 +0100 Subject: [PATCH 17/22] remove old callbacker --- cmd/metamorph.go | 28 ---------------------------- metamorph/processor.go | 23 +---------------------- metamorph/store/sql/sql.go | 3 +-- 3 files changed, 2 insertions(+), 52 deletions(-) diff --git a/cmd/metamorph.go b/cmd/metamorph.go index 649fb2eda..ec3f521a0 100644 --- a/cmd/metamorph.go +++ b/cmd/metamorph.go @@ -8,18 +8,14 @@ import ( "net/url" "os" "path" - "path/filepath" "strconv" "time" awscfg "github.com/aws/aws-sdk-go-v2/config" awsdynamodb "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/bitcoin-sv/arc/asynccaller" "github.com/bitcoin-sv/arc/blocktx" "github.com/bitcoin-sv/arc/blocktx/blocktx_api" blockTxStore "github.com/bitcoin-sv/arc/blocktx/store" - "github.com/bitcoin-sv/arc/callbacker" - "github.com/bitcoin-sv/arc/callbacker/callbacker_api" "github.com/bitcoin-sv/arc/config" "github.com/bitcoin-sv/arc/metamorph" "github.com/bitcoin-sv/arc/metamorph/store" @@ -112,29 +108,6 @@ func StartMetamorph(logger utils.Logger) (func(), error) { pm, statusMessageCh := initPeerManager(logger, s) - callbackerAddress := viper.GetString("callbacker.dialAddr") - if callbackerAddress == "" { - logger.Fatalf("no callbacker.dialAddr setting found") - } - cb := callbacker.NewClient(callbackerAddress) - - callbackRegisterPath, err := filepath.Abs(path.Join(folder, "callback-register")) - if err != nil { - logger.Fatalf("Could not get absolute path: %v", err) - } - - // create an async caller to callbacker - var cbAsyncCaller *asynccaller.AsyncCaller[callbacker_api.Callback] - cbAsyncCaller, err = asynccaller.New[callbacker_api.Callback]( - logger, - callbackRegisterPath, - 10*time.Second, - metamorph.NewRegisterCallbackClient(cb), - ) - if err != nil { - logger.Fatalf("error creating async caller: %v", err) - } - mapExpiryStr := viper.GetString("metamorph.processorCacheExpiryTime") mapExpiry, err := time.ParseDuration(mapExpiryStr) if err != nil { @@ -156,7 +129,6 @@ func StartMetamorph(logger utils.Logger) (func(), error) { metamorphProcessor, err := metamorph.NewProcessor( s, pm, - cbAsyncCaller.GetChannel(), btx, metamorph.WithCacheExpiryTime(mapExpiry), metamorph.WithProcessorLogger(processorLogger), diff --git a/metamorph/processor.go b/metamorph/processor.go index 0e308d5f8..58f3cadd3 100644 --- a/metamorph/processor.go +++ b/metamorph/processor.go @@ -12,7 +12,6 @@ import ( "github.com/bitcoin-sv/arc/blocktx" "github.com/bitcoin-sv/arc/blocktx/blocktx_api" - "github.com/bitcoin-sv/arc/callbacker/callbacker_api" "github.com/bitcoin-sv/arc/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/metamorph/processor_response" "github.com/bitcoin-sv/arc/metamorph/store" @@ -41,7 +40,6 @@ const ( type Processor struct { store store.MetamorphStore - cbChannel chan *callbacker_api.Callback ProcessorResponseMap *ProcessorResponseMap pm p2p.PeerManagerI btc blocktx.ClientI @@ -72,8 +70,7 @@ type Processor struct { type Option func(f *Processor) -func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, - cbChannel chan *callbacker_api.Callback, btc blocktx.ClientI, opts ...Option) (*Processor, error) { +func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, btc blocktx.ClientI, opts ...Option) (*Processor, error) { if s == nil { return nil, errors.New("store cannot be nil") } @@ -85,7 +82,6 @@ func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, p := &Processor{ startTime: time.Now().UTC(), store: s, - cbChannel: cbChannel, pm: pm, btc: btc, dataRetentionPeriod: dataRetentionPeriodDefault, @@ -151,9 +147,6 @@ func (p *Processor) Shutdown() { p.processExpiredSeenTxsTicker.Stop() p.processExpiredTxsTicker.Stop() p.ProcessorResponseMap.Close() - if p.cbChannel != nil { - close(p.cbChannel) - } } func (p *Processor) unlockItems() error { @@ -387,20 +380,6 @@ func (p *Processor) SendStatusMinedForTransaction(hash *chainhash.Hash, blockHas resp.Close() p.ProcessorResponseMap.Delete(hash) - // if p.cbChannel != nil { - // data, _ := p.store.Get(spanCtx, hash[:]) - - // if data != nil && data.CallbackUrl != "" { - // p.cbChannel <- &callbacker_api.Callback{ - // Hash: data.Hash[:], - // Url: data.CallbackUrl, - // Token: data.CallbackToken, - // Status: int32(data.Status), - // BlockHash: data.BlockHash[:], - // BlockHeight: data.BlockHeight, - // } - // } - // } data, _ := p.store.Get(spanCtx, hash[:]) go SendCallback(p.logger, p.store, data) }, diff --git a/metamorph/store/sql/sql.go b/metamorph/store/sql/sql.go index 286768545..cd44b1a3b 100644 --- a/metamorph/store/sql/sql.go +++ b/metamorph/store/sql/sql.go @@ -516,8 +516,7 @@ func (s *SQL) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error return err } - var n int64 - n, err = result.RowsAffected() + _, err = result.RowsAffected() if err != nil { span.SetTag(string(ext.Error), true) span.LogFields(log.Error(err)) From 93ce26f142947d0853a2538882cad8f97fcd5b32 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Fri, 8 Dec 2023 11:58:33 +0100 Subject: [PATCH 18/22] remove unnecessary parameter --- metamorph/processor_test.go | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/metamorph/processor_test.go b/metamorph/processor_test.go index dacbc6824..ed6ed26d3 100644 --- a/metamorph/processor_test.go +++ b/metamorph/processor_test.go @@ -77,7 +77,7 @@ func TestNewProcessor(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - processor, err := NewProcessor(tc.store, tc.pm, nil, nil, + processor, err := NewProcessor(tc.store, tc.pm, nil, WithCacheExpiryTime(time.Second*5), WithProcessExpiredSeenTxsInterval(time.Second*5), WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: LogLevelDefault}))), @@ -297,7 +297,7 @@ func TestLoadUnmined(t *testing.T) { }, } - processor, err := NewProcessor(mtmStore, pm, nil, btxMock, + processor, err := NewProcessor(mtmStore, pm, btxMock, WithProcessExpiredSeenTxsInterval(time.Hour*24), WithCacheExpiryTime(time.Hour*24), WithNow(func() time.Time { @@ -332,7 +332,7 @@ func TestProcessTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(t, err) assert.Equal(t, 0, processor.ProcessorResponseMap.Len()) @@ -385,7 +385,7 @@ func Benchmark_ProcessTransaction(b *testing.B) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(b, err) assert.Equal(b, 0, processor.ProcessorResponseMap.Len()) @@ -409,7 +409,7 @@ func TestSendStatusForTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(t, err) assert.Equal(t, 0, processor.ProcessorResponseMap.Len()) @@ -426,7 +426,7 @@ func TestSendStatusForTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(t, err) assert.Equal(t, 0, processor.ProcessorResponseMap.Len()) @@ -444,7 +444,7 @@ func TestSendStatusForTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(t, err) assert.Equal(t, 0, processor.ProcessorResponseMap.Len()) @@ -464,7 +464,7 @@ func TestSendStatusForTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(t, err) assert.Equal(t, 0, processor.ProcessorResponseMap.Len()) @@ -516,7 +516,7 @@ func TestSendStatusMinedForTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(t, err) processor.ProcessorResponseMap.Set(testdata.TX1Hash, processor_response.NewProcessorResponseWithStatus( testdata.TX1Hash, @@ -579,7 +579,7 @@ func TestSendStatusMinedForTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(t, err) assert.Equal(t, 0, processor.ProcessorResponseMap.Len()) @@ -632,7 +632,7 @@ func BenchmarkProcessTransaction(b *testing.B) { }() pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(b, err) assert.Equal(b, 0, processor.ProcessorResponseMap.Len()) @@ -765,7 +765,7 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { } pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(metamorphStore, pm, nil, btxMock, + processor, err := NewProcessor(metamorphStore, pm, btxMock, WithProcessExpiredSeenTxsInterval(20*time.Millisecond), WithProcessExpiredTxsInterval(time.Hour), ) @@ -813,7 +813,7 @@ func TestProcessExpiredTransactions(t *testing.T) { }, } pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(metamorphStore, pm, nil, nil, + processor, err := NewProcessor(metamorphStore, pm, nil, WithProcessExpiredSeenTxsInterval(time.Hour), WithProcessExpiredTxsInterval(time.Millisecond*20), WithNow(func() time.Time { From 272317d6f6896521373ff65a241eefdc18eb9632 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Mon, 11 Dec 2023 08:30:26 +0100 Subject: [PATCH 19/22] remove unnecessary parameter --- metamorph/processor_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metamorph/processor_test.go b/metamorph/processor_test.go index 091ceb60b..263c914d7 100644 --- a/metamorph/processor_test.go +++ b/metamorph/processor_test.go @@ -521,7 +521,7 @@ func TestSendStatusForTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(metamorphStore, pm, nil, nil, WithNow(func() time.Time { + processor, err := NewProcessor(metamorphStore, pm, nil, WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) })) require.NoError(t, err) From 8e1811edde872fc98eadf77197083d3d51e0a87e Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Mon, 11 Dec 2023 08:52:21 +0100 Subject: [PATCH 20/22] rename const --- metamorph/callbacker.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go index af2a9843d..f22e744cd 100644 --- a/metamorph/callbacker.go +++ b/metamorph/callbacker.go @@ -18,12 +18,12 @@ import ( ) const ( - CallbackTries = 5 - CallbackInterval = 5 + CallbackTries = 5 + CallbackIntervalSeconds = 5 ) func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreData) { - sleepDuration := CallbackInterval + sleepDuration := CallbackIntervalSeconds for i := 0; i < CallbackTries; i++ { statusString := metamorph_api.Status(tx.Status).String() blockHash := "" From fbe226552c2e042b825bd50b90e0a4f3704ab837 Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Tue, 12 Dec 2023 10:16:24 +0100 Subject: [PATCH 21/22] add back implementation for removing callback url --- metamorph/store/postgresql/postgres.go | 20 +++++++++++++++++ metamorph/store/sqlite/sqlite.go | 31 ++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/metamorph/store/postgresql/postgres.go b/metamorph/store/postgresql/postgres.go index 649b61617..86b58b44d 100644 --- a/metamorph/store/postgresql/postgres.go +++ b/metamorph/store/postgresql/postgres.go @@ -533,6 +533,26 @@ func (p *PostgreSQL) UpdateMined(ctx context.Context, hash *chainhash.Hash, bloc return err } +func (p *PostgreSQL) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + startNanos := p.now().UnixNano() + defer func() { + gocore.NewStat("mtm_store_sql").NewStat("RemoveCallbacker").AddTime(startNanos) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "sql:RemoveCallbacker") + defer span.Finish() + + q := `UPDATE metamorph.transactions SET callback_url = '' WHERE hash = $5;` + + _, err := p.db.ExecContext(ctx, q, metamorph_api.Status_MINED, p.now().Format(time.RFC3339), hash[:]) + + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + } + + return err +} + func (p *PostgreSQL) GetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) (*time.Time, error) { startNanos := p.now().UnixNano() defer func() { diff --git a/metamorph/store/sqlite/sqlite.go b/metamorph/store/sqlite/sqlite.go index b1c41ef27..ab01232b9 100644 --- a/metamorph/store/sqlite/sqlite.go +++ b/metamorph/store/sqlite/sqlite.go @@ -125,6 +125,37 @@ func (s *SqLite) IsCentralised() bool { return false } +func (s *SqLite) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + startNanos := s.now().UnixNano() + defer func() { + gocore.NewStat("mtm_store_sql").NewStat("RemoveCallbacker").AddTime(startNanos) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "sql:RemoveCallbacker") + defer span.Finish() + + q := `UPDATE transactions SET status = callback_url = '' WHERE hash = $3;` + + result, err := s.db.ExecContext(ctx, q, hash[:]) + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return err + } + + var n int64 + n, err = result.RowsAffected() + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return err + } + if n == 0 { + return store.ErrNotFound + } + + return nil +} + func (s *SqLite) SetUnlocked(ctx context.Context, hashes []*chainhash.Hash) error { return nil } From bba78674698f5c9022df2ea56235b90b78745a6b Mon Sep 17 00:00:00 2001 From: ssilagadze Date: Tue, 12 Dec 2023 10:28:17 +0100 Subject: [PATCH 22/22] reduce batch size to 50 --- test/arc_txt_endpoint_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/arc_txt_endpoint_test.go b/test/arc_txt_endpoint_test.go index 430f9aabb..a826cdf28 100644 --- a/test/arc_txt_endpoint_test.go +++ b/test/arc_txt_endpoint_test.go @@ -138,7 +138,7 @@ func TestBatchChainedTxs(t *testing.T) { utxos := getUtxos(t, address) require.True(t, len(utxos) > 0, "No UTXOs available for the address") - txs, err := createTxChain(privateKey, utxos[0], 100) + txs, err := createTxChain(privateKey, utxos[0], 50) require.NoError(t, err) arcBody := make([]api.TransactionRequest, len(txs))