Skip to content

Commit

Permalink
refactor: Metamorph processor uses message queue client subscribed to…
Browse files Browse the repository at this point in the history
… callback topic (#715)
  • Loading branch information
boecklim authored Dec 19, 2024
1 parent 52182a0 commit 1477947
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 33 deletions.
16 changes: 14 additions & 2 deletions internal/metamorph/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,13 @@ func (p *Processor) updateMined(ctx context.Context, txsBlocks []*blocktx_api.Tr

for _, data := range updatedData {
if len(data.Callbacks) > 0 {
p.callbackSender.SendCallback(ctx, data)
requests := toSendRequest(data)
for _, request := range requests {
err = p.mqClient.PublishMarshal(ctx, CallbackTopic, request)
if err != nil {
p.logger.Error("failed to publish callback", slog.String("err", err.Error()))
}
}
}

p.delTxFromCache(data.Hash)
Expand Down Expand Up @@ -557,7 +563,13 @@ func (p *Processor) statusUpdateWithCallback(ctx context.Context, statusUpdates,
}

if sendCallback && len(data.Callbacks) > 0 {
p.callbackSender.SendCallback(ctx, data)
requests := toSendRequest(data)
for _, request := range requests {
err = p.mqClient.PublishMarshal(ctx, CallbackTopic, request)
if err != nil {
p.logger.Error("failed to publish callback", slog.String("err", err.Error()))
}
}
}
}
return nil
Expand Down
59 changes: 59 additions & 0 deletions internal/metamorph/processor_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"log/slog"

"github.com/bitcoin-sv/arc/internal/cache"
"github.com/bitcoin-sv/arc/internal/callbacker/callbacker_api"
"github.com/bitcoin-sv/arc/internal/metamorph/metamorph_api"
"github.com/bitcoin-sv/arc/internal/metamorph/store"
)

Expand Down Expand Up @@ -181,3 +183,60 @@ func filterUpdates(all []store.UpdateStatus, processed []*store.Data) []store.Up
}
return unprocessed
}

func toSendRequest(d *store.Data) []*callbacker_api.SendRequest {
if len(d.Callbacks) == 0 {
return nil
}

requests := make([]*callbacker_api.SendRequest, 0, len(d.Callbacks))
for _, c := range d.Callbacks {
if c.CallbackURL != "" {
routing := &callbacker_api.CallbackRouting{
Url: c.CallbackURL,
Token: c.CallbackToken,
AllowBatch: c.AllowBatch,
}

request := &callbacker_api.SendRequest{
CallbackRouting: routing,

Txid: d.Hash.String(),
Status: callbacker_api.Status(d.Status),
MerklePath: d.MerklePath,
ExtraInfo: getCallbackExtraInfo(d),
CompetingTxs: getCallbackCompetitingTxs(d),

BlockHash: getCallbackBlockHash(d),
BlockHeight: d.BlockHeight,
}
requests = append(requests, request)
}
}

return requests
}

func getCallbackExtraInfo(d *store.Data) string {
if d.Status == metamorph_api.Status_MINED && len(d.CompetingTxs) > 0 {
return minedDoubleSpendMsg
}

return d.RejectReason
}

func getCallbackCompetitingTxs(d *store.Data) []string {
if d.Status == metamorph_api.Status_MINED {
return nil
}

return d.CompetingTxs
}

