Skip to content

Commit

Permalink
ARCO-183: put problematic callback receivers to quarantine
Browse files Browse the repository at this point in the history
  • Loading branch information
arkadiuszos4chain committed Sep 12, 2024
1 parent 21cc7a5 commit 3f6a3c1
Show file tree
Hide file tree
Showing 25 changed files with 1,957 additions and 360 deletions.
138 changes: 95 additions & 43 deletions cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"fmt"
"log/slog"
"net"
Expand All @@ -24,10 +25,10 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
config := appConfig.Callbacker

var (
store store.CallbackerStore
sender *callbacker.CallbackSender
dispatcher *callbacker.CallbackDispatcher

store store.CallbackerStore
sender *callbacker.CallbackSender
dispatcher *callbacker.CallbackDispatcher
workers *callbacker.BackgroundWorkers
server *callbacker.Server
healthServer *grpc.Server

Expand All @@ -36,38 +37,11 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),

stopFn := func() {
logger.Info("Shutting down callbacker")

// dispose of dependencies in the correct order:
// 1. server - ensure no new callbacks will be received
// 2. dispatcher - ensure all already accepted callbacks are proccessed
// 3. sender - finally, stop the sender as there are no callbacks left to send.
// 4. store

if server != nil {
server.GracefulStop()
}
if dispatcher != nil {
dispatcher.GracefulStop()
}
if sender != nil {
sender.GracefulStop()
}

if store != nil {
err := store.Close()
if err != nil {
logger.Error("Could not close the store", slog.String("err", err.Error()))
}
}

if healthServer != nil {
healthServer.Stop()
}

dispose(logger, server, workers, dispatcher, sender, store, healthServer)
logger.Info("Shutted down")
}

store, err = NewStore(config.Db)
store, err = newStore(config.Db)
if err != nil {
return nil, fmt.Errorf("failed to create callbacker store: %v", err)
}
Expand All @@ -78,22 +52,25 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
return nil, fmt.Errorf("failed to create callback sender: %v", err)
}

dispatcher = callbacker.NewCallbackDispatcher(sender, store, config.Pause)
logger.Info("Init callback dispatcher, add to processing abandoned callbacks")
err = dispatcher.Init()
dispatcher = callbacker.NewCallbackDispatcher(sender, store, logger, config.Pause, config.QuarantinePolicy.BaseDuration, config.QuarantinePolicy.AbandonAfter)
err = dispatchPersistedCallbacks(store, dispatcher, logger)
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to init callback dispatcher, couldn't process all abandoned callbacks: %v", err)
return nil, fmt.Errorf("failed to dispatch previously persisted callbacks: %v", err)
}

workers = callbacker.NewBackgroundWorkers(store, dispatcher, logger)
workers.StartCallbackStoreCleanup(config.PruneInterval, config.PruneOlderThan)
workers.StartQuarantineCallbacksDispatch(config.QuarantineCheckInterval)

server = callbacker.NewServer(dispatcher, callbacker.WithLogger(logger.With(slog.String("module", "server"))))
err = server.Serve(config.ListenAddr, appConfig.GrpcMessageSize, appConfig.PrometheusEndpoint)
if err != nil {
stopFn()
return nil, fmt.Errorf("GRPCServer failed: %v", err)
}

healthServer, err = StartHealthServerCallbacker(server, config.Health, logger)
healthServer, err = startHealthServerCallbacker(server, config.Health, logger)
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to start health server: %v", err)
Expand All @@ -103,16 +80,16 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
return stopFn, nil
}

func NewStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) {
func newStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) {
switch dbConfig.Mode {
case DbModePostgres:
postgres := dbConfig.Postgres
config := dbConfig.Postgres

dbInfo := fmt.Sprintf(
"user=%s password=%s dbname=%s host=%s port=%d sslmode=%s",
postgres.User, postgres.Password, postgres.Name, postgres.Host, postgres.Port, postgres.SslMode,
config.User, config.Password, config.Name, config.Host, config.Port, config.SslMode,
)
s, err = postgresql.New(dbInfo, postgres.MaxIdleConns, postgres.MaxOpenConns)
s, err = postgresql.New(dbInfo, config.MaxIdleConns, config.MaxOpenConns)
if err != nil {
return nil, fmt.Errorf("failed to open postgres DB: %v", err)
}
Expand All @@ -123,7 +100,25 @@ func NewStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) {
return s, err
}

func StartHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.HealthConfig, logger *slog.Logger) (*grpc.Server, error) {
func dispatchPersistedCallbacks(s store.CallbackerStore, d *callbacker.CallbackDispatcher, l *slog.Logger) error {
l.Info("Dispatch persited callbacks")

const batchSize = 100
ctx := context.Background()

for {
callbacks, err := s.PopMany(ctx, batchSize)
if err != nil || len(callbacks) == 0 {
return err
}

for _, c := range callbacks {
d.Dispatch(c.Url, toCallbackEntry(c))
}
}
}

func startHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.HealthConfig, logger *slog.Logger) (*grpc.Server, error) {
gs := grpc.NewServer()

grpc_health_v1.RegisterHealthServer(gs, serv) // registration
Expand All @@ -145,3 +140,60 @@ func StartHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.H

return gs, nil
}

func dispose(l *slog.Logger, server *callbacker.Server, workers *callbacker.BackgroundWorkers,
dispatcher *callbacker.CallbackDispatcher, sender *callbacker.CallbackSender,
store store.CallbackerStore, healthServer *grpc.Server) {

// dispose of dependencies in the correct order:
// 1. server - ensure no new callbacks will be received
// 2. background workers - ensure no callbacks from background will be accepted
// 3. dispatcher - ensure all already accepted callbacks are proccessed
// 4. sender - finally, stop the sender as there are no callbacks left to send.
// 5. store

if server != nil {
server.GracefulStop()
}
if workers != nil {
workers.GracefulStop()
}
if dispatcher != nil {
dispatcher.GracefulStop()
}
if sender != nil {
sender.GracefulStop()
}

if store != nil {
err := store.Close()
if err != nil {
l.Error("Could not close the store", slog.String("err", err.Error()))
}
}

if healthServer != nil {
healthServer.Stop()
}

}

func toCallbackEntry(dto *store.CallbackData) *callbacker.CallbackEntry {
d := &callbacker.Callback{
Timestamp: dto.Timestamp,

CompetingTxs: dto.CompetingTxs,
TxID: dto.TxID,
TxStatus: dto.TxStatus,
ExtraInfo: dto.ExtraInfo,
MerklePath: dto.MerklePath,

BlockHash: dto.BlockHash,
BlockHeight: dto.BlockHeight,
}

return &callbacker.CallbackEntry{
Token: dto.Token,
Data: d,
}
}
19 changes: 14 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,18 @@ type K8sWatcherConfig struct {
}

type CallbackerConfig struct {
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Pause time.Duration `mapstructure:"pause"`
Db *DbConfig `mapstructure:"db"`
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Pause time.Duration `mapstructure:"pause"`
Db *DbConfig `mapstructure:"db"`
PruneInterval time.Duration `mapstructure:"pruneInterval"`
PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"`
QuarantineCheckInterval time.Duration `mapstructure:"quarantineCheckInterval"`
QuarantinePolicy *CallbackerQuarantinePolicy `mapstructure:"quarantinePolicy"`
}

type CallbackerQuarantinePolicy struct {
BaseDuration time.Duration `mapstructure:"baseDuration"`
AbandonAfter time.Duration `mapstructure:"abandonAfter"`
}
11 changes: 9 additions & 2 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,14 @@ func getCallbackerConfig() *CallbackerConfig {
Health: &HealthConfig{
SeverDialAddr: "localhost:8025",
},
Pause: 0,
Db: getDbConfig("callbacker"),
Pause: 0,
Db: getDbConfig("callbacker"),
PruneInterval: 24 * time.Hour,
PruneOlderThan: 14 * 24 * time.Hour,
QuarantineCheckInterval: time.Minute,
QuarantinePolicy: &CallbackerQuarantinePolicy{
BaseDuration: 10 * time.Minute,
AbandonAfter: 24 * time.Hour,
},
}
}
6 changes: 6 additions & 0 deletions config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -137,4 +137,10 @@ callbacker:
maxIdleConns: 10 # maximum idle connections
maxOpenConns: 80 # maximum open connections
sslMode: disable
pruneInterval: 24h # interval at which old or failed callbacks are pruned from the store
pruneOlderThan: 336h # age threshold for pruning callbacks (older than this value will be removed)
quarantineCheckInterval: 1m # interval at which the store is checked for quarantined callbacks to be re-sent
quarantinePolicy:
baseDuration: 5m # initial duration a callback and its receiver is quarantined after failure
abandonAfter: 24h # maximum time a callback can remain unsent before it's abandoned

