Skip to content

Commit

Permalink
feat(ARCO-291): Ordered callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
boecklim committed Jan 23, 2025
1 parent 39c062f commit 0c414d7
Show file tree
Hide file tree
Showing 42 changed files with 848 additions and 2,423 deletions.
52 changes: 21 additions & 31 deletions cmd/arc/services/callbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"github.com/bitcoin-sv/arc/config"
"github.com/bitcoin-sv/arc/internal/callbacker"
"github.com/bitcoin-sv/arc/internal/callbacker/store"
"github.com/bitcoin-sv/arc/internal/callbacker/send_manager"
"github.com/bitcoin-sv/arc/internal/callbacker/store/postgresql"
"github.com/bitcoin-sv/arc/internal/grpc_opts"
"github.com/bitcoin-sv/arc/internal/message_queue/nats/client/nats_jetstream"
Expand All @@ -48,7 +48,6 @@ func StartCallbacker(logger *slog.Logger, arcConfig *config.ArcConfig) (func(),
callbackerStore *postgresql.PostgreSQL
sender *callbacker.CallbackSender
dispatcher *callbacker.CallbackDispatcher
workers *callbacker.BackgroundWorkers
server *callbacker.Server
healthServer *grpc_opts.GrpcServer
mqClient callbacker.MessageQueueClient
Expand All @@ -58,7 +57,7 @@ func StartCallbacker(logger *slog.Logger, arcConfig *config.ArcConfig) (func(),

stopFn := func() {
logger.Info("Shutting down callbacker")
dispose(logger, server, workers, dispatcher, sender, callbackerStore, healthServer, processor, mqClient)
dispose(logger, server, dispatcher, sender, callbackerStore, healthServer, processor, mqClient)
logger.Info("Shutdown complete")
}

Expand All @@ -73,24 +72,18 @@ func StartCallbacker(logger *slog.Logger, arcConfig *config.ArcConfig) (func(),
return nil, fmt.Errorf("failed to create callback sender: %v", err)
}

sendConfig := callbacker.SendConfig{
Expiration: cfg.Expiration,
Delay: cfg.Delay,
DelayDuration: cfg.DelayDuration,
PauseAfterSingleModeSuccessfulSend: cfg.Pause,
BatchSendInterval: cfg.BatchSendInterval,
}
runNewManager := func(url string) callbacker.SendManagerI {
manager := send_manager.New(url, sender, callbackerStore, logger,
send_manager.WithQueueProcessInterval(cfg.Pause),
send_manager.WithBatchSendInterval(cfg.BatchSendInterval),
send_manager.WithExpiration(cfg.Expiration),
)
manager.Start()

dispatcher = callbacker.NewCallbackDispatcher(sender, callbackerStore, logger, &sendConfig)
workers = callbacker.NewBackgroundWorkers(callbackerStore, dispatcher, logger)
err = workers.DispatchPersistedCallbacks()
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to dispatch previously persisted callbacks: %v", err)
return manager
}

workers.StartCallbackStoreCleanup(cfg.PruneInterval, cfg.PruneOlderThan)
workers.StartFailedCallbacksDispatch(cfg.FailedCallbackCheckInterval)
dispatcher = callbacker.NewCallbackDispatcher(sender, runNewManager)

natsConnection, err := nats_connection.New(arcConfig.MessageQueue.URL, logger)
if err != nil {
Expand Down Expand Up @@ -119,6 +112,9 @@ func StartCallbacker(logger *slog.Logger, arcConfig *config.ArcConfig) (func(),
return nil, err
}

processor.StartCallbackStoreCleanup(cfg.PruneInterval, cfg.PruneOlderThan)
processor.DispatchPersistedCallbacks()

err = processor.Start()
if err != nil {
stopFn()
Expand Down Expand Up @@ -168,37 +164,31 @@ func newStore(dbConfig *config.DbConfig) (s *postgresql.PostgreSQL, err error) {
return s, err
}

func dispose(l *slog.Logger, server *callbacker.Server, workers *callbacker.BackgroundWorkers,
func dispose(l *slog.Logger, server *callbacker.Server,
dispatcher *callbacker.CallbackDispatcher, sender *callbacker.CallbackSender,
store store.CallbackerStore, healthServer *grpc_opts.GrpcServer, processor *callbacker.Processor, mqClient callbacker.MessageQueueClient) {
store *postgresql.PostgreSQL, healthServer *grpc_opts.GrpcServer, processor *callbacker.Processor, mqClient callbacker.MessageQueueClient) {
// dispose the dependencies in the correct order:
// 1. server - ensure no new callbacks will be received
// 2. background workers - ensure no callbacks from background will be accepted
// 3. dispatcher - ensure all already accepted callbacks are proccessed
// 2. dispatcher - ensure all already accepted callbacks are processed
// 3. processor - remove all URL mappings
// 4. sender - finally, stop the sender as there are no callbacks left to send
// 5. store

if server != nil {
server.GracefulStop()
}
if workers != nil {
workers.GracefulStop()
}
if dispatcher != nil {
dispatcher.GracefulStop()
}
if sender != nil {
sender.GracefulStop()
}

if processor != nil {
processor.GracefulStop()
}

if sender != nil {
sender.GracefulStop()
}
if mqClient != nil {
mqClient.Shutdown()
}

if store != nil {
err := store.Close()
if err != nil {
Expand Down
21 changes: 9 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,16 +195,13 @@ type K8sWatcherConfig struct {
}

type CallbackerConfig struct {
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Delay time.Duration `mapstructure:"delay"`
Pause time.Duration `mapstructure:"pause"`
BatchSendInterval time.Duration `mapstructure:"batchSendInterval"`
Db *DbConfig `mapstructure:"db"`
PruneInterval time.Duration `mapstructure:"pruneInterval"`
PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"`
DelayDuration time.Duration `mapstructure:"delayDuration"`
FailedCallbackCheckInterval time.Duration `mapstructure:"failedCallbackCheckInterval"`
Expiration time.Duration `mapstructure:"expiration"`
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Pause time.Duration `mapstructure:"pause"`
BatchSendInterval time.Duration `mapstructure:"batchSendInterval"`
PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"`
PruneInterval time.Duration `mapstructure:"pruneInterval"`
Expiration time.Duration `mapstructure:"expiration"`
Db *DbConfig `mapstructure:"db"`
}
14 changes: 6 additions & 8 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,12 @@ func getCallbackerConfig() *CallbackerConfig {
Health: &HealthConfig{
SeverDialAddr: "localhost:8025",
},
Delay: 0,
Pause: 0,
BatchSendInterval: time.Duration(5 * time.Second),
Db: getDbConfig("callbacker"),
PruneInterval: 24 * time.Hour,
PruneOlderThan: 14 * 24 * time.Hour,
FailedCallbackCheckInterval: time.Minute,
Expiration: 24 * time.Hour,
Pause: 0,
BatchSendInterval: 5 * time.Second,
PruneOlderThan: 14 * 24 * time.Hour,
PruneInterval: 24 * time.Hour,
Expiration: 24 * time.Hour,
Db: getDbConfig("callbacker"),
}
}

Expand Down
11 changes: 4 additions & 7 deletions config/example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -156,9 +156,11 @@ callbacker:
dialAddr: localhost:8021 # address for other services to dial callbacker service
health:
serverDialAddr: localhost:8025 # address at which the grpc health server is exposed
delay: 0s # delay before the callback (or batch of callbacks) is actually sent
pause: 0s # pause between sending next callback to the same receiver
pause: 1s # pause between sending next callback to the same receiver - must be greater 0s
batchSendInterval: 5s # interval at witch batched callbacks are send (default 5s)
pruneOlderThan: 336h # age threshold for pruning callbacks (older than this value will be removed)
pruneInterval: 24h # interval at which old or failed callbacks are pruned from the store
expiration: 24h # maximum time a callback can remain unsent before it's put as permanently failed
db:
mode: postgres # db mode indicates which db to use. At the moment only postgres is offered
postgres: # postgres db configuration in case that mode: postgres
Expand All @@ -170,8 +172,3 @@ callbacker:
maxIdleConns: 10 # maximum idle connections
maxOpenConns: 80 # maximum open connections
sslMode: disable
pruneInterval: 24h # interval at which old or failed callbacks are pruned from the store
pruneOlderThan: 336h # age threshold for pruning callbacks (older than this value will be removed)
failedCallbackCheckInterval: 1m # interval at which the store is checked for failed callbacks to be re-sent
delayDuration: 5s # we try callbacks a few times with this delay after which if it fails consistently we store them in db
expiration: 24h # maximum time a callback can remain unsent before it's put as permanently failed
3 changes: 1 addition & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ services:
condition: service_healthy
migrate-blocktx:
condition: service_completed_successfully

healthcheck:
test: ["CMD", "/bin/grpc_health_probe", "-addr=:8006", "-service=liveness", "-rpc-timeout=5s"]
interval: 10s
Expand All @@ -200,7 +199,7 @@ services:
migrate-callbacker:
condition: service_completed_successfully
healthcheck:
test: ["CMD", "/bin/grpc_health_probe", "-addr=:8022", "-service=liveness", "-rpc-timeout=5s"]
test: ["CMD", "/bin/grpc_health_probe", "-addr=:8025", "-service=liveness", "-rpc-timeout=5s"]
interval: 10s
timeout: 5s
retries: 3
Expand Down
2 changes: 1 addition & 1 deletion internal/blocktx/blocktx_api/blocktx_api.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/blocktx/blocktx_api/blocktx_api_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

160 changes: 0 additions & 160 deletions internal/callbacker/background_workers.go

This file was deleted.

Loading

0 comments on commit 0c414d7

Please sign in to comment.