Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARCO-183: implement quarantine for problematic callback receivers #587

Merged
merged 5 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 95 additions & 43 deletions cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"log/slog"
"net"
Expand All @@ -24,10 +25,10 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
config := appConfig.Callbacker

var (
store store.CallbackerStore
sender *callbacker.CallbackSender
dispatcher *callbacker.CallbackDispatcher

store store.CallbackerStore
sender *callbacker.CallbackSender
dispatcher *callbacker.CallbackDispatcher
workers *callbacker.BackgroundWorkers
server *callbacker.Server
healthServer *grpc.Server

Expand All @@ -36,38 +37,11 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),

stopFn := func() {
logger.Info("Shutting down callbacker")

// dispose of dependencies in the correct order:
// 1. server - ensure no new callbacks will be received
// 2. dispatcher - ensure all already accepted callbacks are proccessed
// 3. sender - finally, stop the sender as there are no callbacks left to send.
// 4. store

if server != nil {
server.GracefulStop()
}
if dispatcher != nil {
dispatcher.GracefulStop()
}
if sender != nil {
sender.GracefulStop()
}

if store != nil {
err := store.Close()
if err != nil {
logger.Error("Could not close the store", slog.String("err", err.Error()))
}
}

if healthServer != nil {
healthServer.Stop()
}

dispose(logger, server, workers, dispatcher, sender, store, healthServer)
logger.Info("Shutted down")
}

store, err = NewStore(config.Db)
store, err = newStore(config.Db)
if err != nil {
return nil, fmt.Errorf("failed to create callbacker store: %v", err)
}
Expand All @@ -78,22 +52,25 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
return nil, fmt.Errorf("failed to create callback sender: %v", err)
}

dispatcher = callbacker.NewCallbackDispatcher(sender, store, config.Pause)
logger.Info("Init callback dispatcher, add to processing abandoned callbacks")
err = dispatcher.Init()
dispatcher = callbacker.NewCallbackDispatcher(sender, store, logger, config.Pause, config.QuarantinePolicy.BaseDuration, config.QuarantinePolicy.PermQuarantineAfter)
err = dispatchPersistedCallbacks(store, dispatcher, logger)
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to init callback dispatcher, couldn't process all abandoned callbacks: %v", err)
return nil, fmt.Errorf("failed to dispatch previously persisted callbacks: %v", err)
}

workers = callbacker.NewBackgroundWorkers(store, dispatcher, logger)
workers.StartCallbackStoreCleanup(config.PruneInterval, config.PruneOlderThan)
workers.StartQuarantineCallbacksDispatch(config.QuarantineCheckInterval)

server = callbacker.NewServer(dispatcher, callbacker.WithLogger(logger.With(slog.String("module", "server"))))
err = server.Serve(config.ListenAddr, appConfig.GrpcMessageSize, appConfig.PrometheusEndpoint)
if err != nil {
stopFn()
return nil, fmt.Errorf("GRPCServer failed: %v", err)
}

healthServer, err = StartHealthServerCallbacker(server, config.Health, logger)
healthServer, err = startHealthServerCallbacker(server, config.Health, logger)
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to start health server: %v", err)
Expand All @@ -103,16 +80,16 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
return stopFn, nil
}

func NewStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) {
func newStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) {
switch dbConfig.Mode {
case DbModePostgres:
postgres := dbConfig.Postgres
config := dbConfig.Postgres

dbInfo := fmt.Sprintf(
"user=%s password=%s dbname=%s host=%s port=%d sslmode=%s",
postgres.User, postgres.Password, postgres.Name, postgres.Host, postgres.Port, postgres.SslMode,
config.User, config.Password, config.Name, config.Host, config.Port, config.SslMode,
)
s, err = postgresql.New(dbInfo, postgres.MaxIdleConns, postgres.MaxOpenConns)
s, err = postgresql.New(dbInfo, config.MaxIdleConns, config.MaxOpenConns)
if err != nil {
return nil, fmt.Errorf("failed to open postgres DB: %v", err)
}
Expand All @@ -123,7 +100,25 @@ func NewStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) {
return s, err
}

func StartHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.HealthConfig, logger *slog.Logger) (*grpc.Server, error) {
// dispatchPersistedCallbacks drains the callbacks currently held in the store
// and hands each of them to the dispatcher.
//
// Callbacks are popped in batches of batchSize until the store returns an
// empty batch (nil is returned) or PopMany reports an error (that error is
// returned unchanged; batches popped earlier have already been dispatched).
func dispatchPersistedCallbacks(s store.CallbackerStore, d *callbacker.CallbackDispatcher, l *slog.Logger) error {
	const batchSize = 100

	l.Info("Dispatch persited callbacks")

	ctx := context.Background()
	for {
		batch, err := s.PopMany(ctx, batchSize)
		if err != nil {
			return err
		}
		if len(batch) == 0 {
			// nothing left in the store
			return nil
		}

		for _, cb := range batch {
			d.Dispatch(cb.Url, toCallbackEntry(cb))
		}
	}
}

func startHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.HealthConfig, logger *slog.Logger) (*grpc.Server, error) {
gs := grpc.NewServer()

grpc_health_v1.RegisterHealthServer(gs, serv) // registration
Expand All @@ -145,3 +140,60 @@ func StartHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.H

return gs, nil
}

// dispose shuts down the callbacker's dependencies. Every argument may be nil,
// in which case that dependency is skipped. The order below is deliberate and
// must be kept.
func dispose(l *slog.Logger, server *callbacker.Server, workers *callbacker.BackgroundWorkers,
	dispatcher *callbacker.CallbackDispatcher, sender *callbacker.CallbackSender,
	store store.CallbackerStore, healthServer *grpc.Server) {

	// 1. server - ensure no new callbacks will be received
	if server != nil {
		server.GracefulStop()
	}

	// 2. background workers - ensure no callbacks from background will be accepted
	if workers != nil {
		workers.GracefulStop()
	}

	// 3. dispatcher - ensure all already accepted callbacks are processed
	if dispatcher != nil {
		dispatcher.GracefulStop()
	}

	// 4. sender - stop the sender as there are no callbacks left to send
	if sender != nil {
		sender.GracefulStop()
	}

	// 5. store - a failure to close is logged, not propagated
	if store != nil {
		if err := store.Close(); err != nil {
			l.Error("Could not close the store", slog.String("err", err.Error()))
		}
	}

	// 6. health server - stopped last
	if healthServer != nil {
		healthServer.Stop()
	}
}

// toCallbackEntry converts a callback record read from the store into the
// entry type accepted by the dispatcher. The token is copied onto the entry
// and the remaining payload fields are copied into a new Callback.
func toCallbackEntry(dto *store.CallbackData) *callbacker.CallbackEntry {
	return &callbacker.CallbackEntry{
		Token: dto.Token,
		Data: &callbacker.Callback{
			Timestamp: dto.Timestamp,

			CompetingTxs: dto.CompetingTxs,
			TxID:         dto.TxID,
			TxStatus:     dto.TxStatus,
			ExtraInfo:    dto.ExtraInfo,
			MerklePath:   dto.MerklePath,

			BlockHash:   dto.BlockHash,
			BlockHeight: dto.BlockHeight,
		},
	}
}
19 changes: 14 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,18 @@ type K8sWatcherConfig struct {
}

type CallbackerConfig struct {
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Pause time.Duration `mapstructure:"pause"`
Db *DbConfig `mapstructure:"db"`
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Pause time.Duration `mapstructure:"pause"`
Db *DbConfig `mapstructure:"db"`
PruneInterval time.Duration `mapstructure:"pruneInterval"`
PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"`
QuarantineCheckInterval time.Duration `mapstructure:"quarantineCheckInterval"`
QuarantinePolicy *CallbackerQuarantinePolicy `mapstructure:"quarantinePolicy"`
}