122 changes: 122 additions & 0 deletions internal/callbacker/background_workers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package callbacker

import (
"context"
"log/slog"
"sync"
"time"

"github.com/bitcoin-sv/arc/internal/callbacker/store"
)

type BackgroundWorkers struct {
s store.CallbackerStore
l *slog.Logger
d *CallbackDispatcher

workersWg sync.WaitGroup
ctx context.Context
cancelAll func()
}

func NewBackgroundWorkers(store store.CallbackerStore, dispatcher *CallbackDispatcher, logger *slog.Logger) *BackgroundWorkers {
ctx, cancel := context.WithCancel(context.Background())

return &BackgroundWorkers{
s: store,
d: dispatcher,
l: logger.With(slog.String("module", "background workers")),

ctx: ctx,
cancelAll: cancel,
}
}

func (w *BackgroundWorkers) StartCallbackStoreCleanup(interval, olderThanDuration time.Duration) {
w.workersWg.Add(1)
go w.pruneCallbacks(interval, olderThanDuration)
}

func (w *BackgroundWorkers) StartQuarantineCallbacksDispatch(interval time.Duration) {
w.workersWg.Add(1)
go w.dispatchQuarantineCallbacks(interval)
}

func (w *BackgroundWorkers) GracefulStop() {
w.cancelAll()
w.workersWg.Wait()
}

func (w *BackgroundWorkers) pruneCallbacks(interval, olderThanDuration time.Duration) {
ctx := context.Background()
t := time.NewTicker(interval)

for {
select {
case <-t.C:
n := time.Now()
midnight := time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, time.UTC)
olderThan := midnight.Add(-1 * olderThanDuration)

err := w.s.DeleteFailedOlderThan(ctx, olderThan)
if err != nil {
w.l.Error("failed to delete old callbacks in quarantine", slog.String("err", err.Error()))
}

case <-w.ctx.Done():
w.workersWg.Done()
return
}
}
}

func (w *BackgroundWorkers) dispatchQuarantineCallbacks(interval time.Duration) {
const batchSize = 100

ctx := context.Background()
t := time.NewTicker(interval)

for {
select {
case <-t.C:
callbacks, err := w.s.PopFailedMany(ctx, time.Now(), batchSize)
if err != nil {
w.l.Error("reading callbacks from store failed", slog.String("err", err.Error()))
continue
}

if len(callbacks) == 0 {
continue
}

for _, c := range callbacks {
w.d.Dispatch(c.Url, toCallbackEntry(c))
}

case <-w.ctx.Done():
w.workersWg.Done()
return
}
}
}

func toCallbackEntry(dto *store.CallbackData) *CallbackEntry {
d := &Callback{
Timestamp: dto.Timestamp,

CompetingTxs: dto.CompetingTxs,
TxID: dto.TxID,
TxStatus: dto.TxStatus,
ExtraInfo: dto.ExtraInfo,
MerklePath: dto.MerklePath,

BlockHash: dto.BlockHash,
BlockHeight: dto.BlockHeight,
}

return &CallbackEntry{
Token: dto.Token,
Data: d,
quarantineUntil: dto.QuarantineUntil,
}
}
2 changes: 1 addition & 1 deletion internal/callbacker/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
)

type CallbackerI interface {
Send(url, token string, callback *Callback)
Send(url, token string, callback *Callback) bool
Health() error
}

Expand Down
Loading

0 comments on commit 3f6a3c1

Please sign in to comment.