func getCallbackBlockHash(d *store.Data) string {
if d.BlockHash == nil {
return ""
}

return d.BlockHash.String()
}
61 changes: 30 additions & 31 deletions internal/metamorph/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (

"github.com/libsv/go-p2p"
"github.com/libsv/go-p2p/chaincfg/chainhash"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"

"github.com/bitcoin-sv/arc/internal/blocktx/blocktx_api"
"github.com/bitcoin-sv/arc/internal/cache"
Expand Down Expand Up @@ -493,7 +493,6 @@ func TestStartSendStatusForTransaction(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
// given
counter := 0
callbackSent := make(chan struct{}, tc.expectedCallbacks)

metamorphStore := &storeMocks.MetamorphStoreMock{
GetFunc: func(_ context.Context, _ []byte) (*store.Data, error) {
Expand Down Expand Up @@ -532,22 +531,31 @@ func TestStartSendStatusForTransaction(t *testing.T) {
}
}

callbackSender := &mocks.CallbackSenderMock{
SendCallbackFunc: func(_ context.Context, _ *store.Data) {
callbackSent <- struct{}{}
statusMessageChannel := make(chan *metamorph.TxStatusMessage, 10)

mqClient := &mocks.MessageQueueMock{
PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error {
return nil
},
}

statusMessageChannel := make(chan *metamorph.TxStatusMessage, 10)

sut, err := metamorph.NewProcessor(metamorphStore, cStore, pm, statusMessageChannel, metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }), metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond), metamorph.WithProcessStatusUpdatesBatchSize(3), metamorph.WithCallbackSender(callbackSender))
sut, err := metamorph.NewProcessor(
metamorphStore,
cStore,
pm,
statusMessageChannel,
metamorph.WithNow(func() time.Time { return time.Date(2023, 10, 1, 13, 0, 0, 0, time.UTC) }),
metamorph.WithProcessStatusUpdatesInterval(200*time.Millisecond),
metamorph.WithProcessStatusUpdatesBatchSize(3),
metamorph.WithMessageQueueClient(mqClient),
)
require.NoError(t, err)

// when
sut.StartProcessStatusUpdatesInStorage()
sut.StartSendStatusUpdate()

assert.Equal(t, 0, sut.GetProcessorMapSize())
require.Equal(t, 0, sut.GetProcessorMapSize())
for _, testInput := range tc.inputs {
statusMessageChannel <- &metamorph.TxStatusMessage{
Hash: testInput.hash,
Expand All @@ -557,25 +565,12 @@ func TestStartSendStatusForTransaction(t *testing.T) {
}
}

timeoutTimer := time.NewTimer(time.Second * 5)
callbackCounter := 0
if tc.expectedCallbacks > 0 {
select {
case <-callbackSent:
callbackCounter++
if callbackCounter == tc.expectedCallbacks {
break
}
case <-timeoutTimer.C:
t.Fatal("expected callbacks never sent")
}
}
time.Sleep(time.Millisecond * 300)

// then
assert.Equal(t, tc.expectedUpdateStatusCalls, len(metamorphStore.UpdateStatusBulkCalls()))
assert.Equal(t, tc.expectedDoubleSpendCalls, len(metamorphStore.UpdateDoubleSpendCalls()))
assert.Equal(t, tc.expectedCallbacks, len(callbackSender.SendCallbackCalls()))
require.Equal(t, tc.expectedUpdateStatusCalls, len(metamorphStore.UpdateStatusBulkCalls()))
require.Equal(t, tc.expectedDoubleSpendCalls, len(metamorphStore.UpdateDoubleSpendCalls()))
require.Equal(t, tc.expectedCallbacks, len(mqClient.PublishMarshalCalls()))
sut.Shutdown()
})
}
Expand Down Expand Up @@ -903,28 +898,32 @@ func TestStartProcessMinedCallbacks(t *testing.T) {
require.Len(t, txsBlocks, tc.expectedTxsBlocks)

return []*store.Data{
{Hash: testdata.TX1Hash, Callbacks: []store.Callback{{CallbackURL: "http://callback.com"}}},
{Hash: testdata.TX1Hash, Callbacks: []store.Callback{{CallbackURL: "http://callback.com"}}},
{Hash: testdata.TX1Hash, Callbacks: []store.Callback{{CallbackURL: "https://callback.com"}}},
{Hash: testdata.TX2Hash, Callbacks: []store.Callback{{CallbackURL: "https://callback.com"}}},
{Hash: testdata.TX1Hash},
}, tc.updateMinedErr
},
SetUnlockedByNameFunc: func(_ context.Context, _ string) (int64, error) { return 0, nil },
}
pm := &mocks.PeerManagerMock{ShutdownFunc: func() {}}
minedTxsChan := make(chan *blocktx_api.TransactionBlock, 5)
callbackSender := &mocks.CallbackSenderMock{
SendCallbackFunc: func(_ context.Context, _ *store.Data) {},

mqClient := &mocks.MessageQueueMock{
PublishMarshalFunc: func(_ context.Context, _ string, _ protoreflect.ProtoMessage) error {
return nil
},
}

cStore := cache.NewMemoryStore()
sut, err := metamorph.NewProcessor(
metamorphStore,
cStore,
pm,
nil,
metamorph.WithMinedTxsChan(minedTxsChan),
metamorph.WithCallbackSender(callbackSender),
metamorph.WithProcessMinedBatchSize(tc.processMinedBatchSize),
metamorph.WithProcessMinedInterval(tc.processMinedInterval),
metamorph.WithMessageQueueClient(mqClient),
)
require.NoError(t, err)

Expand All @@ -941,7 +940,7 @@ func TestStartProcessMinedCallbacks(t *testing.T) {
sut.Shutdown()

// then
require.Equal(t, tc.expectedSendCallbackCalls, len(callbackSender.SendCallbackCalls()))
require.Equal(t, tc.expectedSendCallbackCalls, len(mqClient.PublishMarshalCalls()))
})
}
}
Expand Down

0 comments on commit 1477947

Please sign in to comment.