diff --git a/internal/metamorph/processor.go b/internal/metamorph/processor.go index cada062f0..dea8596b5 100644 --- a/internal/metamorph/processor.go +++ b/internal/metamorph/processor.go @@ -316,7 +316,13 @@ func (p *Processor) updateMined(ctx context.Context, txsBlocks []*blocktx_api.Tr for _, data := range updatedData { if len(data.Callbacks) > 0 { - p.callbackSender.SendCallback(ctx, data) + requests := toSendRequest(data) + for _, request := range requests { + err = p.mqClient.PublishMarshal(ctx, CallbackTopic, request) + if err != nil { + p.logger.Error("failed to publish callback", slog.String("err", err.Error())) + } + } } p.delTxFromCache(data.Hash) @@ -557,7 +563,13 @@ func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates, } if sendCallback && len(data.Callbacks) > 0 { - p.callbackSender.SendCallback(ctx, data) + requests := toSendRequest(data) + for _, request := range requests { + err = p.mqClient.PublishMarshal(ctx, CallbackTopic, request) + if err != nil { + p.logger.Error("failed to publish callback", slog.String("err", err.Error())) + } + } } } return nil diff --git a/internal/metamorph/processor_helpers.go b/internal/metamorph/processor_helpers.go index 741fee192..f3e68149a 100644 --- a/internal/metamorph/processor_helpers.go +++ b/internal/metamorph/processor_helpers.go @@ -7,6 +7,8 @@ import ( "log/slog" "github.com/bitcoin-sv/arc/internal/cache" + "github.com/bitcoin-sv/arc/internal/callbacker/callbacker_api" + "github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api" "github.com/bitcoin-sv/arc/internal/metamorph/store" ) @@ -181,3 +183,60 @@ func filterUpdates(all []store.UpdateStatus, processed []*store.Data) []store.Up } return unprocessed } + +func toSendRequest(d *store.Data) []*callbacker_api.SendRequest { + if len(d.Callbacks) == 0 { + return nil + } + + requests := make([]*callbacker_api.SendRequest, 0, len(d.Callbacks)) + for _, c := range d.Callbacks { + if c.CallbackURL != "" { + routing := &callbacker_api.CallbackRouting{ + Url: c.CallbackURL, + Token: c.CallbackToken, + AllowBatch: c.AllowBatch, + } + + request := &callbacker_api.SendRequest{ + CallbackRouting: routing, + + Txid: d.Hash.String(), + Status: callbacker_api.Status(d.Status), + MerklePath: d.MerklePath, + ExtraInfo: getCallbackExtraInfo(d), + CompetingTxs: getCallbackCompetitingTxs(d), + + BlockHash: getCallbackBlockHash(d), + BlockHeight: d.BlockHeight, + } + requests = append(requests, request) + } + } + + return requests +} + +func getCallbackExtraInfo(d *store.Data) string { + if d.Status == metamorph_api.Status_MINED && len(d.CompetingTxs) > 0 { + return minedDoubleSpendMsg + } + + return d.RejectReason +} + +func getCallbackCompetitingTxs(d *store.Data) []string { + if d.Status == metamorph_api.Status_MINED { + return nil + } + + return d.CompetingTxs +} + +func getCallbackBlockHash(d *store.Data) string { + if d.BlockHash == nil { + return "" + } + + return d.BlockHash.String() +} diff --git a/internal/metamorph/processor_test.go b/internal/metamorph/processor_test.go index 8f543028b..925ec6844 100644 --- a/internal/metamorph/processor_test.go +++ b/internal/metamorph/processor_test.go @@ -13,9 +13,9 @@ import ( "github.com/libsv/go-p2p" "github.com/libsv/go-p2p/chaincfg/chainhash" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/reflect/protoreflect" "github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api" "github.com/bitcoin-sv/arc/internal/cache" @@ -493,7 +493,6 @@ func TestStartSendStatusForTransaction(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // given counter := 0 - callbackSent := make(chan struct{}, tc.expectedCallbacks) metamorphStore := &storeMocks.MetamorphStoreMock{ GetFunc: func(_ context.Context, _ []byte) (*store.Data, error) { @@ -532,22 +531,31 @@ func TestStartSendStatusForTransaction(t *testing.T) { } } - callbackSender := &mocks.CallbackSenderMock{ - SendCallbackFunc: func(_ context.Context, _ *store.Data) { - callbackSent <- struct{}{} + statusMessageChannel := make(chan *metamorph.TxStatusMessage, 10) + + mqClient := &mocks.MessageQueueMock{ + PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error { + return nil }, } - statusMessageChannel := make(chan *metamorph.TxStatusMessage, 10) - - sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel, metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), metamorph.WithProcessStatusUpdatesBatchSize(3), metamorph.WithCallbackSender(callbackSender)) + sut, err := metamorph.NewProcessor( + metamorphStore, + cStore, + pm, + statusMessageChannel, + metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), + metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), + metamorph.WithProcessStatusUpdatesBatchSize(3), + metamorph.WithMessageQueueClient(mqClient), + ) require.NoError(t, err) // when sut.StartProcessStatusUpdatesInStorage() sut.StartSendStatusUpdate() - assert.Equal(t, 0, sut.GetProcessorMapSize()) + require.Equal(t, 0, sut.GetProcessorMapSize()) for _, testInput := range tc.inputs { statusMessageChannel <- &metamorph.TxStatusMessage{ Hash: testInput.hash, @@ -557,25 +565,12 @@ func TestStartSendStatusForTransaction(t *testing.T) { } } - timeoutTimer := time.NewTimer(time.Second * 5) - callbackCounter := 0 - if tc.expectedCallbacks > 0 { - select { - case <-callbackSent: - callbackCounter++ - if callbackCounter == tc.expectedCallbacks { - break - } - case <-timeoutTimer.C: - t.Fatal("expected callbacks never sent") - } - } time.Sleep(time.Millisecond * 300) // then - assert.Equal(t, tc.expectedUpdateStatusCalls, len(metamorphStore.UpdateStatusBulkCalls())) - assert.Equal(t, tc.expectedDoubleSpendCalls, len(metamorphStore.UpdateDoubleSpendCalls())) - assert.Equal(t, tc.expectedCallbacks, len(callbackSender.SendCallbackCalls())) + require.Equal(t, tc.expectedUpdateStatusCalls, len(metamorphStore.UpdateStatusBulkCalls())) + require.Equal(t, tc.expectedDoubleSpendCalls, len(metamorphStore.UpdateDoubleSpendCalls())) + require.Equal(t, tc.expectedCallbacks, len(mqClient.PublishMarshalCalls())) sut.Shutdown() }) } @@ -903,8 +898,8 @@ func TestStartProcessMinedCallbacks(t *testing.T) { require.Len(t, txsBlocks, tc.expectedTxsBlocks) return []*store.Data{ - {Hash: testdata.TX1Hash, Callbacks: []store.Callback{{CallbackURL: "http://callback.com"}}}, - {Hash: testdata.TX1Hash, Callbacks: []store.Callback{{CallbackURL: "http://callback.com"}}}, + {Hash: testdata.TX1Hash, Callbacks: []store.Callback{{CallbackURL: "https://callback.com"}}}, + {Hash: testdata.TX2Hash, Callbacks: []store.Callback{{CallbackURL: "https://callback.com"}}}, {Hash: testdata.TX1Hash}, }, tc.updateMinedErr }, @@ -912,9 +907,13 @@ func TestStartProcessMinedCallbacks(t *testing.T) { } pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}} minedTxsChan := make(chan *blocktx_api.TransactionBlock, 5) - callbackSender := &mocks.CallbackSenderMock{ - SendCallbackFunc: func(_ context.Context, _ *store.Data) {}, + + mqClient := &mocks.MessageQueueMock{ + PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error { + return nil + }, } + cStore := cache.NewMemoryStore() sut, err := metamorph.NewProcessor( metamorphStore, @@ -922,9 +921,9 @@ func TestStartProcessMinedCallbacks(t *testing.T) { pm, nil, metamorph.WithMinedTxsChan(minedTxsChan), - metamorph.WithCallbackSender(callbackSender), metamorph.WithProcessMinedBatchSize(tc.processMinedBatchSize), metamorph.WithProcessMinedInterval(tc.processMinedInterval), + metamorph.WithMessageQueueClient(mqClient), ) require.NoError(t, err) @@ -941,7 +940,7 @@ func TestStartProcessMinedCallbacks(t *testing.T) { sut.Shutdown() // then - require.Equal(t, tc.expectedSendCallbackCalls, len(callbackSender.SendCallbackCalls())) + require.Equal(t, tc.expectedSendCallbackCalls, len(mqClient.PublishMarshalCalls())) }) } }