diff --git a/cmd/metamorph.go b/cmd/metamorph.go index 381cfb4d1..ec655e12c 100644 --- a/cmd/metamorph.go +++ b/cmd/metamorph.go @@ -8,18 +8,14 @@ import ( "net/url" "os" "path" - "path/filepath" "strconv" "time" awscfg "github.com/aws/aws-sdk-go-v2/config" awsdynamodb "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/bitcoin-sv/arc/asynccaller" "github.com/bitcoin-sv/arc/blocktx" "github.com/bitcoin-sv/arc/blocktx/blocktx_api" blockTxStore "github.com/bitcoin-sv/arc/blocktx/store" - "github.com/bitcoin-sv/arc/callbacker" - "github.com/bitcoin-sv/arc/callbacker/callbacker_api" "github.com/bitcoin-sv/arc/config" "github.com/bitcoin-sv/arc/metamorph" "github.com/bitcoin-sv/arc/metamorph/store" @@ -115,29 +111,6 @@ func StartMetamorph(logger utils.Logger) (func(), error) { pm, statusMessageCh := initPeerManager(logger, s) - callbackerAddress := viper.GetString("callbacker.dialAddr") - if callbackerAddress == "" { - logger.Fatalf("no callbacker.dialAddr setting found") - } - cb := callbacker.NewClient(callbackerAddress) - - callbackRegisterPath, err := filepath.Abs(path.Join(folder, "callback-register")) - if err != nil { - logger.Fatalf("Could not get absolute path: %v", err) - } - - // create an async caller to callbacker - var cbAsyncCaller *asynccaller.AsyncCaller[callbacker_api.Callback] - cbAsyncCaller, err = asynccaller.New[callbacker_api.Callback]( - logger, - callbackRegisterPath, - 10*time.Second, - metamorph.NewRegisterCallbackClient(cb), - ) - if err != nil { - logger.Fatalf("error creating async caller: %v", err) - } - mapExpiryStr := viper.GetString("metamorph.processorCacheExpiryTime") mapExpiry, err := time.ParseDuration(mapExpiryStr) if err != nil { @@ -159,7 +132,6 @@ func StartMetamorph(logger utils.Logger) (func(), error) { metamorphProcessor, err := metamorph.NewProcessor( s, pm, - cbAsyncCaller.GetChannel(), btx, metamorph.WithCacheExpiryTime(mapExpiry), metamorph.WithProcessorLogger(processorLogger), diff --git a/metamorph/callbacker.go b/metamorph/callbacker.go new file mode 100644 index 000000000..f22e744cd --- /dev/null +++ b/metamorph/callbacker.go @@ -0,0 +1,97 @@ +package metamorph + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "log/slog" + "net/http" + "strconv" + "time" + + "github.com/bitcoin-sv/arc/api" + "github.com/bitcoin-sv/arc/metamorph/metamorph_api" + "github.com/bitcoin-sv/arc/metamorph/store" + "github.com/ordishs/go-utils" +) + +const ( + CallbackTries = 5 + CallbackIntervalSeconds = 5 +) + +func SendCallback(logger *slog.Logger, s store.MetamorphStore, tx *store.StoreData) { + sleepDuration := CallbackIntervalSeconds + for i := 0; i < CallbackTries; i++ { + statusString := metamorph_api.Status(tx.Status).String() + blockHash := "" + if tx.BlockHash != nil { + blockHash = utils.ReverseAndHexEncodeSlice(tx.BlockHash.CloneBytes()) + } + + logger.Info("sending callback for transaction", slog.String("token", tx.CallbackToken), slog.String("hash", tx.Hash.String()), slog.String("url", tx.CallbackUrl), slog.Uint64("block height", tx.BlockHeight), slog.String("block hash", blockHash)) + + status := &api.TransactionStatus{ + BlockHash: &blockHash, + BlockHeight: &tx.BlockHeight, + TxStatus: &statusString, + Txid: tx.Hash.String(), + Timestamp: time.Now(), + } + statusBytes, err := json.Marshal(status) + if err != nil { + logger.Error("Couldn't marshal status - ", err) + return + } + + var request *http.Request + request, err = http.NewRequest("POST", tx.CallbackUrl, bytes.NewBuffer(statusBytes)) + if err != nil { + logger.Error("Couldn't marshal status - ", errors.Join(err, fmt.Errorf("failed to post callback for transaction id %s", tx.Hash))) + return + } + request.Header.Set("Content-Type", "application/json; charset=UTF-8") + if tx.CallbackToken != "" { + request.Header.Set("Authorization", "Bearer "+tx.CallbackToken) + } + + // default http client + httpClient := http.Client{ + Timeout: 5 * time.Second, + } + + var response *http.Response + response, err = httpClient.Do(request) + if err != nil { + logger.Error("Couldn't send transaction info through callback url - ", err) + continue + } + defer response.Body.Close() + + // if callback was sent successfully we stop here + if response.StatusCode == http.StatusOK { + err = s.RemoveCallbacker(context.Background(), tx.Hash) + if err != nil { + logger.Error("Couldn't update/remove callback url - ", err) + continue + } + return + } + + logger.Error("callback response status code not ok - ", slog.String("status", strconv.Itoa(response.StatusCode))) + + // sleep before trying again + time.Sleep(time.Duration(sleepDuration) * time.Second) + // increase intervals on each failure + sleepDuration *= 2 + } + + err := s.RemoveCallbacker(context.Background(), tx.Hash) + if err != nil { + logger.Error("Couldn't update/remove callback url - ", err) + return + } + logger.Error("Couldn't send transaction info through callback url after tries: ", slog.String("status", strconv.Itoa(CallbackTries))) +} diff --git a/metamorph/mocks/store_mock.go b/metamorph/mocks/store_mock.go index 0b3e00f53..64b8bbd0b 100644 --- a/metamorph/mocks/store_mock.go +++ b/metamorph/mocks/store_mock.go @@ -40,6 +40,9 @@ var _ store.MetamorphStore = &MetamorphStoreMock{} // IsCentralisedFunc: func() bool { // panic("mock out the IsCentralised method") // }, +// RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { +// panic("mock out the RemoveCallbacker method") +// }, // SetFunc: func(ctx context.Context, key []byte, value *store.StoreData) error { // panic("mock out the Set method") // }, @@ -83,6 +86,9 @@ type MetamorphStoreMock struct { // IsCentralisedFunc mocks the IsCentralised method. IsCentralisedFunc func() bool + // RemoveCallbackerFunc mocks the RemoveCallbacker method. + RemoveCallbackerFunc func(ctx context.Context, hash *chainhash.Hash) error + // SetFunc mocks the Set method. SetFunc func(ctx context.Context, key []byte, value *store.StoreData) error @@ -139,6 +145,13 @@ type MetamorphStoreMock struct { // IsCentralised holds details about calls to the IsCentralised method. IsCentralised []struct { } + // RemoveCallbacker holds details about calls to the RemoveCallbacker method. + RemoveCallbacker []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Hash is the hash argument value. + Hash *chainhash.Hash + } // Set holds details about calls to the Set method. Set []struct { // Ctx is the ctx argument value. @@ -198,6 +211,7 @@ type MetamorphStoreMock struct { lockGetBlockProcessed sync.RWMutex lockGetUnmined sync.RWMutex lockIsCentralised sync.RWMutex + lockRemoveCallbacker sync.RWMutex lockSet sync.RWMutex lockSetBlockProcessed sync.RWMutex lockSetUnlocked sync.RWMutex @@ -409,6 +423,42 @@ func (mock *MetamorphStoreMock) IsCentralisedCalls() []struct { return calls } +// RemoveCallbacker calls RemoveCallbackerFunc. +func (mock *MetamorphStoreMock) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + if mock.RemoveCallbackerFunc == nil { + panic("MetamorphStoreMock.RemoveCallbackerFunc: method is nil but MetamorphStore.RemoveCallbacker was just called") + } + callInfo := struct { + Ctx context.Context + Hash *chainhash.Hash + }{ + Ctx: ctx, + Hash: hash, + } + mock.lockRemoveCallbacker.Lock() + mock.calls.RemoveCallbacker = append(mock.calls.RemoveCallbacker, callInfo) + mock.lockRemoveCallbacker.Unlock() + return mock.RemoveCallbackerFunc(ctx, hash) +} + +// RemoveCallbackerCalls gets all the calls that were made to RemoveCallbacker. +// Check the length with: +// +// len(mockedMetamorphStore.RemoveCallbackerCalls()) +func (mock *MetamorphStoreMock) RemoveCallbackerCalls() []struct { + Ctx context.Context + Hash *chainhash.Hash +} { + var calls []struct { + Ctx context.Context + Hash *chainhash.Hash + } + mock.lockRemoveCallbacker.RLock() + calls = mock.calls.RemoveCallbacker + mock.lockRemoveCallbacker.RUnlock() + return calls +} + // Set calls SetFunc. func (mock *MetamorphStoreMock) Set(ctx context.Context, key []byte, value *store.StoreData) error { if mock.SetFunc == nil { diff --git a/metamorph/processor.go b/metamorph/processor.go index 507093446..eea13e6ec 100644 --- a/metamorph/processor.go +++ b/metamorph/processor.go @@ -12,7 +12,6 @@ import ( "github.com/bitcoin-sv/arc/blocktx" "github.com/bitcoin-sv/arc/blocktx/blocktx_api" - "github.com/bitcoin-sv/arc/callbacker/callbacker_api" "github.com/bitcoin-sv/arc/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/metamorph/processor_response" "github.com/bitcoin-sv/arc/metamorph/store" @@ -41,7 +40,6 @@ const ( type Processor struct { store store.MetamorphStore - cbChannel chan *callbacker_api.Callback ProcessorResponseMap *ProcessorResponseMap pm p2p.PeerManagerI btc blocktx.ClientI @@ -72,8 +70,7 @@ type Processor struct { type Option func(f *Processor) -func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, - cbChannel chan *callbacker_api.Callback, btc blocktx.ClientI, opts ...Option) (*Processor, error) { +func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, btc blocktx.ClientI, opts ...Option) (*Processor, error) { if s == nil { return nil, errors.New("store cannot be nil") } @@ -85,7 +82,6 @@ func NewProcessor(s store.MetamorphStore, pm p2p.PeerManagerI, p := &Processor{ startTime: time.Now().UTC(), store: s, - cbChannel: cbChannel, pm: pm, btc: btc, dataRetentionPeriod: dataRetentionPeriodDefault, @@ -151,9 +147,6 @@ func (p *Processor) Shutdown() { p.processExpiredSeenTxsTicker.Stop() p.processExpiredTxsTicker.Stop() p.ProcessorResponseMap.Close() - if p.cbChannel != nil { - close(p.cbChannel) - } } func (p *Processor) unlockItems() error { @@ -387,21 +380,8 @@ func (p *Processor) SendStatusMinedForTransaction(hash *chainhash.Hash, blockHas resp.Close() p.ProcessorResponseMap.Delete(hash) - if p.cbChannel != nil { - data, _ := p.store.Get(spanCtx, hash[:]) - - if data != nil && data.CallbackUrl != "" { - p.cbChannel <- &callbacker_api.Callback{ - Hash: data.Hash[:], - Url: data.CallbackUrl, - Token: data.CallbackToken, - Status: int32(data.Status), - BlockHash: data.BlockHash[:], - BlockHeight: data.BlockHeight, - } - } - } - + data, _ := p.store.Get(spanCtx, hash[:]) + go SendCallback(p.logger, p.store, data) }, }) diff --git a/metamorph/processor_test.go b/metamorph/processor_test.go index df4818772..a53588dbd 100644 --- a/metamorph/processor_test.go +++ b/metamorph/processor_test.go @@ -12,7 +12,6 @@ import ( "time" "github.com/bitcoin-sv/arc/blocktx/blocktx_api" - "github.com/bitcoin-sv/arc/callbacker/callbacker_api" . "github.com/bitcoin-sv/arc/metamorph" "github.com/bitcoin-sv/arc/metamorph/metamorph_api" . "github.com/bitcoin-sv/arc/metamorph/mocks" @@ -34,7 +33,13 @@ import ( func TestNewProcessor(t *testing.T) { mtmStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{Hash: testdata.TX2Hash}, nil + }, SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } pm := p2p.NewPeerManagerMock() @@ -72,7 +77,7 @@ func TestNewProcessor(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - processor, err := NewProcessor(tc.store, tc.pm, nil, nil, + processor, err := NewProcessor(tc.store, tc.pm, nil, WithCacheExpiryTime(time.Second*5), WithProcessExpiredSeenTxsInterval(time.Second*5), WithProcessorLogger(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: LogLevelDefault}))), @@ -287,9 +292,12 @@ func TestLoadUnmined(t *testing.T) { IsCentralisedFunc: func() bool { return tc.isCentralised }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } - processor, err := NewProcessor(mtmStore, pm, nil, btxMock, + processor, err := NewProcessor(mtmStore, pm, btxMock, WithProcessExpiredSeenTxsInterval(time.Hour*24), WithCacheExpiryTime(time.Hour*24), WithNow(func() time.Time { @@ -324,7 +332,7 @@ func TestProcessTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(t, err) assert.Equal(t, 0, processor.ProcessorResponseMap.Len()) @@ -377,7 +385,7 @@ func Benchmark_ProcessTransaction(b *testing.B) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(b, err) assert.Equal(b, 0, processor.ProcessorResponseMap.Len()) @@ -513,7 +521,7 @@ func TestSendStatusForTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(metamorphStore, pm, nil, nil, WithNow(func() time.Time { + processor, err := NewProcessor(metamorphStore, pm, nil, WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) })) require.NoError(t, err) @@ -561,7 +569,7 @@ func TestSendStatusMinedForTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(t, err) processor.ProcessorResponseMap.Set(testdata.TX1Hash, processor_response.NewProcessorResponseWithStatus( testdata.TX1Hash, @@ -580,43 +588,43 @@ func TestSendStatusMinedForTransaction(t *testing.T) { assert.Equal(t, metamorph_api.Status_MINED, txStored.Status) }) - t.Run("SendStatusMinedForTransaction callback", func(t *testing.T) { - s, err := sqlite.New(true, "") - require.NoError(t, err) - setStoreTestData(t, s) - - pm := p2p.NewPeerManagerMock() - - var wg sync.WaitGroup - callbackCh := make(chan *callbacker_api.Callback) - wg.Add(1) - go func() { - for cb := range callbackCh { - assert.Equal(t, metamorph_api.Status_MINED, metamorph_api.Status(cb.Status)) - assert.Equal(t, testdata.TX1Hash.CloneBytes(), cb.Hash) - assert.Equal(t, testdata.Block1Hash[:], cb.BlockHash) - assert.Equal(t, uint64(1233), cb.BlockHeight) - assert.Equal(t, "https://test.com", cb.Url) - assert.Equal(t, "token", cb.Token) - wg.Done() - } - }() - - processor, err := NewProcessor(s, pm, callbackCh, nil) - require.NoError(t, err) - // add the tx to the map - processor.ProcessorResponseMap.Set(testdata.TX1Hash, processor_response.NewProcessorResponseWithStatus( - testdata.TX1Hash, - metamorph_api.Status_SEEN_ON_NETWORK, - )) - - ok, sendErr := processor.SendStatusMinedForTransaction(testdata.TX1Hash, testdata.Block1Hash, 1233) - time.Sleep(100 * time.Millisecond) - assert.True(t, ok) - assert.NoError(t, sendErr) - - wg.Wait() - }) + // t.Run("SendStatusMinedForTransaction callback", func(t *testing.T) { + // s, err := metamorphSql.New("sqlite_memory") + // require.NoError(t, err) + // setStoreTestData(t, s) + + // pm := p2p.NewPeerManagerMock() + + // var wg sync.WaitGroup + // callbackCh := make(chan *callbacker_api.Callback) + // wg.Add(1) + // go func() { + // for cb := range callbackCh { + // assert.Equal(t, metamorph_api.Status_MINED, metamorph_api.Status(cb.Status)) + // assert.Equal(t, testdata.TX1Hash.CloneBytes(), cb.Hash) + // assert.Equal(t, testdata.Block1Hash[:], cb.BlockHash) + // assert.Equal(t, uint64(1233), cb.BlockHeight) + // assert.Equal(t, "https://test.com", cb.Url) + // assert.Equal(t, "token", cb.Token) + // wg.Done() + // } + // }() + + // processor, err := NewProcessor(s, pm, callbackCh, nil) + // require.NoError(t, err) + // // add the tx to the map + // processor.ProcessorResponseMap.Set(testdata.TX1Hash, processor_response.NewProcessorResponseWithStatus( + // testdata.TX1Hash, + // metamorph_api.Status_SEEN_ON_NETWORK, + // )) + + // ok, sendErr := processor.SendStatusMinedForTransaction(testdata.TX1Hash, testdata.Block1Hash, 1233) + // time.Sleep(100 * time.Millisecond) + // assert.True(t, ok) + // assert.NoError(t, sendErr) + + // wg.Wait() + // }) t.Run("SendStatusForTransaction known tx - processed", func(t *testing.T) { s, err := sqlite.New(true, "") @@ -624,7 +632,7 @@ func TestSendStatusMinedForTransaction(t *testing.T) { pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(t, err) assert.Equal(t, 0, processor.ProcessorResponseMap.Len()) @@ -677,7 +685,7 @@ func BenchmarkProcessTransaction(b *testing.B) { }() pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(s, pm, nil, nil) + processor, err := NewProcessor(s, pm, nil) require.NoError(b, err) assert.Equal(b, 0, processor.ProcessorResponseMap.Len()) @@ -783,6 +791,9 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { t.Run(tc.name, func(t *testing.T) { metamorphStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{Hash: testdata.TX2Hash}, nil + }, UpdateMinedFunc: func(ctx context.Context, hash *chainhash.Hash, blockHash *chainhash.Hash, blockHeight uint64) error { require.Condition(t, func() (success bool) { oneOfHash := hash.IsEqual(testdata.TX1Hash) || hash.IsEqual(testdata.TX2Hash) || hash.IsEqual(testdata.TX3Hash) @@ -794,6 +805,9 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { return tc.updateMinedErr }, SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } btxMock := &ClientIMock{ GetTransactionBlocksFunc: func(ctx context.Context, transaction *blocktx_api.Transactions) (*blocktx_api.TransactionBlocks, error) { @@ -804,7 +818,7 @@ func TestProcessExpiredSeenTransactions(t *testing.T) { } pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(metamorphStore, pm, nil, btxMock, + processor, err := NewProcessor(metamorphStore, pm, btxMock, WithProcessExpiredSeenTxsInterval(20*time.Millisecond), WithProcessExpiredTxsInterval(time.Hour), ) @@ -842,9 +856,17 @@ func TestProcessExpiredTransactions(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - metamorphStore := &MetamorphStoreMock{SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }} + metamorphStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{Hash: testdata.TX2Hash}, nil + }, + SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, + } pm := p2p.NewPeerManagerMock() - processor, err := NewProcessor(metamorphStore, pm, nil, nil, + processor, err := NewProcessor(metamorphStore, pm, nil, WithProcessExpiredSeenTxsInterval(time.Hour), WithProcessExpiredTxsInterval(time.Millisecond*20), WithNow(func() time.Time { diff --git a/metamorph/server_test.go b/metamorph/server_test.go index 50708bf0f..293e0dbb4 100644 --- a/metamorph/server_test.go +++ b/metamorph/server_test.go @@ -361,6 +361,9 @@ func TestServer_GetTransactionStatus(t *testing.T) { } return data, tt.getErr }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } server := NewServer(metamorphStore, nil, client, source) @@ -616,6 +619,9 @@ func TestPutTransactions(t *testing.T) { return nil, tc.getErr }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } btc := &ClientIMock{ @@ -685,9 +691,15 @@ func TestSetUnlockedbyName(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { metamorphStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{}, nil + }, SetUnlockedByNameFunc: func(ctx context.Context, lockedBy string) (int, error) { return tc.recordsAffected, tc.errSetUnlocked }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, } server := NewServer(metamorphStore, nil, nil, source) @@ -718,7 +730,15 @@ func TestStartGRPCServer(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - metamorphStore := &MetamorphStoreMock{SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }} + metamorphStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{}, nil + }, + SetUnlockedFunc: func(ctx context.Context, hashes []*chainhash.Hash) error { return nil }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, + } btc := &ClientIMock{} @@ -783,7 +803,14 @@ func TestCheckUtxos(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - metamorphStore := &MetamorphStoreMock{} + metamorphStore := &MetamorphStoreMock{ + GetFunc: func(ctx context.Context, key []byte) (*store.StoreData, error) { + return &store.StoreData{}, nil + }, + RemoveCallbackerFunc: func(ctx context.Context, hash *chainhash.Hash) error { + return nil + }, + } btc := &ClientIMock{} diff --git a/metamorph/store/Interface.go b/metamorph/store/Interface.go index cce2056e4..7759699e0 100644 --- a/metamorph/store/Interface.go +++ b/metamorph/store/Interface.go @@ -213,6 +213,7 @@ type MetamorphStore interface { SetUnlockedByName(ctx context.Context, lockedBy string) (int, error) GetUnmined(_ context.Context, callback func(s *StoreData)) error UpdateStatus(ctx context.Context, hash *chainhash.Hash, status metamorph_api.Status, rejectReason string) error + RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error UpdateMined(ctx context.Context, hash *chainhash.Hash, blockHash *chainhash.Hash, blockHeight uint64) error Close(ctx context.Context) error GetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) (*time.Time, error) diff --git a/metamorph/store/badger/badger.go b/metamorph/store/badger/badger.go index 8592e3154..a06299823 100644 --- a/metamorph/store/badger/badger.go +++ b/metamorph/store/badger/badger.go @@ -213,6 +213,36 @@ func (s *Badger) UpdateStatus(ctx context.Context, hash *chainhash.Hash, status return nil } +func (s *Badger) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + start := gocore.CurrentNanos() + defer func() { + gocore.NewStat("mtm_store_badger").NewStat("RemoveCallbacker").AddTime(start) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "badger:RemoveCallbacker") + defer span.Finish() + + // we need a lock here since we are doing 2 operations that need to be atomic + s.mu.Lock() + defer s.mu.Unlock() + + tx, err := s.Get(ctx, hash[:]) + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return err + } + + tx.CallbackUrl = "" + + if err = s.Set(ctx, hash[:], tx); err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return fmt.Errorf("failed to update data: %w", err) + } + + return nil +} + // UpdateMined updates the transaction to mined func (s *Badger) UpdateMined(ctx context.Context, hash *chainhash.Hash, blockHash *chainhash.Hash, blockHeight uint64) error { start := gocore.CurrentNanos() diff --git a/metamorph/store/dynamodb/dynamodb.go b/metamorph/store/dynamodb/dynamodb.go index 1b5bf9364..7d430d369 100644 --- a/metamorph/store/dynamodb/dynamodb.go +++ b/metamorph/store/dynamodb/dynamodb.go @@ -509,6 +509,35 @@ func (ddb *DynamoDB) UpdateStatus(ctx context.Context, hash *chainhash.Hash, sta return nil } +func (ddb *DynamoDB) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + // setup log and tracing + startNanos := ddb.now().UnixNano() + defer func() { + gocore.NewStat("mtm_store_sql").NewStat("RemoveCallbacker").AddTime(startNanos) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "sql:RemoveCallbacker") + defer span.Finish() + + updateExpression := "SET callback_url = ''" + + // update tx + _, err := ddb.client.UpdateItem(ctx, &dynamodb.UpdateItemInput{ + TableName: aws.String(ddb.transactionsTableName), + Key: map[string]types.AttributeValue{ + "tx_hash": &types.AttributeValueMemberB{Value: hash.CloneBytes()}, + }, + UpdateExpression: aws.String(updateExpression), + }) + + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return err + } + + return nil +} + func (ddb *DynamoDB) UpdateMined(ctx context.Context, hash *chainhash.Hash, blockHash *chainhash.Hash, blockHeight uint64) error { // setup log and tracing startNanos := ddb.now().UnixNano() diff --git a/metamorph/store/postgresql/postgres.go b/metamorph/store/postgresql/postgres.go index 649b61617..86b58b44d 100644 --- a/metamorph/store/postgresql/postgres.go +++ b/metamorph/store/postgresql/postgres.go @@ -533,6 +533,26 @@ func (p *PostgreSQL) UpdateMined(ctx context.Context, hash *chainhash.Hash, bloc return err } +func (p *PostgreSQL) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + startNanos := p.now().UnixNano() + defer func() { + gocore.NewStat("mtm_store_sql").NewStat("RemoveCallbacker").AddTime(startNanos) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "sql:RemoveCallbacker") + defer span.Finish() + + q := `UPDATE metamorph.transactions SET callback_url = '' WHERE hash = $5;` + + _, err := p.db.ExecContext(ctx, q, metamorph_api.Status_MINED, p.now().Format(time.RFC3339), hash[:]) + + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + } + + return err +} + func (p *PostgreSQL) GetBlockProcessed(ctx context.Context, blockHash *chainhash.Hash) (*time.Time, error) { startNanos := p.now().UnixNano() defer func() { diff --git a/metamorph/store/sqlite/sqlite.go b/metamorph/store/sqlite/sqlite.go index b1c41ef27..ab01232b9 100644 --- a/metamorph/store/sqlite/sqlite.go +++ b/metamorph/store/sqlite/sqlite.go @@ -125,6 +125,37 @@ func (s *SqLite) IsCentralised() bool { return false } +func (s *SqLite) RemoveCallbacker(ctx context.Context, hash *chainhash.Hash) error { + startNanos := s.now().UnixNano() + defer func() { + gocore.NewStat("mtm_store_sql").NewStat("RemoveCallbacker").AddTime(startNanos) + }() + span, _ := opentracing.StartSpanFromContext(ctx, "sql:RemoveCallbacker") + defer span.Finish() + + q := `UPDATE transactions SET status = callback_url = '' WHERE hash = $3;` + + result, err := s.db.ExecContext(ctx, q, hash[:]) + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return err + } + + var n int64 + n, err = result.RowsAffected() + if err != nil { + span.SetTag(string(ext.Error), true) + span.LogFields(log.Error(err)) + return err + } + if n == 0 { + return store.ErrNotFound + } + + return nil +} + func (s *SqLite) SetUnlocked(ctx context.Context, hashes []*chainhash.Hash) error { return nil } diff --git a/test/arc_txt_endpoint_test.go b/test/arc_txt_endpoint_test.go index 430f9aabb..a826cdf28 100644 --- a/test/arc_txt_endpoint_test.go +++ b/test/arc_txt_endpoint_test.go @@ -138,7 +138,7 @@ func TestBatchChainedTxs(t *testing.T) { utxos := getUtxos(t, address) require.True(t, len(utxos) > 0, "No UTXOs available for the address") - txs, err := createTxChain(privateKey, utxos[0], 100) + txs, err := createTxChain(privateKey, utxos[0], 50) require.NoError(t, err) arcBody := make([]api.TransactionRequest, len(txs))