Skip to content

Commit

Permalink
ARCO-184: read and process callbacks from db on callbacker start
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Sep 2, 2024
1 parent ef04e5a commit 0fc2285
Show file tree
Hide file tree
Showing 7 changed files with 606 additions and 2 deletions.
8 changes: 8 additions & 0 deletions cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
}

dispatcher = callbacker.NewCallbackDispatcher(sender, store, config.Pause)
logger.Info("Init callback dispatcher, add to processing abandoned callbacks")
err = dispatcher.Init()
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to init callback dispatcher, couldn't process all abandoned callbacks: %v", err)
}

server = callbacker.NewServer(dispatcher, callbacker.WithLogger(logger.With(slog.String("module", "server"))))
err = server.Serve(config.ListenAddr, appConfig.GrpcMessageSize, appConfig.PrometheusEndpoint)
if err != nil {
Expand All @@ -91,6 +98,7 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
stopFn()
return nil, fmt.Errorf("failed to start health server: %v", err)
}

logger.Info("Ready to work")
return stopFn, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/callbacker/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (d *CallbackDispatcher) Init() error {

for {
callbacks, err := d.s.PopMany(ctx, bundleSize)
if err != nil || len(callbacks) > 0 {
if err != nil || len(callbacks) == 0 {
return err
}

Expand Down
52 changes: 52 additions & 0 deletions internal/callbacker/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package callbacker
import (
"context"
"fmt"
"math"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -91,6 +92,57 @@ func Test_CallbackDispatcher(t *testing.T) {
}
}

func Test_CallbackDispatcher_Init(t *testing.T) {
tcs := []struct {
name string
danglingCallbacksNum int
}{
{
name: "no dangling callbacks",
danglingCallbacksNum: 0,
},
{
name: "callbacks to process on init",
danglingCallbacksNum: 259,
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
// given
var danglingCallbacks []*store.CallbackData
for range tc.danglingCallbacksNum {
danglingCallbacks = append(danglingCallbacks, &store.CallbackData{})
}

cMq := &CallbackerIMock{
SendFunc: func(url, token string, callback *Callback) {},
}

sMq := &mocks.CallbackerStoreMock{
PopManyFunc: func(ctx context.Context, limit int) ([]*store.CallbackData, error) {
limit = int(math.Min(float64(len(danglingCallbacks)), float64(limit)))

r := danglingCallbacks[:limit]
danglingCallbacks = danglingCallbacks[limit:]

return r, nil
},
}

sut := NewCallbackDispatcher(cMq, sMq, 0)

// when
err := sut.Init()
time.Sleep(50 * time.Millisecond)

// then
require.NoError(t, err)
require.Equal(t, tc.danglingCallbacksNum, len(cMq.SendCalls()))
})
}
}

func Test_sendManager(t *testing.T) {
tcs := []struct {
name string
Expand Down
Loading

0 comments on commit 0fc2285

Please sign in to comment.