Skip to content

Commit

Permalink
refactor: Callbacker logic with less strict URL policy (#685)
Browse files Browse the repository at this point in the history
  • Loading branch information
shotasilagadzetaal authored Dec 5, 2024
1 parent 568adb2 commit 6ecfb84
Show file tree
Hide file tree
Showing 17 changed files with 130 additions and 269 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ All notable changes to this project will be documented in this file. The format

## Table of Contents
- [Unreleased](#unreleased)
- [1.3.12](#1312---2024-12-05)
- [1.3.2](#132---2024-10-30)
- [1.3.0](#130---2024-08-21)
- [1.2.0](#120---2024-08-13)
Expand All @@ -19,6 +20,13 @@ All notable changes to this project will be documented in this file. The format

## [Unreleased]

## [1.3.12] - 2024-12-05

### Changed
- The Callbacker service handles unsuccessful callback attempts by delaying individual callbacks and retrying them, instead of putting an unsuccessful receiver in quarantine.

### Added

## [1.3.2] - 2024-10-30

### Changed
Expand Down
12 changes: 5 additions & 7 deletions cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Key components:
- callback sender: responsible for sending callbacks
- background tasks:
- periodically cleans up old, unsent callbacks from storage
- periodically checks the storage for callbacks in quarantine (temporary ban) and re-attempts dispatch after the quarantine period
- periodically checks the storage for callbacks in delayed state (temporary ban) and re-attempts dispatch after the delay period
- gRPC server with endpoints:
- Health: provides a health check endpoint for the service
- SendCallback: receives and processes new callback requests
Expand Down Expand Up @@ -71,16 +71,14 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
}

sendConfig := callbacker.SendConfig{
Expiration: cfg.Expiration,
Delay: cfg.Delay,
DelayDuration: cfg.DelayDuration,
PauseAfterSingleModeSuccessfulSend: cfg.Pause,
BatchSendInterval: cfg.BatchSendInterval,
}
quarantineConfig := callbacker.QuarantineConfig{
BaseDuration: cfg.QuarantinePolicy.BaseDuration,
PermQuarantineAfterDuration: cfg.QuarantinePolicy.PermQuarantineAfter,
}

dispatcher = callbacker.NewCallbackDispatcher(sender, callbackerStore, logger, &sendConfig, &quarantineConfig)
dispatcher = callbacker.NewCallbackDispatcher(sender, callbackerStore, logger, &sendConfig)
err = dispatchPersistedCallbacks(callbackerStore, dispatcher, logger)
if err != nil {
stopFn()
Expand All @@ -89,7 +87,7 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),

workers = callbacker.NewBackgroundWorkers(callbackerStore, dispatcher, logger)
workers.StartCallbackStoreCleanup(cfg.PruneInterval, cfg.PruneOlderThan)
workers.StartQuarantineCallbacksDispatch(cfg.QuarantineCheckInterval)
workers.StartFailedCallbacksDispatch(cfg.FailedCallbackCheckInterval)

server, err = callbacker.NewServer(appConfig.PrometheusEndpoint, appConfig.GrpcMessageSize, logger, dispatcher, nil)
if err != nil {
Expand Down
28 changes: 12 additions & 16 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,20 +174,16 @@ type K8sWatcherConfig struct {
}

type CallbackerConfig struct {
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Delay time.Duration `mapstructure:"delay"`
Pause time.Duration `mapstructure:"pause"`
BatchSendInterval time.Duration `mapstructure:"batchSendInterval"`
Db *DbConfig `mapstructure:"db"`
PruneInterval time.Duration `mapstructure:"pruneInterval"`
PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"`
QuarantineCheckInterval time.Duration `mapstructure:"quarantineCheckInterval"`
QuarantinePolicy *CallbackerQuarantinePolicy `mapstructure:"quarantinePolicy"`
}

type CallbackerQuarantinePolicy struct {
BaseDuration time.Duration `mapstructure:"baseDuration"`
PermQuarantineAfter time.Duration `mapstructure:"permQuarantineAfter"`
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Delay time.Duration `mapstructure:"delay"`
Pause time.Duration `mapstructure:"pause"`
BatchSendInterval time.Duration `mapstructure:"batchSendInterval"`
Db *DbConfig `mapstructure:"db"`
PruneInterval time.Duration `mapstructure:"pruneInterval"`
PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"`
DelayDuration time.Duration `mapstructure:"delayDuration"`
FailedCallbackCheckInterval time.Duration `mapstructure:"failedCallbackCheckInterval"`
Expiration time.Duration `mapstructure:"expiration"`
}
19 changes: 8 additions & 11 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,17 +182,14 @@ func getCallbackerConfig() *CallbackerConfig {
Health: &HealthConfig{
SeverDialAddr: "localhost:8025",
},
Delay: 0,
Pause: 0,
BatchSendInterval: time.Duration(5 * time.Second),
Db: getDbConfig("callbacker"),
PruneInterval: 24 * time.Hour,
PruneOlderThan: 14 * 24 * time.Hour,
QuarantineCheckInterval: time.Minute,
QuarantinePolicy: &CallbackerQuarantinePolicy{
BaseDuration: 10 * time.Minute,
PermQuarantineAfter: 24 * time.Hour,
},
Delay: 0,
Pause: 0,
BatchSendInterval: time.Duration(5 * time.Second),
Db: getDbConfig("callbacker"),
PruneInterval: 24 * time.Hour,
PruneOlderThan: 14 * 24 * time.Hour,
FailedCallbackCheckInterval: time.Minute,
Expiration: 24 * time.Hour,
}
}

Expand Down
7 changes: 3 additions & 4 deletions config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ callbacker:
sslMode: disable
pruneInterval: 24h # interval at which old or failed callbacks are pruned from the store
pruneOlderThan: 336h # age threshold for pruning callbacks (older than this value will be removed)
quarantineCheckInterval: 1m # interval at which the store is checked for quarantined callbacks to be re-sent
quarantinePolicy:
baseDuration: 5m # initial duration a callback and its receiver is quarantined after failure
permQuarantineAfter: 24h # maximum time a callback can remain unsent before it's put in permanent quarantine
failedCallbackCheckInterval: 1m # interval at which the store is checked for failed callbacks to be re-sent
delayDuration: 5s # delay between retry attempts for a callback; callbacks that keep failing after a few attempts are stored in the db
expiration: 24h # maximum time a callback can remain unsent before it is marked as permanently failed
2 changes: 1 addition & 1 deletion doc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ To prevent DDoS attacks on callback receivers, each Callbacker service instance

>NOTE: Typically, there are several instances of Callbacker, and each one operates independently.
The Callbacker handles request retries and treats any HTTP status code outside the range of `200–299` as a failure. If the receiver fails to return a success status after a certain number of retries, it is placed in quarantine for a certain period. During this time, sending callbacks to the receiver is paused, and all callbacks are stored persistently in the Callbacker service for later retries.
The Callbacker handles request retries and treats any HTTP status code outside the range of `200–299` as a failure. If the receiver fails to return a success status after a certain number of retries, it is placed in a failed state for a certain period. During this time, sending callbacks to the receiver is paused, and all callbacks are stored persistently in the Callbacker service for later retries.

>NOTE: Callbacks that have not been successfully sent for an extended period (e.g., 24 hours) are no longer sent.
Expand Down
8 changes: 4 additions & 4 deletions internal/callbacker/background_workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ func (w *BackgroundWorkers) StartCallbackStoreCleanup(interval, olderThanDuratio
go w.pruneCallbacks(interval, olderThanDuration)
}

func (w *BackgroundWorkers) StartQuarantineCallbacksDispatch(interval time.Duration) {
func (w *BackgroundWorkers) StartFailedCallbacksDispatch(interval time.Duration) {
w.workersWg.Add(1)
go w.dispatchQuarantineCallbacks(interval)
go w.dispatchFailedCallbacks(interval)
}

func (w *BackgroundWorkers) GracefulStop() {
Expand All @@ -64,7 +64,7 @@ func (w *BackgroundWorkers) pruneCallbacks(interval, olderThanDuration time.Dura

err := w.s.DeleteFailedOlderThan(ctx, olderThan)
if err != nil {
w.l.Error("failed to delete old callbacks in quarantine", slog.String("err", err.Error()))
w.l.Error("failed to delete old callbacks in delay", slog.String("err", err.Error()))
}

case <-w.ctx.Done():
Expand All @@ -74,7 +74,7 @@ func (w *BackgroundWorkers) pruneCallbacks(interval, olderThanDuration time.Dura
}
}

func (w *BackgroundWorkers) dispatchQuarantineCallbacks(interval time.Duration) {
func (w *BackgroundWorkers) dispatchFailedCallbacks(interval time.Duration) {
const batchSize = 100

ctx := context.Background()
Expand Down
21 changes: 6 additions & 15 deletions internal/callbacker/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ type CallbackDispatcher struct {
managers map[string]*sendManager
managersMu sync.Mutex

quarantinePolicy *quarantinePolicy
sendConfig *SendConfig
sendConfig *SendConfig
}

type CallbackEntry struct {
Expand All @@ -43,28 +42,20 @@ type CallbackEntry struct {
type SendConfig struct {
Delay time.Duration
PauseAfterSingleModeSuccessfulSend time.Duration
DelayDuration time.Duration
BatchSendInterval time.Duration
}

type QuarantineConfig struct {
BaseDuration, PermQuarantineAfterDuration time.Duration
Expiration time.Duration
}

func NewCallbackDispatcher(callbacker SenderI, cStore store.CallbackerStore, logger *slog.Logger,
sendingConfig *SendConfig, quarantineConfig *QuarantineConfig) *CallbackDispatcher {
sendingConfig *SendConfig) *CallbackDispatcher {
return &CallbackDispatcher{
sender: callbacker,
store: cStore,
logger: logger.With(slog.String("module", "dispatcher")),

sendConfig: sendingConfig,

quarantinePolicy: &quarantinePolicy{
baseDuration: quarantineConfig.BaseDuration,
permQuarantineAfter: quarantineConfig.PermQuarantineAfterDuration,
now: time.Now,
},
managers: make(map[string]*sendManager),
managers: make(map[string]*sendManager),
}
}

Expand All @@ -82,7 +73,7 @@ func (d *CallbackDispatcher) Dispatch(url string, dto *CallbackEntry, allowBatch
manager, ok := d.managers[url]

if !ok {
manager = runNewSendManager(url, d.sender, d.store, d.logger, d.quarantinePolicy, d.sendConfig)
manager = runNewSendManager(url, d.sender, d.store, d.logger, d.sendConfig)
d.managers[url] = manager
}
d.managersMu.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions internal/callbacker/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ func TestCallbackDispatcher(t *testing.T) {

sendingConfig := SendConfig{
PauseAfterSingleModeSuccessfulSend: tc.sendInterval,
Expiration: time.Duration(24 * time.Hour),
}
quarantineConfig := QuarantineConfig{}

sut := NewCallbackDispatcher(cMq, sMq, slog.Default(), &sendingConfig, &quarantineConfig)
sut := NewCallbackDispatcher(cMq, sMq, slog.Default(), &sendingConfig)

var receivers []string
for i := range tc.numOfReceivers {
Expand Down
44 changes: 0 additions & 44 deletions internal/callbacker/quarantine_policy.go

This file was deleted.

53 changes: 0 additions & 53 deletions internal/callbacker/quarantine_policy_test.go

This file was deleted.

Loading

0 comments on commit 6ecfb84

Please sign in to comment.