// CallbackerQuarantinePolicy holds the quarantine timing settings read from
// the "quarantinePolicy" section of the callbacker configuration.
type CallbackerQuarantinePolicy struct {
// BaseDuration is the initial duration a callback and its receiver are
// quarantined after a failure.
BaseDuration time.Duration `mapstructure:"baseDuration"`
// PermQuarantineAfter is the maximum time a callback can remain unsent
// before it is put in permanent quarantine.
PermQuarantineAfter time.Duration `mapstructure:"permQuarantineAfter"`
}
11 changes: 9 additions & 2 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,14 @@ func getCallbackerConfig() *CallbackerConfig {
Health: &HealthConfig{
SeverDialAddr: "localhost:8025",
},
Pause: 0,
Db: getDbConfig("callbacker"),
Pause: 0,
Db: getDbConfig("callbacker"),
PruneInterval: 24 * time.Hour,
PruneOlderThan: 14 * 24 * time.Hour,
QuarantineCheckInterval: time.Minute,
QuarantinePolicy: &CallbackerQuarantinePolicy{
BaseDuration: 10 * time.Minute,
PermQuarantineAfter: 24 * time.Hour,
},
}
}
6 changes: 6 additions & 0 deletions config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,10 @@ callbacker:
maxIdleConns: 10 # maximum idle connections
maxOpenConns: 80 # maximum open connections
sslMode: disable
pruneInterval: 24h # interval at which old or failed callbacks are pruned from the store
pruneOlderThan: 336h # age threshold for pruning callbacks (older than this value will be removed)
quarantineCheckInterval: 1m # interval at which the store is checked for quarantined callbacks to be re-sent
quarantinePolicy:
baseDuration: 5m # initial duration a callback and its receiver are quarantined after failure
permQuarantineAfter: 24h # maximum time a callback can remain unsent before it's put in permanent quarantine

122 changes: 122 additions & 0 deletions internal/callbacker/background_workers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package callbacker

import (
"context"
"log/slog"
"sync"
"time"

"github.com/bitcoin-sv/arc/internal/callbacker/store"
)

// BackgroundWorkers runs the callbacker's periodic background jobs (store
// cleanup and re-dispatch of quarantined callbacks) and lets them be stopped
// together.
type BackgroundWorkers struct {
// s is the store the workers prune and pop failed callbacks from.
s store.CallbackerStore
// l is the logger used by the workers to report errors.
l *slog.Logger
// d receives the callbacks popped from the store.
d *CallbackDispatcher

// workersWg counts running worker goroutines; GracefulStop waits on it.
workersWg sync.WaitGroup
// ctx is observed by every worker loop; its cancellation ends the loop.
ctx context.Context
// cancelAll cancels ctx.
cancelAll func()
}

// NewBackgroundWorkers builds a BackgroundWorkers bound to the given store and
// dispatcher. The logger is tagged with module="background workers". No worker
// is started here; use the Start* methods for that.
func NewBackgroundWorkers(store store.CallbackerStore, dispatcher *CallbackDispatcher, logger *slog.Logger) *BackgroundWorkers {
	workers := &BackgroundWorkers{
		s: store,
		d: dispatcher,
		l: logger.With(slog.String("module", "background workers")),
	}

	// a single cancellable context is shared by all workers so that they can
	// be stopped together
	workers.ctx, workers.cancelAll = context.WithCancel(context.Background())

	return workers
}

// StartCallbackStoreCleanup launches a goroutine that prunes the store every
// interval, using olderThanDuration as the age threshold. The goroutine is
// registered in the wait group before it starts, so GracefulStop waits for it.
func (w *BackgroundWorkers) StartCallbackStoreCleanup(interval, olderThanDuration time.Duration) {
	w.workersWg.Add(1)

	go func() {
		w.pruneCallbacks(interval, olderThanDuration)
	}()
}

