From 6ecfb84aad72a8bc387ea792f6656891b0059f31 Mon Sep 17 00:00:00 2001 From: shotasilagadzetaal <139438093+shotasilagadzetaal@users.noreply.github.com> Date: Thu, 5 Dec 2024 12:54:48 +0400 Subject: [PATCH] refactor: Callbacker logic with less strict URL policy (#685) --- CHANGELOG.md | 8 ++ cmd/arc/services/callbacker.go | 12 ++- config/config.go | 28 +++--- config/defaults.go | 19 ++-- config/example_config.yaml | 7 +- doc/README.md | 2 +- internal/callbacker/background_workers.go | 8 +- internal/callbacker/dispatcher.go | 21 ++--- internal/callbacker/dispatcher_test.go | 4 +- internal/callbacker/quarantine_policy.go | 44 ---------- internal/callbacker/quarantine_policy_test.go | 53 ----------- internal/callbacker/send_manager.go | 87 +++++++------------ internal/callbacker/send_manager_test.go | 64 ++++++-------- internal/callbacker/sender.go | 30 +++---- internal/callbacker/server_test.go | 3 +- .../callbacker/store/postgresql/postgres.go | 6 +- test/config/config.yaml | 3 + 17 files changed, 130 insertions(+), 269 deletions(-) delete mode 100644 internal/callbacker/quarantine_policy.go delete mode 100644 internal/callbacker/quarantine_policy_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 44de76785..1b725bce6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ All notable changes to this project will be documented in this file. The format ## Table of Contents - [Unreleased](#unreleased) +- [1.3.12](#1312---2024-12-05) - [1.3.2](#132---2024-10-30) - [1.3.0](#130---2024-08-21) - [1.2.0](#120---2024-08-13) @@ -19,6 +20,13 @@ All notable changes to this project will be documented in this file. 
The format ## [Unreleased] +## [1.3.12] - 2024-12-05 + +### Changed +- The Callbacker service handles unsuccessful callback attempts by delaying individual callbacks and retrying them again instead of putting an unsuccessful receiver in quarantine + +### Added + ## [1.3.2] - 2024-10-30 ### Changed diff --git a/cmd/arc/services/callbacker.go b/cmd/arc/services/callbacker.go index 257d9ae28..7f072bc07 100644 --- a/cmd/arc/services/callbacker.go +++ b/cmd/arc/services/callbacker.go @@ -12,7 +12,7 @@ Key components: - callback sender: responsible for sending callbacks - background tasks: - periodically cleans up old, unsent callbacks from storage - - periodically checks the storage for callbacks in quarantine (temporary ban) and re-attempts dispatch after the quarantine period + - periodically checks the storage for callbacks in delayed state (temporary ban) and re-attempts dispatch after the delay period - gRPC server with endpoints: - Health: provides a health check endpoint for the service - SendCallback: receives and processes new callback requests @@ -71,16 +71,14 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), } sendConfig := callbacker.SendConfig{ + Expiration: cfg.Expiration, Delay: cfg.Delay, + DelayDuration: cfg.DelayDuration, PauseAfterSingleModeSuccessfulSend: cfg.Pause, BatchSendInterval: cfg.BatchSendInterval, } - quarantineConfig := callbacker.QuarantineConfig{ - BaseDuration: cfg.QuarantinePolicy.BaseDuration, - PermQuarantineAfterDuration: cfg.QuarantinePolicy.PermQuarantineAfter, - } - dispatcher = callbacker.NewCallbackDispatcher(sender, callbackerStore, logger, &sendConfig, &quarantineConfig) + dispatcher = callbacker.NewCallbackDispatcher(sender, callbackerStore, logger, &sendConfig) err = dispatchPersistedCallbacks(callbackerStore, dispatcher, logger) if err != nil { stopFn() @@ -89,7 +87,7 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), workers = 
callbacker.NewBackgroundWorkers(callbackerStore, dispatcher, logger) workers.StartCallbackStoreCleanup(cfg.PruneInterval, cfg.PruneOlderThan) - workers.StartQuarantineCallbacksDispatch(cfg.QuarantineCheckInterval) + workers.StartFailedCallbacksDispatch(cfg.FailedCallbackCheckInterval) server, err = callbacker.NewServer(appConfig.PrometheusEndpoint, appConfig.GrpcMessageSize, logger, dispatcher, nil) if err != nil { diff --git a/config/config.go b/config/config.go index 8342ba8c7..8444e01c0 100644 --- a/config/config.go +++ b/config/config.go @@ -174,20 +174,16 @@ type K8sWatcherConfig struct { } type CallbackerConfig struct { - ListenAddr string `mapstructure:"listenAddr"` - DialAddr string `mapstructure:"dialAddr"` - Health *HealthConfig `mapstructure:"health"` - Delay time.Duration `mapstructure:"delay"` - Pause time.Duration `mapstructure:"pause"` - BatchSendInterval time.Duration `mapstructure:"batchSendInterval"` - Db *DbConfig `mapstructure:"db"` - PruneInterval time.Duration `mapstructure:"pruneInterval"` - PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"` - QuarantineCheckInterval time.Duration `mapstructure:"quarantineCheckInterval"` - QuarantinePolicy *CallbackerQuarantinePolicy `mapstructure:"quarantinePolicy"` -} - -type CallbackerQuarantinePolicy struct { - BaseDuration time.Duration `mapstructure:"baseDuration"` - PermQuarantineAfter time.Duration `mapstructure:"permQuarantineAfter"` + ListenAddr string `mapstructure:"listenAddr"` + DialAddr string `mapstructure:"dialAddr"` + Health *HealthConfig `mapstructure:"health"` + Delay time.Duration `mapstructure:"delay"` + Pause time.Duration `mapstructure:"pause"` + BatchSendInterval time.Duration `mapstructure:"batchSendInterval"` + Db *DbConfig `mapstructure:"db"` + PruneInterval time.Duration `mapstructure:"pruneInterval"` + PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"` + DelayDuration time.Duration `mapstructure:"delayDuration"` + FailedCallbackCheckInterval time.Duration 
`mapstructure:"failedCallbackCheckInterval"` + Expiration time.Duration `mapstructure:"expiration"` } diff --git a/config/defaults.go b/config/defaults.go index 072942904..df81123f4 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -182,17 +182,14 @@ func getCallbackerConfig() *CallbackerConfig { Health: &HealthConfig{ SeverDialAddr: "localhost:8025", }, - Delay: 0, - Pause: 0, - BatchSendInterval: time.Duration(5 * time.Second), - Db: getDbConfig("callbacker"), - PruneInterval: 24 * time.Hour, - PruneOlderThan: 14 * 24 * time.Hour, - QuarantineCheckInterval: time.Minute, - QuarantinePolicy: &CallbackerQuarantinePolicy{ - BaseDuration: 10 * time.Minute, - PermQuarantineAfter: 24 * time.Hour, - }, + Delay: 0, + Pause: 0, + BatchSendInterval: time.Duration(5 * time.Second), + Db: getDbConfig("callbacker"), + PruneInterval: 24 * time.Hour, + PruneOlderThan: 14 * 24 * time.Hour, + FailedCallbackCheckInterval: time.Minute, + Expiration: 24 * time.Hour, } } diff --git a/config/example_config.yaml b/config/example_config.yaml index 10b9b979c..5c96e8db5 100644 --- a/config/example_config.yaml +++ b/config/example_config.yaml @@ -165,7 +165,6 @@ callbacker: sslMode: disable pruneInterval: 24h # interval at which old or failed callbacks are pruned from the store pruneOlderThan: 336h # age threshold for pruning callbacks (older than this value will be removed) - quarantineCheckInterval: 1m # interval at which the store is checked for quarantined callbacks to be re-sent - quarantinePolicy: - baseDuration: 5m # initial duration a callback and its receiver is quarantined after failure - permQuarantineAfter: 24h # maximum time a callback can remain unsent before it's put in permanent quarantine + failedCallbackCheckInterval: 1m # interval at which the store is checked for failed callbacks to be re-sent + delayDuration: 5s # we try callbacks a few times with this delay after which if it fails consistently we store them in db + expiration: 24h # maximum time a callback 
can remain unsent before it's put as permanently failed \ No newline at end of file diff --git a/doc/README.md b/doc/README.md index cd088d2c7..002cb94dc 100644 --- a/doc/README.md +++ b/doc/README.md @@ -107,7 +107,7 @@ To prevent DDoS attacks on callback receivers, each Callbacker service instance >NOTE: Typically, there are several instances of Callbacker, and each one operates independently. -The Callbacker handles request retries and treats any HTTP status code outside the range of `200–299` as a failure. If the receiver fails to return a success status after a certain number of retries, it is placed in quarantine for a certain period. During this time, sending callbacks to the receiver is paused, and all callbacks are stored persistently in the Callbacker service for later retries. +The Callbacker handles request retries and treats any HTTP status code outside the range of `200–299` as a failure. If the receiver fails to return a success status after a certain number of retries, it is placed in failed state for a certain period. During this time, sending callbacks to the receiver is paused, and all callbacks are stored persistently in the Callbacker service for later retries. >NOTE: Callbacks that have not been successfully sent for an extended period (e.g., 24 hours) are no longer sent. 
diff --git a/internal/callbacker/background_workers.go b/internal/callbacker/background_workers.go index 296f4e30a..65688dae9 100644 --- a/internal/callbacker/background_workers.go +++ b/internal/callbacker/background_workers.go @@ -37,9 +37,9 @@ func (w *BackgroundWorkers) StartCallbackStoreCleanup(interval, olderThanDuratio go w.pruneCallbacks(interval, olderThanDuration) } -func (w *BackgroundWorkers) StartQuarantineCallbacksDispatch(interval time.Duration) { +func (w *BackgroundWorkers) StartFailedCallbacksDispatch(interval time.Duration) { w.workersWg.Add(1) - go w.dispatchQuarantineCallbacks(interval) + go w.dispatchFailedCallbacks(interval) } func (w *BackgroundWorkers) GracefulStop() { @@ -64,7 +64,7 @@ func (w *BackgroundWorkers) pruneCallbacks(interval, olderThanDuration time.Dura err := w.s.DeleteFailedOlderThan(ctx, olderThan) if err != nil { - w.l.Error("failed to delete old callbacks in quarantine", slog.String("err", err.Error())) + w.l.Error("failed to delete old callbacks in delay", slog.String("err", err.Error())) } case <-w.ctx.Done(): @@ -74,7 +74,7 @@ func (w *BackgroundWorkers) pruneCallbacks(interval, olderThanDuration time.Dura } } -func (w *BackgroundWorkers) dispatchQuarantineCallbacks(interval time.Duration) { +func (w *BackgroundWorkers) dispatchFailedCallbacks(interval time.Duration) { const batchSize = 100 ctx := context.Background() diff --git a/internal/callbacker/dispatcher.go b/internal/callbacker/dispatcher.go index 9e43be6d8..52cdd43bc 100644 --- a/internal/callbacker/dispatcher.go +++ b/internal/callbacker/dispatcher.go @@ -30,8 +30,7 @@ type CallbackDispatcher struct { managers map[string]*sendManager managersMu sync.Mutex - quarantinePolicy *quarantinePolicy - sendConfig *SendConfig + sendConfig *SendConfig } type CallbackEntry struct { @@ -43,28 +42,20 @@ type CallbackEntry struct { type SendConfig struct { Delay time.Duration PauseAfterSingleModeSuccessfulSend time.Duration + DelayDuration time.Duration BatchSendInterval 
time.Duration -} - -type QuarantineConfig struct { - BaseDuration, PermQuarantineAfterDuration time.Duration + Expiration time.Duration } func NewCallbackDispatcher(callbacker SenderI, cStore store.CallbackerStore, logger *slog.Logger, - sendingConfig *SendConfig, quarantineConfig *QuarantineConfig) *CallbackDispatcher { + sendingConfig *SendConfig) *CallbackDispatcher { return &CallbackDispatcher{ sender: callbacker, store: cStore, logger: logger.With(slog.String("module", "dispatcher")), sendConfig: sendingConfig, - - quarantinePolicy: &quarantinePolicy{ - baseDuration: quarantineConfig.BaseDuration, - permQuarantineAfter: quarantineConfig.PermQuarantineAfterDuration, - now: time.Now, - }, - managers: make(map[string]*sendManager), + managers: make(map[string]*sendManager), } } @@ -82,7 +73,7 @@ func (d *CallbackDispatcher) Dispatch(url string, dto *CallbackEntry, allowBatch manager, ok := d.managers[url] if !ok { - manager = runNewSendManager(url, d.sender, d.store, d.logger, d.quarantinePolicy, d.sendConfig) + manager = runNewSendManager(url, d.sender, d.store, d.logger, d.sendConfig) d.managers[url] = manager } d.managersMu.Unlock() diff --git a/internal/callbacker/dispatcher_test.go b/internal/callbacker/dispatcher_test.go index 8f9dc5992..3a47fa87e 100644 --- a/internal/callbacker/dispatcher_test.go +++ b/internal/callbacker/dispatcher_test.go @@ -53,10 +53,10 @@ func TestCallbackDispatcher(t *testing.T) { sendingConfig := SendConfig{ PauseAfterSingleModeSuccessfulSend: tc.sendInterval, + Expiration: time.Duration(24 * time.Hour), } - quarantineConfig := QuarantineConfig{} - sut := NewCallbackDispatcher(cMq, sMq, slog.Default(), &sendingConfig, &quarantineConfig) + sut := NewCallbackDispatcher(cMq, sMq, slog.Default(), &sendingConfig) var receivers []string for i := range tc.numOfReceivers { diff --git a/internal/callbacker/quarantine_policy.go b/internal/callbacker/quarantine_policy.go deleted file mode 100644 index 52dba76e6..000000000 --- 
a/internal/callbacker/quarantine_policy.go +++ /dev/null @@ -1,44 +0,0 @@ -package callbacker - -import "time" - -type quarantinePolicy struct { - baseDuration time.Duration - permQuarantineAfter time.Duration - now func() time.Time -} - -// arbitrarily chosen date in a distant future -var infinity = time.Date(2999, time.January, 1, 0, 0, 0, 0, time.UTC) - -// Until calculates the time until which the quarantine should last based on a reference time. -// It compares the current time with the given referenceTime and adjusts the duration of quarantine as follows: -// -// 1. If the time since referenceTime is less than or equal to baseDuration: -// - The quarantine ends after the baseDuration has passed from the current time. -// - Returns the current time plus baseDuration. -// -// 2. If the time since referenceTime is greater than baseDuration but less than permQuarantineAfter: -// - The quarantine duration is extended to double the time that has passed since the referenceTime. -// - Returns the current time plus twice the elapsed time. -// -// 3. If the time since referenceTime exceeds permQuarantineAfter: -// - The quarantine is considered "permanent", meaning no further action is needed. -// - Returns a predefined "infinity" date (January 1, 2999). -// -// This function dynamically adjusts the quarantine period based on how long it has been since -// the reference time, with the possibility of an extended period or abandonment after a certain threshold. 
-func (p *quarantinePolicy) Until(referenceTime time.Time) time.Time { - duration := p.baseDuration - - since := p.now().Sub(referenceTime) - if since > p.baseDuration { - if since > p.permQuarantineAfter { - return infinity - } - - duration = since * 2 - } - - return p.now().Add(duration) -} diff --git a/internal/callbacker/quarantine_policy_test.go b/internal/callbacker/quarantine_policy_test.go deleted file mode 100644 index fcf90ead2..000000000 --- a/internal/callbacker/quarantine_policy_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package callbacker - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestQuarantinePolicy(t *testing.T) { - tt := []struct { - name string - nowFn func() time.Time - refTime time.Time - expectedResult time.Time - }{ - { - name: "reference point in time close to call time- return base duration", - nowFn: func() time.Time { return time.Date(2024, 9, 11, 12, 30, 0, 0, time.UTC) }, - refTime: time.Date(2024, 9, 11, 12, 29, 0, 0, time.UTC), - expectedResult: time.Date(2024, 9, 11, 12, 40, 0, 0, time.UTC), - }, - { - name: "reference point in time is earlier than permanent quarantine- return infinity", - nowFn: func() time.Time { return time.Date(2024, 9, 11, 12, 30, 0, 0, time.UTC) }, - refTime: time.Date(2024, 9, 11, 12, 0, 0, 0, time.UTC), - expectedResult: infinity, - }, - { - name: "reference point in time is eariel than base duration- return double time span", - nowFn: func() time.Time { return time.Date(2024, 9, 11, 12, 30, 0, 0, time.UTC) }, - refTime: time.Date(2024, 9, 11, 12, 15, 0, 0, time.UTC), - expectedResult: time.Date(2024, 9, 11, 13, 0, 0, 0, time.UTC), - }, - } - - for _, tc := range tt { - t.Run(tc.name, func(t *testing.T) { - // given - sut := &quarantinePolicy{ - baseDuration: 10 * time.Minute, - permQuarantineAfter: 20 * time.Minute, - now: tc.nowFn, - } - - // when - actualTime := sut.Until(tc.refTime) - - // then - require.Equal(t, tc.expectedResult, actualTime) - }) - } -} diff --git 
a/internal/callbacker/send_manager.go b/internal/callbacker/send_manager.go index d07513185..b192c07f4 100644 --- a/internal/callbacker/send_manager.go +++ b/internal/callbacker/send_manager.go @@ -4,23 +4,21 @@ package callbacker /* The SendManager is responsible for managing the sequential sending of callbacks to a specified URL. -It supports single and batched callbacks, handles failures by placing the URL in quarantine, and ensures +It supports single and batched callbacks, handles failures by placing the URL in failed state, and ensures safe storage of unsent callbacks during graceful shutdowns. The manager operates in various modes: - ActiveMode (normal sending) - - QuarantineMode (temporarily halting sends on failure) - StoppingMode (for graceful shutdown). -It processes callbacks from two channels, ensuring either single or batch dispatch, and manages retries based on a quarantine policy. +It processes callbacks from two channels, ensuring either single or batch dispatch, and manages retries based on a failure policy. Key components: - SenderI : responsible for sending callbacks -- quarantine policy: the duration for quarantining a URL are governed by a configurable policy, determining how long the URL remains inactive before retry attempts Sending logic: callbacks are sent to the designated URL one at a time, ensuring sequential and orderly processing. -Quarantine handling: if a URL fails to respond with a success status, the URL is placed in quarantine (based on a defined policy). - During this period, all callbacks for the quarantined URL are stored with a quarantine timestamp, preventing further dispatch attempts until the quarantine expires. +Failure handling: if a URL fails to respond with a success status, the URL is placed in failed state (based on a defined policy). + During this period, all callbacks for the failed URL are stored with a failed timestamp, preventing further dispatch attempts until we retry again. 
Graceful Shutdown: on service termination, the sendManager ensures that any unsent callbacks are safely persisted in the store, ensuring no loss of data during shutdown. @@ -40,10 +38,11 @@ type sendManager struct { url string // dependencies - sender SenderI - store store.CallbackerStore - logger *slog.Logger - quarantinePolicy *quarantinePolicy + sender SenderI + store store.CallbackerStore + logger *slog.Logger + + expiration time.Duration // internal state entries chan *CallbackEntry @@ -54,6 +53,7 @@ type sendManager struct { sendDelay time.Duration singleSendSleep time.Duration batchSendInterval time.Duration + delayDuration time.Duration modeMu sync.Mutex mode mode @@ -61,10 +61,11 @@ type sendManager struct { type mode uint8 +var infinity = time.Date(2999, time.January, 1, 0, 0, 0, 0, time.UTC) + const ( IdleMode mode = iota ActiveMode - QuarantineMode StoppingMode entriesBufferSize = 10000 @@ -77,7 +78,7 @@ func WithBufferSize(size int) func(*sendManager) { } } -func runNewSendManager(url string, sender SenderI, store store.CallbackerStore, logger *slog.Logger, quarantinePolicy *quarantinePolicy, sendingConfig *SendConfig, opts ...func(*sendManager)) *sendManager { +func runNewSendManager(url string, sender SenderI, store store.CallbackerStore, logger *slog.Logger, sendingConfig *SendConfig, opts ...func(*sendManager)) *sendManager { const defaultBatchSendInterval = 5 * time.Second batchSendInterval := defaultBatchSendInterval @@ -86,15 +87,16 @@ func runNewSendManager(url string, sender SenderI, store store.CallbackerStore, } m := &sendManager{ - url: url, - sender: sender, - store: store, - logger: logger, - quarantinePolicy: quarantinePolicy, + url: url, + sender: sender, + store: store, + logger: logger, sendDelay: sendingConfig.Delay, singleSendSleep: sendingConfig.PauseAfterSingleModeSuccessfulSend, batchSendInterval: batchSendInterval, + delayDuration: sendingConfig.DelayDuration, + expiration: sendingConfig.Expiration, entries: make(chan 
*CallbackEntry, entriesBufferSize), batchEntries: make(chan *CallbackEntry, entriesBufferSize), @@ -177,6 +179,7 @@ func (m *sendManager) run() { defer runWg.Done() danglingBatchedCallbacks = m.consumeBatchedCallbacks() }() + runWg.Wait() // store unsent callbacks @@ -197,8 +200,6 @@ func (m *sendManager) consumeSingleCallbacks() []*store.CallbackData { switch m.getMode() { case ActiveMode: m.send(callback) - case QuarantineMode: - m.handleQuarantine(callback) case StoppingMode: // add callback to save danglingCallbacks = append(danglingCallbacks, toStoreDto(m.url, callback, nil, false)) @@ -240,10 +241,6 @@ runLoop: m.sendBatch(batch) callbacks = callbacks[n:] // shrink slice - case QuarantineMode: - m.handleQuarantineBatch(callbacks) - callbacks = nil - case StoppingMode: // add callback to save danglingCallbacks = append(danglingCallbacks, toStoreDtoCollection(m.url, nil, true, callbacks)...) @@ -284,15 +281,14 @@ func (m *sendManager) send(callback *CallbackEntry) { return } - m.putInQuarantine() - m.handleQuarantine(callback) -} + until := time.Now().Add(m.delayDuration) + if time.Since(callback.Data.Timestamp) > m.expiration { + until = infinity + } -func (m *sendManager) handleQuarantine(ce *CallbackEntry) { - qUntil := m.quarantinePolicy.Until(ce.Data.Timestamp) - err := m.store.Set(context.Background(), toStoreDto(m.url, ce, &qUntil, false)) + err := m.store.Set(context.Background(), toStoreDto(m.url, callback, &until, false)) if err != nil { - m.logger.Error("failed to store callback in quarantine", slog.String("url", m.url), slog.String("err", err.Error())) + m.logger.Error("failed to store failed callback in db", slog.String("url", m.url), slog.String("err", err.Error())) } } @@ -310,35 +306,16 @@ func (m *sendManager) sendBatch(batch []*CallbackEntry) { return } - m.putInQuarantine() - m.handleQuarantineBatch(batch) -} - -func (m *sendManager) handleQuarantineBatch(batch []*CallbackEntry) { - qUntil := m.quarantinePolicy.Until(batch[0].Data.Timestamp) 
- err := m.store.SetMany(context.Background(), toStoreDtoCollection(m.url, &qUntil, true, batch)) + until := time.Now().Add(m.delayDuration) + if time.Since(batch[0].Data.Timestamp) > m.expiration { + until = infinity + } + err := m.store.SetMany(context.Background(), toStoreDtoCollection(m.url, &until, true, batch)) if err != nil { - m.logger.Error("failed to store callbacks in quarantine", slog.String("url", m.url), slog.String("err", err.Error())) + m.logger.Error("failed to store failed callbacks in db", slog.String("url", m.url), slog.String("err", err.Error())) } } -func (m *sendManager) putInQuarantine() { - m.setMode(QuarantineMode) - m.logger.Warn("send callback failed - putting receiver in quarantine", slog.String("url", m.url), slog.Duration("approx. duration", m.quarantinePolicy.baseDuration)) - - go func() { - time.Sleep(m.quarantinePolicy.baseDuration) - m.modeMu.Lock() - - if m.mode != StoppingMode { - m.mode = ActiveMode - m.logger.Info("receiver is active again after quarantine", slog.String("url", m.url)) - } - - m.modeMu.Unlock() - }() -} - func toStoreDto(url string, entry *CallbackEntry, postponedUntil *time.Time, allowBatch bool) *store.CallbackData { return &store.CallbackData{ URL: url, diff --git a/internal/callbacker/send_manager_test.go b/internal/callbacker/send_manager_test.go index 23022b1a6..fba555899 100644 --- a/internal/callbacker/send_manager_test.go +++ b/internal/callbacker/send_manager_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log/slog" + "sync/atomic" "testing" "time" @@ -111,7 +112,7 @@ func TestSendManager(t *testing.T) { opts = append(opts, WithBufferSize(tc.setEntriesBufferSize)) } - sut := runNewSendManager("", cMq, sMq, slog.Default(), nil, sendConfig, opts...) + sut := runNewSendManager("", cMq, sMq, slog.Default(), sendConfig, opts...) 
// add callbacks before starting the manager to queue them for range tc.numOfSingleCallbacks { @@ -159,13 +160,13 @@ func TestSendManager(t *testing.T) { } } -func TestSendManager_Quarantine(t *testing.T) { - /* Quarantine scenario +func TestSendManager_FailedCallbacks(t *testing.T) { + /* Failure scenario 1. sending failed - 2. put manager in quarantine for a specified duration - 3. store all callbacks during the quarantine period - 4. switch manager to active mode once quarantine is over - 5. send new callbacks after quarantine + 2. put manager in failed state for a specified duration + 3. store all callbacks during the failure period + 4. switch manager to active mode once failure duration is over + 5. send new callbacks again */ tt := []struct { @@ -184,10 +185,10 @@ func TestSendManager_Quarantine(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { // given - sendOK := true + var sendOK int32 senderMq := &SenderIMock{ - SendFunc: func(_, _ string, _ *Callback) bool { return sendOK }, - SendBatchFunc: func(_, _ string, _ []*Callback) bool { return sendOK }, + SendFunc: func(_, _ string, _ *Callback) bool { return atomic.LoadInt32(&sendOK) == 1 }, + SendBatchFunc: func(_, _ string, _ []*Callback) bool { return atomic.LoadInt32(&sendOK) == 1 }, } storeMq := &mocks.CallbackerStoreMock{ @@ -199,20 +200,14 @@ func TestSendManager_Quarantine(t *testing.T) { }, } - policy := quarantinePolicy{ - baseDuration: 200 * time.Millisecond, - permQuarantineAfter: time.Hour, - now: time.Now, - } - - var preQuarantineCallbacks []*CallbackEntry + var preFailureCallbacks []*CallbackEntry for i := range 10 { - preQuarantineCallbacks = append(preQuarantineCallbacks, &CallbackEntry{Data: &Callback{TxID: fmt.Sprintf("q %d", i)}}) + preFailureCallbacks = append(preFailureCallbacks, &CallbackEntry{Data: &Callback{TxID: fmt.Sprintf("q %d", i)}}) } - var postQuarantineCallbacks []*CallbackEntry + var postFailureCallbacks []*CallbackEntry for i := range 10 { - 
postQuarantineCallbacks = append(postQuarantineCallbacks, &CallbackEntry{Data: &Callback{TxID: fmt.Sprintf("a %d", i)}}) + postFailureCallbacks = append(postFailureCallbacks, &CallbackEntry{Data: &Callback{TxID: fmt.Sprintf("a %d", i)}}) } sendConfig := &SendConfig{ @@ -221,33 +216,30 @@ func TestSendManager_Quarantine(t *testing.T) { BatchSendInterval: time.Millisecond, } - sut := runNewSendManager("http://unittest.com", senderMq, storeMq, slog.Default(), &policy, sendConfig) + sut := runNewSendManager("http://unittest.com", senderMq, storeMq, slog.Default(), sendConfig) // when - sendOK = false // trigger send failure - this should put the manager in quarantine + atomic.StoreInt32(&sendOK, 0) // trigger send failure - this should put the manager in failed state // add a few callbacks to send - all should be stored - for _, c := range preQuarantineCallbacks { + for _, c := range preFailureCallbacks { sut.Add(c, tc.batch) - // wait to make sure first callback is processed as first - time.Sleep(10 * time.Millisecond) } - require.Equal(t, QuarantineMode, sut.getMode()) - time.Sleep(policy.baseDuration + 20*time.Millisecond) // wait for the quarantine period to complete + time.Sleep(500 * time.Millisecond) // wait for the failure period to complete require.Equal(t, ActiveMode, sut.getMode()) - sendOK = true // now all sends should complete successfully + atomic.StoreInt32(&sendOK, 1) // now all sends should complete successfully // add a few callbacks to send - all should be sent - for _, c := range postQuarantineCallbacks { + for _, c := range postFailureCallbacks { sut.Add(c, tc.batch) } // give a chance to process - time.Sleep(50 * time.Millisecond) + time.Sleep(500 * time.Millisecond) // then - // check stored callbacks during quarantine + // check stored callbacks during failure var storedCallbacks []*store.CallbackData if tc.batch { for _, c := range storeMq.SetManyCalls() { @@ -259,8 +251,8 @@ func TestSendManager_Quarantine(t *testing.T) { } } - 
require.Equal(t, len(preQuarantineCallbacks), len(storedCallbacks), "all callbacks sent during quarantine should be stored") - for _, c := range preQuarantineCallbacks { + require.Equal(t, len(preFailureCallbacks), len(storedCallbacks), "all callbacks sent during failure should be stored") + for _, c := range preFailureCallbacks { _, ok := find(storedCallbacks, func(e *store.CallbackData) bool { return e.TxID == c.Data.TxID }) @@ -280,15 +272,15 @@ func TestSendManager_Quarantine(t *testing.T) { } } - require.Equal(t, len(postQuarantineCallbacks)+1, len(sendCallbacks), "manager should attempt to send the callback that caused quarantine (first call) and all callbacks sent after quarantine") + require.Equal(t, len(postFailureCallbacks)+len(preFailureCallbacks), len(sendCallbacks), "manager should attempt to send the callback that caused failure (first call) and all callbacks sent after failure") _, ok := find(sendCallbacks, func(e *Callback) bool { - return e.TxID == preQuarantineCallbacks[0].Data.TxID + return e.TxID == preFailureCallbacks[0].Data.TxID }) require.True(t, ok) - for _, c := range postQuarantineCallbacks { + for _, c := range postFailureCallbacks { _, ok := find(sendCallbacks, func(e *Callback) bool { return e.TxID == c.Data.TxID }) diff --git a/internal/callbacker/sender.go b/internal/callbacker/sender.go index 4b885aea5..4d4a9eade 100644 --- a/internal/callbacker/sender.go +++ b/internal/callbacker/sender.go @@ -13,13 +13,13 @@ import ( ) type CallbackSender struct { - httpClient *http.Client - mu sync.Mutex - disposed bool - stats *stats - logger *slog.Logger - retries int - initRetrySleepDuration time.Duration + httpClient *http.Client + mu sync.Mutex + disposed bool + stats *stats + logger *slog.Logger + retries int + retrySleepDuration time.Duration } type SenderOption func(s *CallbackSender) @@ -31,7 +31,7 @@ const ( func WithInitRetrySleepDuration(d time.Duration) func(*CallbackSender) { return func(s *CallbackSender) { - 
s.initRetrySleepDuration = d + s.retrySleepDuration = d } } @@ -58,11 +58,11 @@ func NewSender(httpClient *http.Client, logger *slog.Logger, opts ...SenderOptio } callbacker := &CallbackSender{ - httpClient: httpClient, - stats: stats, - logger: logger.With(slog.String("module", "sender")), - retries: retriesDefault, - initRetrySleepDuration: initRetrySleepDurationDefault, + httpClient: httpClient, + stats: stats, + logger: logger.With(slog.String("module", "sender")), + retries: retriesDefault, + retrySleepDuration: 5 * time.Second, } // apply options to processor @@ -190,7 +190,7 @@ func (p *CallbackSender) SendBatch(url, token string, dtos []*Callback) (ok bool } func (p *CallbackSender) sendCallbackWithRetries(url, token string, jsonPayload []byte) (ok bool, nrOfRetries int) { - retrySleep := p.initRetrySleepDuration + retrySleep := p.retrySleepDuration ok, retry := false, false counter := 0 for range p.retries { @@ -203,8 +203,6 @@ func (p *CallbackSender) sendCallbackWithRetries(url, token string, jsonPayload } time.Sleep(retrySleep) - // increase intervals on each failure - retrySleep *= 2 } return ok, counter } diff --git a/internal/callbacker/server_test.go b/internal/callbacker/server_test.go index 1fc0b7fba..e247632d9 100644 --- a/internal/callbacker/server_test.go +++ b/internal/callbacker/server_test.go @@ -75,8 +75,7 @@ func TestSendCallback(t *testing.T) { senderMq, storeMq, slog.Default(), - &callbacker.SendConfig{}, - &callbacker.QuarantineConfig{}, + &callbacker.SendConfig{Expiration: time.Duration(24 * time.Hour)}, ) server, err := callbacker.NewServer("", 0, slog.Default(), mockDispatcher, nil) diff --git a/internal/callbacker/store/postgresql/postgres.go b/internal/callbacker/store/postgresql/postgres.go index 84e5443fa..3226493d6 100644 --- a/internal/callbacker/store/postgresql/postgres.go +++ b/internal/callbacker/store/postgresql/postgres.go @@ -56,7 +56,7 @@ func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) er 
blockHashes := make([]*string, len(data)) blockHeights := make([]sql.NullInt64, len(data)) competingTxs := make([]*string, len(data)) - quarantineUntils := make([]sql.NullTime, len(data)) + delayUntils := make([]sql.NullTime, len(data)) allowBatches := make([]bool, len(data)) for i, d := range data { @@ -79,7 +79,7 @@ func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) er } if d.PostponedUntil != nil { - quarantineUntils[i] = sql.NullTime{Time: d.PostponedUntil.UTC(), Valid: true} + delayUntils[i] = sql.NullTime{Time: d.PostponedUntil.UTC(), Valid: true} } } @@ -122,7 +122,7 @@ func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) er pq.Array(blockHeights), pq.Array(timestamps), pq.Array(competingTxs), - pq.Array(quarantineUntils), + pq.Array(delayUntils), pq.Array(allowBatches), ) diff --git a/test/config/config.yaml b/test/config/config.yaml index cab172f84..1c32339dd 100644 --- a/test/config/config.yaml +++ b/test/config/config.yaml @@ -149,3 +149,6 @@ callbacker: maxIdleConns: 10 maxOpenConns: 80 sslMode: disable + failedCallbackCheckInterval: 1m # interval at which the store is checked for failed callbacks to be re-sent + delayDuration: 5s # we try callbacks a few times with this delay after which if it fails consistently we store them in db + expiration: 24h # maximum time a callback can remain unsent before it's put as permanently failed \ No newline at end of file