ARCO-183: implement quarantine for problematic callback receivers #587

Merged
merged 5 commits into from
Sep 18, 2024
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -21,6 +21,8 @@ All notable changes to this project will be documented in this file. The format
### Changed
- Callbacks are sent one by one to the same URL. In the previous implementation, each callback request created a new goroutine to send the callback, which could result in a potential DDoS of the callback receiver. The new approach sends callbacks to the same receiver in a serial manner. Note that URLs are not locked by the `callbacker` instance, so serial sends occur only within a single instance. In other words, the level of parallelism is determined by the number of `callbacker` instances.

- The Callbacker service handles unsuccessful callback attempts by placing problematic receivers in quarantine, temporarily pausing callback delivery to them.

## [1.3.0] - 2024-08-21

### Changed
160 changes: 117 additions & 43 deletions cmd/arc/services/callbacker.go
@@ -1,6 +1,29 @@
package cmd

/* Callbacker Service */
/*

This service manages the sending and storage of callbacks, with a persistent storage backend using PostgreSQL.
It starts by checking the storage for any unsent callbacks and passing them to the callback dispatcher.

Key components:
- PostgreSQL DB: used for persistent storage of callbacks
- callback dispatcher: responsible for dispatching callbacks to sender
- callback sender: responsible for sending callbacks
- background tasks:
- periodically cleans up old, unsent callbacks from storage
- periodically checks the storage for callbacks in quarantine (temporary ban) and re-attempts dispatch after the quarantine period
- gRPC server with endpoints:
- Health: provides a health check endpoint for the service
- SendCallback: receives and processes new callback requests

Startup routine: on service start, checks the storage for pending callbacks and dispatches them if needed
Graceful Shutdown: on service termination, all components are stopped gracefully, ensuring that any unprocessed callbacks are persisted in the database for future processing.

*/

import (
"context"
"fmt"
"log/slog"
"net"
@@ -24,10 +47,10 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
config := appConfig.Callbacker

var (
store store.CallbackerStore
sender *callbacker.CallbackSender
dispatcher *callbacker.CallbackDispatcher

store store.CallbackerStore
sender *callbacker.CallbackSender
dispatcher *callbacker.CallbackDispatcher
workers *callbacker.BackgroundWorkers
server *callbacker.Server
healthServer *grpc.Server

@@ -36,38 +59,11 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),

stopFn := func() {
logger.Info("Shutting down callbacker")

// dispose of dependencies in the correct order:
// 1. server - ensure no new callbacks will be received
// 2. dispatcher - ensure all already accepted callbacks are processed
// 3. sender - finally, stop the sender as there are no callbacks left to send.
// 4. store

if server != nil {
server.GracefulStop()
}
if dispatcher != nil {
dispatcher.GracefulStop()
}
if sender != nil {
sender.GracefulStop()
}

if store != nil {
err := store.Close()
if err != nil {
logger.Error("Could not close the store", slog.String("err", err.Error()))
}
}

if healthServer != nil {
healthServer.Stop()
}

dispose(logger, server, workers, dispatcher, sender, store, healthServer)
logger.Info("Shut down")
}

store, err = NewStore(config.Db)
store, err = newStore(config.Db)
if err != nil {
return nil, fmt.Errorf("failed to create callbacker store: %v", err)
}
@@ -78,22 +74,25 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
return nil, fmt.Errorf("failed to create callback sender: %v", err)
}

dispatcher = callbacker.NewCallbackDispatcher(sender, store, config.Pause)
logger.Info("Init callback dispatcher, add to processing abandoned callbacks")
err = dispatcher.Init()
dispatcher = callbacker.NewCallbackDispatcher(sender, store, logger, config.Pause, config.QuarantinePolicy.BaseDuration, config.QuarantinePolicy.PermQuarantineAfter)
err = dispatchPersistedCallbacks(store, dispatcher, logger)
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to init callback dispatcher, couldn't process all abandoned callbacks: %v", err)
return nil, fmt.Errorf("failed to dispatch previously persisted callbacks: %v", err)
}

workers = callbacker.NewBackgroundWorkers(store, dispatcher, logger)
workers.StartCallbackStoreCleanup(config.PruneInterval, config.PruneOlderThan)
workers.StartQuarantineCallbacksDispatch(config.QuarantineCheckInterval)

server = callbacker.NewServer(dispatcher, callbacker.WithLogger(logger.With(slog.String("module", "server"))))
err = server.Serve(config.ListenAddr, appConfig.GrpcMessageSize, appConfig.PrometheusEndpoint)
if err != nil {
stopFn()
return nil, fmt.Errorf("GRPCServer failed: %v", err)
}

healthServer, err = StartHealthServerCallbacker(server, config.Health, logger)
healthServer, err = startHealthServerCallbacker(server, config.Health, logger)
if err != nil {
stopFn()
return nil, fmt.Errorf("failed to start health server: %v", err)
@@ -103,16 +102,16 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(),
return stopFn, nil
}

func NewStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) {
func newStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) {
switch dbConfig.Mode {
case DbModePostgres:
postgres := dbConfig.Postgres
config := dbConfig.Postgres

dbInfo := fmt.Sprintf(
"user=%s password=%s dbname=%s host=%s port=%d sslmode=%s",
postgres.User, postgres.Password, postgres.Name, postgres.Host, postgres.Port, postgres.SslMode,
config.User, config.Password, config.Name, config.Host, config.Port, config.SslMode,
)
s, err = postgresql.New(dbInfo, postgres.MaxIdleConns, postgres.MaxOpenConns)
s, err = postgresql.New(dbInfo, config.MaxIdleConns, config.MaxOpenConns)
if err != nil {
return nil, fmt.Errorf("failed to open postgres DB: %v", err)
}
@@ -123,7 +122,25 @@ func NewStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) {
return s, err
}

func StartHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.HealthConfig, logger *slog.Logger) (*grpc.Server, error) {
func dispatchPersistedCallbacks(s store.CallbackerStore, d *callbacker.CallbackDispatcher, l *slog.Logger) error {
l.Info("Dispatch persisted callbacks")

const batchSize = 100
ctx := context.Background()

for {
callbacks, err := s.PopMany(ctx, batchSize)
if err != nil || len(callbacks) == 0 {
return err
}

for _, c := range callbacks {
d.Dispatch(c.Url, toCallbackEntry(c))
}
}
}

func startHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.HealthConfig, logger *slog.Logger) (*grpc.Server, error) {
gs := grpc.NewServer()

grpc_health_v1.RegisterHealthServer(gs, serv) // registration
@@ -145,3 +162,60 @@ func StartHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.H

return gs, nil
}

func dispose(l *slog.Logger, server *callbacker.Server, workers *callbacker.BackgroundWorkers,
dispatcher *callbacker.CallbackDispatcher, sender *callbacker.CallbackSender,
store store.CallbackerStore, healthServer *grpc.Server) {

// dispose of dependencies in the correct order:
// 1. server - ensure no new callbacks will be received
// 2. background workers - ensure no callbacks from background will be accepted
// 3. dispatcher - ensure all already accepted callbacks are processed
// 4. sender - finally, stop the sender as there are no callbacks left to send
// 5. store

if server != nil {
server.GracefulStop()
}
if workers != nil {
workers.GracefulStop()
}
if dispatcher != nil {
dispatcher.GracefulStop()
}
if sender != nil {
sender.GracefulStop()
}

if store != nil {
err := store.Close()
if err != nil {
l.Error("Could not close the store", slog.String("err", err.Error()))
}
}

if healthServer != nil {
healthServer.Stop()
}

}

func toCallbackEntry(dto *store.CallbackData) *callbacker.CallbackEntry {
d := &callbacker.Callback{
Timestamp: dto.Timestamp,

CompetingTxs: dto.CompetingTxs,
TxID: dto.TxID,
TxStatus: dto.TxStatus,
ExtraInfo: dto.ExtraInfo,
MerklePath: dto.MerklePath,

BlockHash: dto.BlockHash,
BlockHeight: dto.BlockHeight,
}

return &callbacker.CallbackEntry{
Token: dto.Token,
Data: d,
}
}
19 changes: 14 additions & 5 deletions config/config.go
@@ -123,9 +123,18 @@ type K8sWatcherConfig struct {
}

type CallbackerConfig struct {
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Pause time.Duration `mapstructure:"pause"`
Db *DbConfig `mapstructure:"db"`
ListenAddr string `mapstructure:"listenAddr"`
DialAddr string `mapstructure:"dialAddr"`
Health *HealthConfig `mapstructure:"health"`
Pause time.Duration `mapstructure:"pause"`
Db *DbConfig `mapstructure:"db"`
PruneInterval time.Duration `mapstructure:"pruneInterval"`
PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"`
QuarantineCheckInterval time.Duration `mapstructure:"quarantineCheckInterval"`
QuarantinePolicy *CallbackerQuarantinePolicy `mapstructure:"quarantinePolicy"`
}

type CallbackerQuarantinePolicy struct {
BaseDuration time.Duration `mapstructure:"baseDuration"`
PermQuarantineAfter time.Duration `mapstructure:"permQuarantineAfter"`
}
11 changes: 9 additions & 2 deletions config/defaults.go
@@ -170,7 +170,14 @@ func getCallbackerConfig() *CallbackerConfig {
Health: &HealthConfig{
SeverDialAddr: "localhost:8025",
},
Pause: 0,
Db: getDbConfig("callbacker"),
Pause: 0,
Db: getDbConfig("callbacker"),
PruneInterval: 24 * time.Hour,
PruneOlderThan: 14 * 24 * time.Hour,
QuarantineCheckInterval: time.Minute,
QuarantinePolicy: &CallbackerQuarantinePolicy{
BaseDuration: 10 * time.Minute,
PermQuarantineAfter: 24 * time.Hour,
},
}
}
6 changes: 6 additions & 0 deletions config/example_config.yaml
@@ -137,4 +137,10 @@ callbacker:
maxIdleConns: 10 # maximum idle connections
maxOpenConns: 80 # maximum open connections
sslMode: disable
pruneInterval: 24h # interval at which old or failed callbacks are pruned from the store
pruneOlderThan: 336h # age threshold for pruning callbacks (older than this value will be removed)
quarantineCheckInterval: 1m # interval at which the store is checked for quarantined callbacks to be re-sent
quarantinePolicy:
baseDuration: 5m # initial duration for which a callback and its receiver are quarantined after a failure
permQuarantineAfter: 24h # maximum time a callback can remain unsent before it's put in permanent quarantine

25 changes: 20 additions & 5 deletions doc/README.md
@@ -72,6 +72,8 @@ API is the REST API microservice for interacting with ARC. See the [API document

The API takes care of validation and sending transactions to Metamorph. The API talks to one or more Metamorph instances using client-based, round-robin load balancing.

The `X-MaxTimeout` header determines the maximum number of seconds the system will wait for new transaction statuses before the response is returned. The default timeout is 5 seconds, with a maximum value of 30 seconds.

#### Validation

The API is the first component of ARC that a transaction reaches, so by design it is where ARC benefits from a preliminary validation of transactions, made possible by the [extended transaction formats](#extended-format-ef-and-background-evaluation-extended-format-beef).
@@ -80,15 +82,24 @@ However, sending transactions in classic format is supported through the ARC API

When possible, the API is responsible for rejecting transactions that would be unacceptable to the Bitcoin network.

#### Callbacks

The client can register to receive callbacks with information about the statuses of submitted transactions. To do this, the client must include the `X-CallbackUrl` header in its request. Once registered, ARC sends a `POST` request to the URL specified in the header, with the transaction ID included in the request body.

If the client wants to secure its callback endpoint, ARC supports Bearer token authorization. A callback token can be provided by adding the `X-CallbackToken: <your callback token>` header to the request.

By default, callbacks are triggered when the submitted transaction reaches the status `REJECTED` or `MINED`. If the client wishes to receive additional intermediate status updates (e.g. `SEEN_IN_ORPHAN_MEMPOOL` or `SEEN_ON_NETWORK`), the `X-FullStatusUpdates` header must be set to `true`. For more details, refer to the [API documentation](https://bitcoin-sv.github.io/arc/api.html).
For more details on how callbacks work, see the [Callbacker](#Callbacker) section.
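
As an illustration of the headers described above, here is a minimal Go sketch that builds a submission request carrying them. The header names come from this README; the endpoint URL, the `callbackOptions` type, and the `newSubmitRequest` helper are hypothetical and exist only for this example, and the request body is a placeholder rather than a real transaction.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"strconv"
)

// callbackOptions is a hypothetical helper type grouping the headers
// described in the README.
type callbackOptions struct {
	CallbackURL       string // sent as X-CallbackUrl
	CallbackToken     string // sent as X-CallbackToken (optional)
	FullStatusUpdates bool   // sent as X-FullStatusUpdates
	MaxTimeoutSeconds int    // sent as X-MaxTimeout (0 leaves the header unset)
}

// newSubmitRequest builds a POST request with the callback headers set.
// The endpoint and body are supplied by the caller; nothing is sent here.
func newSubmitRequest(endpoint string, body []byte, opts callbackOptions) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-CallbackUrl", opts.CallbackURL)
	if opts.CallbackToken != "" {
		req.Header.Set("X-CallbackToken", opts.CallbackToken)
	}
	if opts.FullStatusUpdates {
		req.Header.Set("X-FullStatusUpdates", "true")
	}
	if opts.MaxTimeoutSeconds > 0 {
		req.Header.Set("X-MaxTimeout", strconv.Itoa(opts.MaxTimeoutSeconds))
	}
	return req, nil
}

func main() {
	// Both URLs below are placeholders, not real ARC or client endpoints.
	req, err := newSubmitRequest("https://arc.example.com/submit", []byte("placeholder body"), callbackOptions{
		CallbackURL:       "https://client.example.com/callback",
		CallbackToken:     "my-callback-token",
		FullStatusUpdates: true,
		MaxTimeoutSeconds: 10,
	})
	if err != nil {
		fmt.Println("could not build request:", err)
		return
	}
	fmt.Println(req.Header.Get("X-CallbackUrl"), req.Header.Get("X-MaxTimeout"))
}
```

Optional headers are only set when a value is supplied, so a request without them falls back to the defaults described above.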

### Metamorph

Metamorph is a microservice that is responsible for processing transactions sent by the API to the Bitcoin network. It takes care of re-sending transactions if they are not acknowledged by the network within a certain time period (60 seconds by default).

### Callbacker

Callbacker is a microservice that sends callbacks to a specified URL. To register a callback, the client must add the `X-CallbackUrl` header to the request. The callbacker will then send a POST request to the URL specified in the header, with the transaction ID in the body.
The Callbacker is a microservice responsible for handling all registered callbacks. It sends a `POST` request to the specified URL, including a `Bearer token` in the `Authorization` header when required.

The following example shows the format of a callback body
Below is an example of a callback request body:

```json
{
@@ -102,10 +113,14 @@ The following example shows the format of a callback body
}
```
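
On the receiving side, a callback endpoint might look roughly like the following Go sketch. It assumes the receiver was registered with a token (here the hypothetical constant `expectedToken`), checks the `Authorization: Bearer <token>` header, and decodes the body into a generic map because the full field list is not reproduced here. The `txid` key in the sample body, the handler name, and the `/callback` path are assumptions made for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// expectedToken is a hypothetical token that the client registered
// via the X-CallbackToken header.
const expectedToken = "my-callback-token"

// callbackHandler accepts POST callbacks carrying the expected Bearer token.
// Replying with a 2xx status signals successful delivery to the sender.
func callbackHandler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	if r.Header.Get("Authorization") != "Bearer "+expectedToken {
		w.WriteHeader(http.StatusUnauthorized)
		return
	}
	var payload map[string]any
	if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	// A real receiver would process the payload here.
	w.WriteHeader(http.StatusOK)
}

// simulateCallback invokes the handler in-process and returns the status code.
func simulateCallback(token, body string) int {
	req := httptest.NewRequest(http.MethodPost, "/callback", strings.NewReader(body))
	req.Header.Set("Authorization", "Bearer "+token)
	rec := httptest.NewRecorder()
	callbackHandler(rec, req)
	return rec.Code
}

func main() {
	// The "txid" field name is an assumption for this sketch.
	fmt.Println(simulateCallback(expectedToken, `{"txid":"abc"}`))
	fmt.Println(simulateCallback("wrong-token", `{"txid":"abc"}`))
}
```

A receiver that answers with anything other than a 2xx status should expect the callback to be treated as undelivered.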

A callback token can be added to the request by adding the header `X-CallbackToken: <your callback token>`. The respective callback will then have a header `Authorization: Bearer <your callback token>`.
To avoid overwhelming callback receivers (in effect, a DDoS), each Callbacker instance sends callbacks to a given URL serially, so that instance has at most one request in flight per receiver at a time.

>NOTE: Typically, there are several instances of Callbacker, and each one works independently.
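
The serial-per-URL behaviour can be pictured with the Go sketch below. It is not the Callbacker's dispatcher: `serialDispatcher`, its methods, the queue size, and the injected `send` function are hypothetical names chosen for this example. Each URL gets its own queue and a single worker goroutine, so callbacks for one receiver go out one after another while different receivers are served concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// serialDispatcher is a hypothetical, simplified dispatcher: one queue and
// one worker goroutine per URL.
type serialDispatcher struct {
	mu     sync.Mutex
	queues map[string]chan string
	wg     sync.WaitGroup
	send   func(url, payload string)
}

func newSerialDispatcher(send func(url, payload string)) *serialDispatcher {
	return &serialDispatcher{queues: make(map[string]chan string), send: send}
}

// Dispatch enqueues a payload for the URL, starting its worker on first use.
// It must not be called after Stop.
func (d *serialDispatcher) Dispatch(url, payload string) {
	d.mu.Lock()
	q, ok := d.queues[url]
	if !ok {
		q = make(chan string, 64)
		d.queues[url] = q
		d.wg.Add(1)
		go func() {
			defer d.wg.Done()
			for p := range q { // one send at a time for this URL
				d.send(url, p)
			}
		}()
	}
	d.mu.Unlock()
	q <- payload
}

// Stop closes all queues and waits until every queued payload has been sent.
func (d *serialDispatcher) Stop() {
	d.mu.Lock()
	for _, q := range d.queues {
		close(q)
	}
	d.mu.Unlock()
	d.wg.Wait()
}

// demo dispatches to two placeholder URLs and returns the order in which
// payloads for the first URL were sent.
func demo() []string {
	var mu sync.Mutex
	var got []string
	d := newSerialDispatcher(func(url, payload string) {
		mu.Lock()
		defer mu.Unlock()
		if url == "https://a.example.com/cb" {
			got = append(got, payload)
		}
	})
	for _, p := range []string{"1", "2", "3"} {
		d.Dispatch("https://a.example.com/cb", p)
		d.Dispatch("https://b.example.com/cb", p)
	}
	d.Stop()
	return got
}

func main() {
	fmt.Println(demo())
}
```

Because the queues live in one process, this ordering holds only within a single instance, which matches the note above about independent Callbacker instances.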

The Callbacker retries failed requests and treats any HTTP status code outside the range `200–299` as a failure. If the receiver still does not return a success status after the retries are exhausted, it is placed in quarantine for a configurable period (initially `quarantinePolicy.baseDuration`). During this time, callback delivery to that receiver is paused, and its callbacks are stored persistently by the Callbacker service to be retried later.

>NOTE: Callbacks that weren't successfully sent for an extended period (e.g., 24 hours) are no longer sent.
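
The quarantine rules can be sketched in Go as follows. This is only an illustration under stated assumptions: the `quarantinePolicy` type and its `decide` method are hypothetical, the durations mirror the defaults added in `config/defaults.go` in this PR (10 minutes and 24 hours), and the sketch uses a fixed quarantine duration, whereas the actual policy may compute it differently.

```go
package main

import (
	"fmt"
	"time"
)

// quarantinePolicy is a hypothetical, simplified stand-in for the policy
// configured via quarantinePolicy.baseDuration and permQuarantineAfter.
type quarantinePolicy struct {
	baseDuration        time.Duration
	permQuarantineAfter time.Duration
}

// isSuccess reports whether an HTTP status code counts as a delivered callback.
func isSuccess(statusCode int) bool {
	return statusCode >= 200 && statusCode <= 299
}

// decide returns when delivery may be retried, or true as the second value
// if the callback has been unsent for longer than permQuarantineAfter.
// Assumption: a fixed base duration is used for every quarantine.
func (p quarantinePolicy) decide(now, callbackTime time.Time) (time.Time, bool) {
	if now.Sub(callbackTime) > p.permQuarantineAfter {
		return time.Time{}, true
	}
	return now.Add(p.baseDuration), false
}

// demoQuarantine applies the default durations to a callback of the given age
// and returns the minutes until the next attempt and the permanent flag.
func demoQuarantine(ageMinutes int) (int, bool) {
	policy := quarantinePolicy{
		baseDuration:        10 * time.Minute,
		permQuarantineAfter: 24 * time.Hour,
	}
	now := time.Date(2024, time.September, 18, 12, 0, 0, 0, time.UTC)
	callbackTime := now.Add(-time.Duration(ageMinutes) * time.Minute)
	retryAt, perm := policy.decide(now, callbackTime)
	if perm {
		return 0, true
	}
	return int(retryAt.Sub(now).Minutes()), false
}

func main() {
	minutes, perm := demoQuarantine(5)
	fmt.Println(isSuccess(503), minutes, perm)
}
```

Keeping the decision in a pure function of the current time and the callback timestamp makes the policy easy to test without a database or a real receiver.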

By default, callbacks are sent to the specified URL in case the submitted transaction has status `REJECTED` or `MINED`. In case the client wants to receive the intermediate status updates (`SEEN_IN_ORPHAN_MEMPOOL` and `SEEN_ON_NETWORK`) about the transaction, additionally the `X-FullStatusUpdates` header needs to be set to `true`. See the [API documentation](https://bitcoin-sv.github.io/arc/api.html) for more information.
`X-MaxTimeout` header determines maximum number of seconds to wait for transaction new statuses before request expires (default 5sec, max value 30s).

### BlockTx