// StartQuarantineCallbacksDispatch launches a goroutine that, every interval,
// pops failed callbacks from the store and passes them to the dispatcher. The
// goroutine is registered in the wait group before it starts, so GracefulStop
// waits for it.
func (w *BackgroundWorkers) StartQuarantineCallbacksDispatch(interval time.Duration) {
	w.workersWg.Add(1)

	go func() {
		w.dispatchQuarantineCallbacks(interval)
	}()
}

// GracefulStop cancels the context shared by all workers and then blocks until
// every started worker goroutine has returned.
func (w *BackgroundWorkers) GracefulStop() {
// signal first, then wait - the workers only exit once the context is done
w.cancelAll()
w.workersWg.Wait()
}

// pruneCallbacks is the worker loop started by StartCallbackStoreCleanup.
//
// On every tick of interval it asks the store to delete failed callbacks older
// than (today's UTC midnight - olderThanDuration). Deletion errors are logged
// and the loop keeps running. The loop ends when the workers' context is
// cancelled; the ticker is released and the wait group is signalled on exit.
//
// interval must be positive (time.NewTicker panics otherwise).
func (w *BackgroundWorkers) pruneCallbacks(interval, olderThanDuration time.Duration) {
	defer w.workersWg.Done()

	t := time.NewTicker(interval)
	defer t.Stop()

	ctx := context.Background()

	for {
		select {
		case <-t.C:
			// Take the calendar date in UTC: mixing the local date with a UTC
			// midnight would shift the cutoff by up to a day on hosts that do
			// not run in UTC.
			now := time.Now().UTC()
			midnight := time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC)
			olderThan := midnight.Add(-olderThanDuration)

			if err := w.s.DeleteFailedOlderThan(ctx, olderThan); err != nil {
				w.l.Error("failed to delete old callbacks in quarantine", slog.String("err", err.Error()))
			}

		case <-w.ctx.Done():
			return
		}
	}
}

// dispatchQuarantineCallbacks is the worker loop started by
// StartQuarantineCallbacksDispatch.
//
// On every tick of interval it pops up to batchSize failed callbacks from the
// store (passing the current time to PopFailedMany) and hands each one to the
// dispatcher. Store errors are logged and the loop waits for the next tick.
// The loop ends when the workers' context is cancelled; the ticker is released
// and the wait group is signalled on exit.
//
// interval must be positive (time.NewTicker panics otherwise).
func (w *BackgroundWorkers) dispatchQuarantineCallbacks(interval time.Duration) {
	const batchSize = 100

	defer w.workersWg.Done()

	t := time.NewTicker(interval)
	defer t.Stop()

	ctx := context.Background()

	for {
		select {
		case <-t.C:
			callbacks, err := w.s.PopFailedMany(ctx, time.Now(), batchSize)
			if err != nil {
				w.l.Error("reading callbacks from store failed", slog.String("err", err.Error()))
				continue
			}

			// an empty batch simply results in no dispatches
			for _, c := range callbacks {
				w.d.Dispatch(c.Url, toCallbackEntry(c))
			}

		case <-w.ctx.Done():
			return
		}
	}
}

// toCallbackEntry converts a callback record read from the store into a
// CallbackEntry for the dispatcher. The token and the postponed-until value
// are carried over onto the entry; the remaining payload fields are copied
// into a new Callback.
func toCallbackEntry(dto *store.CallbackData) *CallbackEntry {
	return &CallbackEntry{
		Token: dto.Token,
		Data: &Callback{
			Timestamp: dto.Timestamp,

			CompetingTxs: dto.CompetingTxs,
			TxID:         dto.TxID,
			TxStatus:     dto.TxStatus,
			ExtraInfo:    dto.ExtraInfo,
			MerklePath:   dto.MerklePath,

			BlockHash:   dto.BlockHash,
			BlockHeight: dto.BlockHeight,
		},
		postponedUntil: dto.PostponedUntil,
	}
}
2 changes: 1 addition & 1 deletion internal/callbacker/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type CallbackerI interface {
Send(url, token string, callback *Callback)
Send(url, token string, callback *Callback) bool
Health() error
}

Expand Down
Loading
Loading