diff --git a/CHANGELOG.md b/CHANGELOG.md index 7e9191917..d1e04642f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ All notable changes to this project will be documented in this file. The format ### Changed - Callbacks are sent one by one to the same URL. In the previous implementation, each callback request created a new goroutine to send the callback, which could result in a potential DDoS of the callback receiver. The new approach sends callbacks to the same receiver in a serial manner. Note that URLs are not locked by the `callbacker` instance, so serial sends occur only within a single instance. In other words, the level of parallelism is determined by the number of `callbacker` instances. +- The Callbacker service handles unsuccessful callback attempts by placing problematic receivers in quarantine, temporarily pausing callback delivery to them. + ## [1.3.0] - 2024-08-21 ### Changed diff --git a/cmd/arc/services/callbacker.go b/cmd/arc/services/callbacker.go index 163276f4d..7d62da908 100644 --- a/cmd/arc/services/callbacker.go +++ b/cmd/arc/services/callbacker.go @@ -1,6 +1,29 @@ package cmd +/* Callbacker Service */ +/* + +This service manages the sending and storage of callbacks, with a persistent storage backend using PostgreSQL. +It starts by checking the storage for any unsent callbacks and passing them to the callback dispatcher. + +Key components: +- PostgreSQL DB: used for persistent storage of callbacks +- callback dispatcher: responsible for dispatching callbacks to sender +- callback sender: responsible for sending callbacks +- background tasks: + - periodically cleans up old, unsent callbacks from storage + - periodically checks the storage for callbacks in quarantine (temporary ban) and re-attempts dispatch after the quarantine period +- gRPC server with endpoints: + - Health: provides a health check endpoint for the service + - SendCallback: receives and processes new callback requests + +Startup routine: on service start, checks the storage for pending callbacks and dispatches them if needed +Graceful Shutdown: on service termination, all components are stopped gracefully, ensuring that any unprocessed callbacks are persisted in the database for future processing. + +*/ + import ( + "context" "fmt" "log/slog" "net" @@ -24,10 +47,10 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), config := appConfig.Callbacker var ( - store store.CallbackerStore - sender *callbacker.CallbackSender - dispatcher *callbacker.CallbackDispatcher - + store store.CallbackerStore + sender *callbacker.CallbackSender + dispatcher *callbacker.CallbackDispatcher + workers *callbacker.BackgroundWorkers server *callbacker.Server healthServer *grpc.Server @@ -36,38 +59,11 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), stopFn := func() { logger.Info("Shutting down callbacker") - - // dispose of dependencies in the correct order: - // 1. server - ensure no new callbacks will be received - // 2. dispatcher - ensure all already accepted callbacks are proccessed - // 3. sender - finally, stop the sender as there are no callbacks left to send. - // 4. 
store - - if server != nil { - server.GracefulStop() - } - if dispatcher != nil { - dispatcher.GracefulStop() - } - if sender != nil { - sender.GracefulStop() - } - - if store != nil { - err := store.Close() - if err != nil { - logger.Error("Could not close the store", slog.String("err", err.Error())) - } - } - - if healthServer != nil { - healthServer.Stop() - } - + dispose(logger, server, workers, dispatcher, sender, store, healthServer) logger.Info("Shutted down") } - store, err = NewStore(config.Db) + store, err = newStore(config.Db) if err != nil { return nil, fmt.Errorf("failed to create callbacker store: %v", err) } @@ -78,14 +74,17 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), return nil, fmt.Errorf("failed to create callback sender: %v", err) } - dispatcher = callbacker.NewCallbackDispatcher(sender, store, config.Pause) - logger.Info("Init callback dispatcher, add to processing abandoned callbacks") - err = dispatcher.Init() + dispatcher = callbacker.NewCallbackDispatcher(sender, store, logger, config.Pause, config.QuarantinePolicy.BaseDuration, config.QuarantinePolicy.PermQuarantineAfter) + err = dispatchPersistedCallbacks(store, dispatcher, logger) if err != nil { stopFn() - return nil, fmt.Errorf("failed to init callback dispatcher, couldn't process all abandoned callbacks: %v", err) + return nil, fmt.Errorf("failed to dispatch previously persisted callbacks: %v", err) } + workers = callbacker.NewBackgroundWorkers(store, dispatcher, logger) + workers.StartCallbackStoreCleanup(config.PruneInterval, config.PruneOlderThan) + workers.StartQuarantineCallbacksDispatch(config.QuarantineCheckInterval) + server = callbacker.NewServer(dispatcher, callbacker.WithLogger(logger.With(slog.String("module", "server")))) err = server.Serve(config.ListenAddr, appConfig.GrpcMessageSize, appConfig.PrometheusEndpoint) if err != nil { @@ -93,7 +92,7 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), return nil, fmt.Errorf("GRPCServer failed: %v", err) } - healthServer, err = StartHealthServerCallbacker(server, config.Health, logger) + healthServer, err = startHealthServerCallbacker(server, config.Health, logger) if err != nil { stopFn() return nil, fmt.Errorf("failed to start health server: %v", err) @@ -103,16 +102,16 @@ func StartCallbacker(logger *slog.Logger, appConfig *config.ArcConfig) (func(), return stopFn, nil } -func NewStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) { +func newStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) { switch dbConfig.Mode { case DbModePostgres: - postgres := dbConfig.Postgres + config := dbConfig.Postgres dbInfo := fmt.Sprintf( "user=%s password=%s dbname=%s host=%s port=%d sslmode=%s", - postgres.User, postgres.Password, postgres.Name, postgres.Host, postgres.Port, postgres.SslMode, + config.User, config.Password, config.Name, config.Host, config.Port, config.SslMode, ) - s, err = postgresql.New(dbInfo, postgres.MaxIdleConns, postgres.MaxOpenConns) + s, err = postgresql.New(dbInfo, config.MaxIdleConns, config.MaxOpenConns) if err != nil { return nil, fmt.Errorf("failed to open postgres DB: %v", err) } @@ -123,7 +122,25 @@ func NewStore(dbConfig *config.DbConfig) (s store.CallbackerStore, err error) { return s, err } -func StartHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.HealthConfig, logger *slog.Logger) (*grpc.Server, error) { +func dispatchPersistedCallbacks(s store.CallbackerStore, d *callbacker.CallbackDispatcher, l 
*slog.Logger) error { + l.Info("Dispatch persited callbacks") + + const batchSize = 100 + ctx := context.Background() + + for { + callbacks, err := s.PopMany(ctx, batchSize) + if err != nil || len(callbacks) == 0 { + return err + } + + for _, c := range callbacks { + d.Dispatch(c.Url, toCallbackEntry(c)) + } + } +} + +func startHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.HealthConfig, logger *slog.Logger) (*grpc.Server, error) { gs := grpc.NewServer() grpc_health_v1.RegisterHealthServer(gs, serv) // registration @@ -145,3 +162,60 @@ func StartHealthServerCallbacker(serv *callbacker.Server, healthConfig *config.H return gs, nil } + +func dispose(l *slog.Logger, server *callbacker.Server, workers *callbacker.BackgroundWorkers, + dispatcher *callbacker.CallbackDispatcher, sender *callbacker.CallbackSender, + store store.CallbackerStore, healthServer *grpc.Server) { + + // dispose of dependencies in the correct order: + // 1. server - ensure no new callbacks will be received + // 2. background workers - ensure no callbacks from background will be accepted + // 3. dispatcher - ensure all already accepted callbacks are proccessed + // 4. sender - finally, stop the sender as there are no callbacks left to send + // 5. store + + if server != nil { + server.GracefulStop() + } + if workers != nil { + workers.GracefulStop() + } + if dispatcher != nil { + dispatcher.GracefulStop() + } + if sender != nil { + sender.GracefulStop() + } + + if store != nil { + err := store.Close() + if err != nil { + l.Error("Could not close the store", slog.String("err", err.Error())) + } + } + + if healthServer != nil { + healthServer.Stop() + } + +} + +func toCallbackEntry(dto *store.CallbackData) *callbacker.CallbackEntry { + d := &callbacker.Callback{ + Timestamp: dto.Timestamp, + + CompetingTxs: dto.CompetingTxs, + TxID: dto.TxID, + TxStatus: dto.TxStatus, + ExtraInfo: dto.ExtraInfo, + MerklePath: dto.MerklePath, + + BlockHash: dto.BlockHash, + BlockHeight: dto.BlockHeight, + } + + return &callbacker.CallbackEntry{ + Token: dto.Token, + Data: d, + } +} diff --git a/config/config.go b/config/config.go index f8694d7a6..4232e73b4 100644 --- a/config/config.go +++ b/config/config.go @@ -123,9 +123,18 @@ type K8sWatcherConfig struct { } type CallbackerConfig struct { - ListenAddr string `mapstructure:"listenAddr"` - DialAddr string `mapstructure:"dialAddr"` - Health *HealthConfig `mapstructure:"health"` - Pause time.Duration `mapstructure:"pause"` - Db *DbConfig `mapstructure:"db"` + ListenAddr string `mapstructure:"listenAddr"` + DialAddr string `mapstructure:"dialAddr"` + Health *HealthConfig `mapstructure:"health"` + Pause time.Duration `mapstructure:"pause"` + Db *DbConfig `mapstructure:"db"` + PruneInterval time.Duration `mapstructure:"pruneInterval"` + PruneOlderThan time.Duration `mapstructure:"pruneOlderThan"` + QuarantineCheckInterval time.Duration `mapstructure:"quarantineCheckInterval"` + QuarantinePolicy *CallbackerQuarantinePolicy `mapstructure:"quarantinePolicy"` +} + +type CallbackerQuarantinePolicy struct { + BaseDuration time.Duration `mapstructure:"baseDuration"` + PermQuarantineAfter time.Duration `mapstructure:"permQuarantineAfter"` } diff --git a/config/defaults.go b/config/defaults.go index 32ae143d8..5778d7c3a 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -170,7 +170,14 @@ func getCallbackerConfig() *CallbackerConfig { Health: &HealthConfig{ SeverDialAddr: "localhost:8025", }, - Pause: 0, - Db: getDbConfig("callbacker"), + Pause: 0, + Db: 
getDbConfig("callbacker"), + PruneInterval: 24 * time.Hour, + PruneOlderThan: 14 * 24 * time.Hour, + QuarantineCheckInterval: time.Minute, + QuarantinePolicy: &CallbackerQuarantinePolicy{ + BaseDuration: 10 * time.Minute, + PermQuarantineAfter: 24 * time.Hour, + }, } } diff --git a/config/example_config.yaml b/config/example_config.yaml index 5664a5758..6581ee96d 100644 --- a/config/example_config.yaml +++ b/config/example_config.yaml @@ -137,4 +137,10 @@ callbacker: maxIdleConns: 10 # maximum idle connections maxOpenConns: 80 # maximum open connections sslMode: disable + pruneInterval: 24h # interval at which old or failed callbacks are pruned from the store + pruneOlderThan: 336h # age threshold for pruning callbacks (older than this value will be removed) + quarantineCheckInterval: 1m # interval at which the store is checked for quarantined callbacks to be re-sent + quarantinePolicy: + baseDuration: 5m # initial duration a callback and its receiver is quarantined after failure + permQuarantineAfter: 24h # maximum time a callback can remain unsent before it's put in permanent quarantine \ No newline at end of file diff --git a/doc/README.md b/doc/README.md index cc8a93935..67f0b14e6 100644 --- a/doc/README.md +++ b/doc/README.md @@ -72,6 +72,8 @@ API is the REST API microservice for interacting with ARC. See the [API document The API takes care of validation and sending transactions to Metamorph. The API talks to one or more Metamorph instances using client-based, round-robin load balancing. +The `X-MaxTimeout` header determines the maximum number of seconds the system will wait for new transaction statuses before the response is returned. The default timeout is 5 seconds, with a maximum value of 30 seconds. + #### Validation The API is the first component of ARC and therefore the one that by design derives a benefit for ARC performing a preliminar validation of transactions thanks to the use of the [extended transaction formats](#extended-format-ef-and-background-evaluation-extended-format-beef). @@ -80,15 +82,24 @@ However, sending transactions in classic format is supported through the ARC API When possible, the API is responsible for rejecting transactions that would be unacceptable to the Bitcoin network. +#### Callbacks + +The client can register to receive callbacks with information about the statuses of submitted transactions. To do this, the client must include the `X-CallbackUrl` header in their request. Once registered, ARC will send a `POST` request to the URL specified in the header, with the transaction ID included in the request body. + +If the client wants to secure its callback endpoint, ARC supports Bearer token authorization. A callback token can be provided by adding the `X-CallbackToken: ` header to the request. + +By default, callbacks are triggered when the submitted transaction reaches the status `REJECTED` or `MINED`. If the client wishes to receive additional intermediate status updates, such as `SEEN_IN_ORPHAN_MEMPOOL` or `SEEN_ON_NETWORK`, the `X-FullStatusUpdates` header must be set to `true`. For more details, refer to the [API documentation](https://bitcoin-sv.github.io/arc/api.html). +For details on how callbacks are delivered, see the [Callbacker](#Callbacker) section. + ### Metamorph Metamorph is a microservice that is responsible for processing transactions sent by the API to the Bitcoin network. It takes care of re-sending transactions if they are not acknowledged by the network within a certain time period (60 seconds by default).
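As a rough illustration of the headers documented in the `#### Callbacks` subsection above, the sketch below registers a callback while submitting a transaction. It is a minimal sketch, assuming an ARC instance on `localhost:9090` and a `/v1/tx` submission endpoint; the host, endpoint path, request body and token value are illustrative assumptions, only the header names come from the documentation in this change.

```go
// Hypothetical client-side sketch: submit a transaction and register a callback
// via the headers documented above. Host, endpoint path, body and token are
// assumptions for illustration; only the header names come from the docs.
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
)

func main() {
	body := bytes.NewBufferString(`{"rawTx": "<hex-encoded extended transaction>"}`)

	req, err := http.NewRequest(http.MethodPost, "http://localhost:9090/v1/tx", body)
	if err != nil {
		log.Fatal(err)
	}
	req.Header.Set("Content-Type", "application/json")
	// URL that ARC should POST status updates to.
	req.Header.Set("X-CallbackUrl", "https://receiver.example.com/callback")
	// Token echoed back to the receiver as "Authorization: Bearer <token>".
	req.Header.Set("X-CallbackToken", "my-secret-token")
	// Also receive intermediate statuses such as SEEN_ON_NETWORK.
	req.Header.Set("X-FullStatusUpdates", "true")
	// Wait at most 10 seconds for a new status before the response is returned.
	req.Header.Set("X-MaxTimeout", "10")

	res, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer res.Body.Close()
	fmt.Println("submit status:", res.Status)
}
```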
### Callbacker -Callbacker is a microservice that sends callbacks to a specified URL. To register a callback, the client must add the `X-CallbackUrl` header to the request. The callbacker will then send a POST request to the URL specified in the header, with the transaction ID in the body. +The Callbacker is a microservice responsible for handling all registered callbacks. It sends a `POST` request to the specified URL, including a `Bearer token` in the `Authorization` header when required. -The following example shows the format of a callback body +Below is an example of a callback request body: ```json { @@ -102,10 +113,14 @@ The following example shows the format of a callback body } ``` -A callback token can be added to the request by adding the header `X-CallbackToken: `. The respective callback will then have a header `Authorization: Bearer `. +To prevent DDoS attacks on callback receivers, the Callbacker service instance sends callbacks to the specified URLs in a serial (sequential) manner, ensuring only one request is sent at a time. + +>NOTE: Typically, there are several instances of Callbacker, and each one works independently. + +The Callbacker handles request retries and treats any HTTP status code outside the range of `200–299` as a failure. If the receiver fails to return a success status after a certain number of retries, it is placed in quarantine for a certain period. During this time, sending callbacks to the receiver is paused, and all callbacks are stored persistently in the Callbacker service to be retried later. + +>NOTE: Callbacks that weren't successfully sent for an extended period (e.g., 24 hours) are no longer sent. -By default, callbacks are sent to the specified URL in case the submitted transaction has status `REJECTED` or `MINED`. In case the client wants to receive the intermediate status updates (`SEEN_IN_ORPHAN_MEMPOOL` and `SEEN_ON_NETWORK`) about the transaction, additionally the `X-FullStatusUpdates` header needs to be set to `true`. See the [API documentation](https://bitcoin-sv.github.io/arc/api.html) for more information. -`X-MaxTimeout` header determines maximum number of seconds to wait for transaction new statuses before request expires (default 5sec, max value 30s). 
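Because the Callbacker treats any status code outside `200–299` as a failure, retries, and may eventually quarantine the receiver, a receiver only needs to accept the `POST`, optionally verify the Bearer token, and answer promptly with a 2xx code. Below is a minimal sketch of such a receiver; the port and token are assumptions, and the JSON field names are assumed to mirror the example callback body shown above.

```go
// Hypothetical callback receiver sketch. Port and token are assumptions;
// JSON field names are assumed to mirror the example callback body above.
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

type callbackBody struct {
	Timestamp   string `json:"timestamp"`
	TxID        string `json:"txid"`
	TxStatus    string `json:"txStatus"`
	BlockHash   string `json:"blockHash"`
	BlockHeight uint64 `json:"blockHeight"`
}

func main() {
	http.HandleFunc("/callback", func(w http.ResponseWriter, r *http.Request) {
		// Reject requests that do not carry the token registered via X-CallbackToken.
		if r.Header.Get("Authorization") != "Bearer my-secret-token" {
			w.WriteHeader(http.StatusUnauthorized) // non-2xx: ARC retries and may quarantine this URL
			return
		}

		var cb callbackBody
		if err := json.NewDecoder(r.Body).Decode(&cb); err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		log.Printf("tx %s is now %s", cb.TxID, cb.TxStatus)
		w.WriteHeader(http.StatusOK) // any 2xx code counts as a successful delivery
	})

	log.Fatal(http.ListenAndServe(":8080", nil))
}
```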
### BlockTx diff --git a/internal/callbacker/background_workers.go b/internal/callbacker/background_workers.go new file mode 100644 index 000000000..59c55da8a --- /dev/null +++ b/internal/callbacker/background_workers.go @@ -0,0 +1,122 @@ +package callbacker + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/bitcoin-sv/arc/internal/callbacker/store" +) + +type BackgroundWorkers struct { + s store.CallbackerStore + l *slog.Logger + d *CallbackDispatcher + + workersWg sync.WaitGroup + ctx context.Context + cancelAll func() +} + +func NewBackgroundWorkers(store store.CallbackerStore, dispatcher *CallbackDispatcher, logger *slog.Logger) *BackgroundWorkers { + ctx, cancel := context.WithCancel(context.Background()) + + return &BackgroundWorkers{ + s: store, + d: dispatcher, + l: logger.With(slog.String("module", "background workers")), + + ctx: ctx, + cancelAll: cancel, + } +} + +func (w *BackgroundWorkers) StartCallbackStoreCleanup(interval, olderThanDuration time.Duration) { + w.workersWg.Add(1) + go w.pruneCallbacks(interval, olderThanDuration) +} + +func (w *BackgroundWorkers) StartQuarantineCallbacksDispatch(interval time.Duration) { + w.workersWg.Add(1) + go w.dispatchQuarantineCallbacks(interval) +} + +func (w *BackgroundWorkers) GracefulStop() { + w.cancelAll() + w.workersWg.Wait() +} + +func (w *BackgroundWorkers) pruneCallbacks(interval, olderThanDuration time.Duration) { + ctx := context.Background() + t := time.NewTicker(interval) + + for { + select { + case <-t.C: + n := time.Now() + midnight := time.Date(n.Year(), n.Month(), n.Day(), 0, 0, 0, 0, time.UTC) + olderThan := midnight.Add(-1 * olderThanDuration) + + err := w.s.DeleteFailedOlderThan(ctx, olderThan) + if err != nil { + w.l.Error("failed to delete old callbacks in quarantine", slog.String("err", err.Error())) + } + + case <-w.ctx.Done(): + w.workersWg.Done() + return + } + } +} + +func (w *BackgroundWorkers) dispatchQuarantineCallbacks(interval time.Duration) { + const batchSize = 100 + + ctx := context.Background() + t := time.NewTicker(interval) + + for { + select { + case <-t.C: + callbacks, err := w.s.PopFailedMany(ctx, time.Now(), batchSize) + if err != nil { + w.l.Error("reading callbacks from store failed", slog.String("err", err.Error())) + continue + } + + if len(callbacks) == 0 { + continue + } + + for _, c := range callbacks { + w.d.Dispatch(c.Url, toCallbackEntry(c)) + } + + case <-w.ctx.Done(): + w.workersWg.Done() + return + } + } +} + +func toCallbackEntry(dto *store.CallbackData) *CallbackEntry { + d := &Callback{ + Timestamp: dto.Timestamp, + + CompetingTxs: dto.CompetingTxs, + TxID: dto.TxID, + TxStatus: dto.TxStatus, + ExtraInfo: dto.ExtraInfo, + MerklePath: dto.MerklePath, + + BlockHash: dto.BlockHash, + BlockHeight: dto.BlockHeight, + } + + return &CallbackEntry{ + Token: dto.Token, + Data: d, + postponedUntil: dto.PostponedUntil, + } +} diff --git a/internal/callbacker/callbacker.go b/internal/callbacker/callbacker.go index f48b2d9f2..aac58ccb8 100644 --- a/internal/callbacker/callbacker.go +++ b/internal/callbacker/callbacker.go @@ -5,7 +5,7 @@ import ( ) type CallbackerI interface { - Send(url, token string, callback *Callback) + Send(url, token string, callback *Callback) bool Health() error } diff --git a/internal/callbacker/callbacker_mock.go b/internal/callbacker/callbacker_mock.go index a189c8bb0..51b98d184 100644 --- a/internal/callbacker/callbacker_mock.go +++ b/internal/callbacker/callbacker_mock.go @@ -20,7 +20,7 @@ var _ CallbackerI = &CallbackerIMock{} // HealthFunc: 
func() error { // panic("mock out the Health method") // }, -// SendFunc: func(url string, token string, callback *Callback) { +// SendFunc: func(url string, token string, callback *Callback) bool { // panic("mock out the Send method") // }, // } @@ -34,7 +34,7 @@ type CallbackerIMock struct { HealthFunc func() error // SendFunc mocks the Send method. - SendFunc func(url string, token string, callback *Callback) + SendFunc func(url string, token string, callback *Callback) bool // calls tracks calls to the methods. calls struct { @@ -83,7 +83,7 @@ func (mock *CallbackerIMock) HealthCalls() []struct { } // Send calls SendFunc. -func (mock *CallbackerIMock) Send(url string, token string, callback *Callback) { +func (mock *CallbackerIMock) Send(url string, token string, callback *Callback) bool { if mock.SendFunc == nil { panic("CallbackerIMock.SendFunc: method is nil but CallbackerI.Send was just called") } @@ -99,7 +99,7 @@ func (mock *CallbackerIMock) Send(url string, token string, callback *Callback) mock.lockSend.Lock() mock.calls.Send = append(mock.calls.Send, callInfo) mock.lockSend.Unlock() - mock.SendFunc(url, token, callback) + return mock.SendFunc(url, token, callback) } // SendCalls gets all the calls that were made to Send. diff --git a/internal/callbacker/dispatcher.go b/internal/callbacker/dispatcher.go index 94af51a9c..65b998964 100644 --- a/internal/callbacker/dispatcher.go +++ b/internal/callbacker/dispatcher.go @@ -1,7 +1,21 @@ package callbacker +/* CallbackDispatcher */ +/* + +The CallbackDispatcher is a decorator of the CallbackerI interface, responsible for routing and dispatching callbacks to appropriate sendManager based on the callback URL. + +Key components: +- CallbackerI Interface: the CallbackDispatcher decorates this interface, enhancing its functionality by managing the actual dispatch logic +- sendManager: each sendManager handles specific types of callbacks, determined by the URL + +Dispatch Logic: the CallbackDispatcher ensures that callbacks are sent to the correct sendManager, maintaining efficient processing and delivery. +Graceful Shutdown: on service termination, the CallbackDispatcher ensures all active sendManagers are gracefully stopped, allowing in-progress callbacks to complete and safely shutting down the dispatch process. 
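To make the decorator relationship concrete, here is a rough wiring sketch, assumed to live inside the `callbacker` package: the dispatcher satisfies the same `CallbackerI` interface as the `CallbackSender` it wraps, and routes each `Send` to a per-URL `sendManager` so that callbacks to one receiver are delivered one at a time. The URLs, durations and field values are illustrative only.

```go
// Rough wiring sketch, assumed to run inside the callbacker package.
// URLs, durations and callback field values are illustrative.
package callbacker

import (
	"log/slog"
	"time"

	"github.com/bitcoin-sv/arc/internal/callbacker/store"
)

var _ CallbackerI = (*CallbackDispatcher)(nil) // the dispatcher decorates CallbackerI

func wireDispatcher(sender CallbackerI, cbStore store.CallbackerStore, logger *slog.Logger) {
	d := NewCallbackDispatcher(sender, cbStore, logger,
		5*time.Second,  // pause between sends to the same URL
		10*time.Minute, // quarantine base duration
		24*time.Hour,   // permanent quarantine threshold
	)

	// Both callbacks go to the same URL, so the same sendManager delivers them one by one.
	d.Send("https://receiver-a.example.com/callback", "token-a", &Callback{TxID: "tx1", TxStatus: "MINED"})
	d.Send("https://receiver-a.example.com/callback", "token-a", &Callback{TxID: "tx2", TxStatus: "MINED"})

	// A different URL gets its own sendManager and is processed independently.
	d.Send("https://receiver-b.example.com/callback", "token-b", &Callback{TxID: "tx3", TxStatus: "REJECTED"})

	d.GracefulStop() // unsent callbacks are persisted via the store
}
```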
+ +*/ + import ( - "context" + "log/slog" "sync" "time" @@ -11,24 +25,41 @@ import ( type CallbackDispatcher struct { c CallbackerI s store.CallbackerStore + l *slog.Logger managers map[string]*sendManager managersMu sync.Mutex - sleep time.Duration + sleep time.Duration + policy *quarantinePolicy } -func NewCallbackDispatcher(callbacker CallbackerI, store store.CallbackerStore, sleepDuration time.Duration) *CallbackDispatcher { +type CallbackEntry struct { + Token string + Data *Callback + postponedUntil *time.Time +} + +func NewCallbackDispatcher(callbacker CallbackerI, store store.CallbackerStore, logger *slog.Logger, + sleepDuration, quarantineBaseDuration, permQuarantineAfterDuration time.Duration) *CallbackDispatcher { + return &CallbackDispatcher{ - c: callbacker, - s: store, - sleep: sleepDuration, + c: callbacker, + s: store, + l: logger.With(slog.String("module", "dispatcher")), + sleep: sleepDuration, + policy: &quarantinePolicy{ + baseDuration: quarantineBaseDuration, + permQuarantineAfter: permQuarantineAfterDuration, + now: time.Now, + }, managers: make(map[string]*sendManager), } } -func (d *CallbackDispatcher) Send(url, token string, dto *Callback) { - d.dispatch(url, token, dto) +func (d *CallbackDispatcher) Send(url, token string, dto *Callback) bool { + d.Dispatch(url, &CallbackEntry{Token: token, Data: dto}) + return true } func (d *CallbackDispatcher) Health() error { @@ -44,146 +75,15 @@ func (d *CallbackDispatcher) GracefulStop() { } } -func (d *CallbackDispatcher) Init() error { - const batchSize = 100 - ctx := context.Background() - - for { - callbacks, err := d.s.PopMany(ctx, batchSize) - if err != nil || len(callbacks) == 0 { - return err - } - - for _, c := range callbacks { - d.dispatch(c.Url, c.Token, toCallback(c)) - } - } -} - -func (d *CallbackDispatcher) dispatch(url, token string, dto *Callback) { +func (d *CallbackDispatcher) Dispatch(url string, dto *CallbackEntry) { d.managersMu.Lock() m, ok := d.managers[url] if !ok { - m = runNewSendManager(url, d.c, d.s, d.sleep) + m = runNewSendManager(url, d.c, d.s, d.l, d.sleep, d.policy) d.managers[url] = m } d.managersMu.Unlock() - m.Add(token, dto) -} - -type sendManager struct { - url string - c CallbackerI - s store.CallbackerStore - - entriesWg sync.WaitGroup - entries chan *callbackEntry - - stop chan struct{} - stopping bool - - sleep time.Duration -} - -type callbackEntry struct { - token string - data *Callback -} - -func runNewSendManager(u string, c CallbackerI, s store.CallbackerStore, slp time.Duration) *sendManager { - m := &sendManager{ - url: u, - c: c, - s: s, - sleep: slp, - - entries: make(chan *callbackEntry), - stop: make(chan struct{}), - } - - m.run() - return m -} - -func (m *sendManager) Add(token string, dto *Callback) { - m.entriesWg.Add(1) // count the callbacks accepted for processing - go func() { - m.entries <- &callbackEntry{token: token, data: dto} - }() -} - -func (m *sendManager) GracefulStop() { - m.stop <- struct{}{} // signal the `run` goroutine to stop processing - m.entriesWg.Wait() // wait for all accepted callbacks to be consumed - - close(m.entries) // signal the `run` goroutine to exit - - <-m.stop // wait for the `run` goroutine to exit - close(m.stop) -} - -func (m *sendManager) run() { - go func() { - var danglingCallbacks []*store.CallbackData - - handleCallbacks: - for { - select { - case callback, ok := <-m.entries: - if !ok { - break handleCallbacks - } - - if m.stopping { - // add callback to save - danglingCallbacks = append(danglingCallbacks, 
toStoreDto(m.url, callback)) - } else { - m.c.Send(m.url, callback.token, callback.data) - time.Sleep(m.sleep) - } - - m.entriesWg.Done() // decrease the number of callbacks that need to be processed (send or store on stop) - - case <-m.stop: - m.stopping = true - } - } - - _ = m.s.SetMany(context.Background(), danglingCallbacks) - m.stop <- struct{}{} - }() -} - -func toStoreDto(url string, s *callbackEntry) *store.CallbackData { - return &store.CallbackData{ - Url: url, - Token: s.token, - Timestamp: s.data.Timestamp, - - CompetingTxs: s.data.CompetingTxs, - TxID: s.data.TxID, - TxStatus: s.data.TxStatus, - ExtraInfo: s.data.ExtraInfo, - MerklePath: s.data.MerklePath, - - BlockHash: s.data.BlockHash, - BlockHeight: s.data.BlockHeight, - } -} - -func toCallback(dto *store.CallbackData) *Callback { - return &Callback{ - Timestamp: dto.Timestamp, - - CompetingTxs: dto.CompetingTxs, - TxID: dto.TxID, - TxStatus: dto.TxStatus, - ExtraInfo: dto.ExtraInfo, - MerklePath: dto.MerklePath, - - BlockHash: dto.BlockHash, - BlockHeight: dto.BlockHeight, - } + m.Add(dto) } diff --git a/internal/callbacker/dispatcher_test.go b/internal/callbacker/dispatcher_test.go index 6d9002fc3..d68ba996d 100644 --- a/internal/callbacker/dispatcher_test.go +++ b/internal/callbacker/dispatcher_test.go @@ -3,7 +3,7 @@ package callbacker import ( "context" "fmt" - "math" + "log/slog" "sync" "testing" "time" @@ -13,7 +13,7 @@ import ( "github.com/stretchr/testify/require" ) -func Test_CallbackDispatcher(t *testing.T) { +func TestCallbackDispatcher(t *testing.T) { tcs := []struct { name string sendInterval time.Duration @@ -40,7 +40,7 @@ func Test_CallbackDispatcher(t *testing.T) { t.Run(tc.name, func(t *testing.T) { // given cMq := &CallbackerIMock{ - SendFunc: func(url, token string, callback *Callback) {}, + SendFunc: func(url, token string, callback *Callback) bool { return true }, } var savedCallbacks []*store.CallbackData @@ -51,7 +51,7 @@ func Test_CallbackDispatcher(t *testing.T) { }, } - sut := NewCallbackDispatcher(cMq, sMq, tc.sendInterval) + sut := NewCallbackDispatcher(cMq, sMq, slog.Default(), tc.sendInterval, 0, 0) var receivers []string for i := range tc.numOfReceivers { @@ -76,7 +76,7 @@ func Test_CallbackDispatcher(t *testing.T) { sut.GracefulStop() } else { // give a chance to process - time.Sleep(50 * time.Millisecond) + time.Sleep(100 * time.Millisecond) } // then @@ -91,125 +91,3 @@ func Test_CallbackDispatcher(t *testing.T) { }) } } - -func Test_CallbackDispatcher_Init(t *testing.T) { - tcs := []struct { - name string - danglingCallbacksNum int - }{ - { - name: "no dangling callbacks", - danglingCallbacksNum: 0, - }, - { - name: "callbacks to process on init", - danglingCallbacksNum: 259, - }, - } - - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - // given - var danglingCallbacks []*store.CallbackData - for range tc.danglingCallbacksNum { - danglingCallbacks = append(danglingCallbacks, &store.CallbackData{}) - } - - cMq := &CallbackerIMock{ - SendFunc: func(url, token string, callback *Callback) {}, - } - - sMq := &mocks.CallbackerStoreMock{ - PopManyFunc: func(ctx context.Context, limit int) ([]*store.CallbackData, error) { - limit = int(math.Min(float64(len(danglingCallbacks)), float64(limit))) - - r := danglingCallbacks[:limit] - danglingCallbacks = danglingCallbacks[limit:] - - return r, nil - }, - } - - sut := NewCallbackDispatcher(cMq, sMq, 0) - - // when - err := sut.Init() - time.Sleep(50 * time.Millisecond) - - // then - require.NoError(t, err) - require.Equal(t, 
tc.danglingCallbacksNum, len(cMq.SendCalls())) - }) - } -} - -func Test_sendManager(t *testing.T) { - tcs := []struct { - name string - sendInterval time.Duration - numOfSends int - stopManager bool - }{ - { - name: "send callbacks when run", - sendInterval: 0, - numOfSends: 100, - }, - { - name: "send callbacks on stopping", - sendInterval: time.Millisecond, // set interval to give time to call stop function - numOfSends: 10, - stopManager: true, - }, - } - - for _, tc := range tcs { - t.Run(tc.name, func(t *testing.T) { - // given - cMq := &CallbackerIMock{ - SendFunc: func(url, token string, callback *Callback) {}, - } - var savedCallbacks []*store.CallbackData - sMq := &mocks.CallbackerStoreMock{ - SetManyFunc: func(ctx context.Context, data []*store.CallbackData) error { - savedCallbacks = append(savedCallbacks, data...) - return nil - }, - } - - sut := &sendManager{ - url: "", - c: cMq, - s: sMq, - sleep: tc.sendInterval, - - entries: make(chan *callbackEntry), - stop: make(chan struct{}), - } - - // add callbacks before starting the manager to queue them - for range tc.numOfSends { - sut.Add("", &Callback{}) - } - - // when - sut.run() - - if tc.stopManager { - sut.GracefulStop() - } else { - // give a chance to process or save on quit - time.Sleep(50 * time.Millisecond) - } - - // then - if tc.stopManager { - require.NotEmpty(t, savedCallbacks) - require.Equal(t, tc.numOfSends, len(cMq.SendCalls())+len(savedCallbacks)) - } else { - require.Empty(t, savedCallbacks) - require.Equal(t, tc.numOfSends, len(cMq.SendCalls())) - } - }) - } -} diff --git a/internal/callbacker/quarantine_policy.go b/internal/callbacker/quarantine_policy.go new file mode 100644 index 000000000..52dba76e6 --- /dev/null +++ b/internal/callbacker/quarantine_policy.go @@ -0,0 +1,44 @@ +package callbacker + +import "time" + +type quarantinePolicy struct { + baseDuration time.Duration + permQuarantineAfter time.Duration + now func() time.Time +} + +// arbitrarily chosen date in a distant future +var infinity = time.Date(2999, time.January, 1, 0, 0, 0, 0, time.UTC) + +// Until calculates the time until which the quarantine should last based on a reference time. +// It compares the current time with the given referenceTime and adjusts the duration of quarantine as follows: +// +// 1. If the time since referenceTime is less than or equal to baseDuration: +// - The quarantine ends after the baseDuration has passed from the current time. +// - Returns the current time plus baseDuration. +// +// 2. If the time since referenceTime is greater than baseDuration but less than permQuarantineAfter: +// - The quarantine duration is extended to double the time that has passed since the referenceTime. +// - Returns the current time plus twice the elapsed time. +// +// 3. If the time since referenceTime exceeds permQuarantineAfter: +// - The quarantine is considered "permanent", meaning no further action is needed. +// - Returns a predefined "infinity" date (January 1, 2999). +// +// This function dynamically adjusts the quarantine period based on how long it has been since +// the reference time, with the possibility of an extended period or abandonment after a certain threshold. 
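To see how the three branches described above play out in practice, here is a small sketch with concrete numbers, assumed to run inside the `callbacker` package (the type is unexported). The base duration of 10 minutes matches the default added in this change; the 24-hour permanent-quarantine threshold and the reference times are illustrative.

```go
// Illustrative walk-through of quarantinePolicy.Until, assumed to live in the
// callbacker package. Durations and timestamps are example values only.
package callbacker

import (
	"fmt"
	"time"
)

func quarantinePolicyIllustration() {
	now := time.Date(2024, 9, 11, 12, 0, 0, 0, time.UTC)
	p := &quarantinePolicy{
		baseDuration:        10 * time.Minute,
		permQuarantineAfter: 24 * time.Hour,
		now:                 func() time.Time { return now },
	}

	fmt.Println(p.Until(now.Add(-3 * time.Minute))) // failed 3m ago  -> now + 10m (base duration)
	fmt.Println(p.Until(now.Add(-2 * time.Hour)))   // failed 2h ago  -> now + 4h (twice the elapsed time)
	fmt.Println(p.Until(now.Add(-25 * time.Hour)))  // failed 25h ago -> "infinity" (permanent quarantine)
}
```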
+func (p *quarantinePolicy) Until(referenceTime time.Time) time.Time { + duration := p.baseDuration + + since := p.now().Sub(referenceTime) + if since > p.baseDuration { + if since > p.permQuarantineAfter { + return infinity + } + + duration = since * 2 + } + + return p.now().Add(duration) +} diff --git a/internal/callbacker/quarantine_policy_test.go b/internal/callbacker/quarantine_policy_test.go new file mode 100644 index 000000000..fcf90ead2 --- /dev/null +++ b/internal/callbacker/quarantine_policy_test.go @@ -0,0 +1,53 @@ +package callbacker + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestQuarantinePolicy(t *testing.T) { + tt := []struct { + name string + nowFn func() time.Time + refTime time.Time + expectedResult time.Time + }{ + { + name: "reference point in time close to call time- return base duration", + nowFn: func() time.Time { return time.Date(2024, 9, 11, 12, 30, 0, 0, time.UTC) }, + refTime: time.Date(2024, 9, 11, 12, 29, 0, 0, time.UTC), + expectedResult: time.Date(2024, 9, 11, 12, 40, 0, 0, time.UTC), + }, + { + name: "reference point in time is earlier than permanent quarantine- return infinity", + nowFn: func() time.Time { return time.Date(2024, 9, 11, 12, 30, 0, 0, time.UTC) }, + refTime: time.Date(2024, 9, 11, 12, 0, 0, 0, time.UTC), + expectedResult: infinity, + }, + { + name: "reference point in time is eariel than base duration- return double time span", + nowFn: func() time.Time { return time.Date(2024, 9, 11, 12, 30, 0, 0, time.UTC) }, + refTime: time.Date(2024, 9, 11, 12, 15, 0, 0, time.UTC), + expectedResult: time.Date(2024, 9, 11, 13, 0, 0, 0, time.UTC), + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + // given + sut := &quarantinePolicy{ + baseDuration: 10 * time.Minute, + permQuarantineAfter: 20 * time.Minute, + now: tc.nowFn, + } + + // when + actualTime := sut.Until(tc.refTime) + + // then + require.Equal(t, tc.expectedResult, actualTime) + }) + } +} diff --git a/internal/callbacker/send_manager.go b/internal/callbacker/send_manager.go new file mode 100644 index 000000000..2e2d33c2e --- /dev/null +++ b/internal/callbacker/send_manager.go @@ -0,0 +1,195 @@ +package callbacker + +/* sendManager */ +/* + +The SendManager is responsible for managing the sending of callbacks to a specific URL in a sequential (serial) manner. +It ensures callbacks are sent efficiently while adhering to policies regarding failed deliveries. + +Key components: +- CallbackerI : responsible for sending callbacks +- quarantine policy: the duration for quarantining a URL are governed by a configurable policy, determining how long the URL remains inactive before retry attempts + +Sending logic: callbacks are sent to the designated URL one at a time, ensuring sequential and orderly processing. + +Quarantine handling: if a URL fails to respond with a success status, the URL is placed in quarantine (based on a defined policy). + During this period, all callbacks for the quarantined URL are stored with a quarantine timestamp, preventing further dispatch attempts until the quarantine expires. + +Graceful Shutdown: on service termination, the sendManager ensures that any unsent callbacks are safely persisted in the store, ensuring no loss of data during shutdown. 
+ +*/ + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/bitcoin-sv/arc/internal/callbacker/store" +) + +type sendManager struct { + url string + + // dependencies + c CallbackerI + s store.CallbackerStore + l *slog.Logger + + // internal state + entriesWg sync.WaitGroup + entries chan *CallbackEntry + + stop chan struct{} + + sleep time.Duration + quarantine *quarantinePolicy + + modeMu sync.Mutex + mode mode +} + +type mode uint8 + +const ( + IdleMode mode = iota + ActiveMode + QuarantineMode + StoppingMode +) + +func runNewSendManager(u string, c CallbackerI, s store.CallbackerStore, l *slog.Logger, sleep time.Duration, qPolicy *quarantinePolicy) *sendManager { + m := &sendManager{ + url: u, + c: c, + s: s, + l: l, + sleep: sleep, + quarantine: qPolicy, + + entries: make(chan *CallbackEntry), + stop: make(chan struct{}), + } + + m.run() + return m +} + +func (m *sendManager) Add(entry *CallbackEntry) { + m.entriesWg.Add(1) // count the callbacks accepted for processing + go func() { + m.entries <- entry + }() +} + +func (m *sendManager) GracefulStop() { + m.stop <- struct{}{} // signal the `run` goroutine to stop sending callbacks + m.entriesWg.Wait() // wait for all accepted callbacks to be consumed + + close(m.entries) // signal the `run` goroutine to exit + + <-m.stop // wait for the `run` goroutine to exit + close(m.stop) +} + +func (m *sendManager) run() { + m.setMode(ActiveMode) + + go func() { + var danglingCallbacks []*store.CallbackData + runLoop: + for { + select { + case callback, ok := <-m.entries: + if !ok { + break runLoop + } + + switch m.getMode() { + case ActiveMode: + m.handleActive(callback) + case QuarantineMode: + m.handleQuarantine(callback) + case StoppingMode: + // add callback to save + danglingCallbacks = append(danglingCallbacks, toStoreDto(m.url, callback, nil)) + } + + m.entriesWg.Done() // decrease the number of callbacks that need to be processed (send or store on stop) + + case <-m.stop: + m.setMode(StoppingMode) + } + } + + _ = m.s.SetMany(context.Background(), danglingCallbacks) + m.stop <- struct{}{} + }() +} + +func (m *sendManager) getMode() mode { + m.modeMu.Lock() + defer m.modeMu.Unlock() + + return m.mode +} + +func (m *sendManager) setMode(v mode) { + m.modeMu.Lock() + m.mode = v + m.modeMu.Unlock() +} + +func (m *sendManager) handleActive(callback *CallbackEntry) { + if m.c.Send(m.url, callback.Token, callback.Data) { + time.Sleep(m.sleep) + return + } + + m.putInQuarantine() + m.handleQuarantine(callback) +} + +func (m *sendManager) handleQuarantine(ce *CallbackEntry) { + qUntil := m.quarantine.Until(ce.Data.Timestamp) + err := m.s.Set(context.Background(), toStoreDto(m.url, ce, &qUntil)) + if err != nil { + m.l.Error("failed to store callback in quarantine", slog.String("url", m.url), slog.String("err", err.Error())) + } +} + +func (m *sendManager) putInQuarantine() { + m.setMode(QuarantineMode) + m.l.Warn("send callback failed - putting receiver in quarantine", slog.String("url", m.url), slog.Duration("approx. 
duration", m.quarantine.baseDuration)) + + go func() { + time.Sleep(m.quarantine.baseDuration) + m.modeMu.Lock() + + if m.mode != StoppingMode { + m.mode = ActiveMode + m.l.Info("receiver is active again after quarantine", slog.String("url", m.url)) + } + + m.modeMu.Unlock() + }() +} + +func toStoreDto(url string, s *CallbackEntry, postponedUntil *time.Time) *store.CallbackData { + return &store.CallbackData{ + Url: url, + Token: s.Token, + Timestamp: s.Data.Timestamp, + + CompetingTxs: s.Data.CompetingTxs, + TxID: s.Data.TxID, + TxStatus: s.Data.TxStatus, + ExtraInfo: s.Data.ExtraInfo, + MerklePath: s.Data.MerklePath, + + BlockHash: s.Data.BlockHash, + BlockHeight: s.Data.BlockHeight, + + PostponedUntil: postponedUntil, + } +} diff --git a/internal/callbacker/send_manager_test.go b/internal/callbacker/send_manager_test.go new file mode 100644 index 000000000..09d19d20a --- /dev/null +++ b/internal/callbacker/send_manager_test.go @@ -0,0 +1,194 @@ +package callbacker + +import ( + "context" + "fmt" + "log/slog" + "testing" + "time" + + "github.com/bitcoin-sv/arc/internal/callbacker/store" + "github.com/bitcoin-sv/arc/internal/callbacker/store/mocks" + "github.com/stretchr/testify/require" +) + +func TestSendManager(t *testing.T) { + tcs := []struct { + name string + sendInterval time.Duration + numOfSends int + stopManager bool + }{ + { + name: "send callbacks when run", + sendInterval: 0, + numOfSends: 100, + }, + { + name: "save callbacks on stopping", + sendInterval: time.Millisecond, // set interval to give time to call stop function + numOfSends: 10, + stopManager: true, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + // given + cMq := &CallbackerIMock{ + SendFunc: func(url, token string, callback *Callback) bool { return true }, + } + var savedCallbacks []*store.CallbackData + sMq := &mocks.CallbackerStoreMock{ + SetManyFunc: func(ctx context.Context, data []*store.CallbackData) error { + savedCallbacks = append(savedCallbacks, data...) + return nil + }, + } + + sut := &sendManager{ + url: "", + c: cMq, + s: sMq, + sleep: tc.sendInterval, + + entries: make(chan *CallbackEntry), + stop: make(chan struct{}), + } + + // add callbacks before starting the manager to queue them + for range tc.numOfSends { + sut.Add(&CallbackEntry{Data: &Callback{}}) + } + + // when + sut.run() + + if tc.stopManager { + sut.GracefulStop() + } else { + // give a chance to process + time.Sleep(100 * time.Millisecond) + } + + // then + if tc.stopManager { + require.NotEmpty(t, savedCallbacks) + require.Equal(t, tc.numOfSends, len(cMq.SendCalls())+len(savedCallbacks)) + } else { + require.Empty(t, savedCallbacks) + require.Equal(t, tc.numOfSends, len(cMq.SendCalls())) + } + }) + } +} + +func TestSendManager_Quarantine(t *testing.T) { + /* Quarantine scenario + 1. sending failed + 2. put manager in quarantine for a specified duration + 3. store all callbacks during the quarantine period + 4. switch manager to active mode once quarantine is over + 5. 
send new callbacks after quarantine + */ + + // given + sendOK := true + senderMq := &CallbackerIMock{ + SendFunc: func(url, token string, callback *Callback) bool { return sendOK }, + } + + storeMq := &mocks.CallbackerStoreMock{ + SetFunc: func(ctx context.Context, data *store.CallbackData) error { + return nil + }, + } + + policy := quarantinePolicy{ + baseDuration: 200 * time.Millisecond, + permQuarantineAfter: time.Hour, + now: time.Now, + } + + var preQuarantineCallbacks []*CallbackEntry + for i := range 10 { + preQuarantineCallbacks = append(preQuarantineCallbacks, &CallbackEntry{Token: fmt.Sprintf("q %d", i), Data: &Callback{}}) + } + + var postQuarantineCallbacks []*CallbackEntry + for i := range 10 { + postQuarantineCallbacks = append(postQuarantineCallbacks, &CallbackEntry{Token: fmt.Sprintf("a %d", i), Data: &Callback{}}) + } + + sut := runNewSendManager("http://unittest.com", senderMq, storeMq, slog.Default(), 0, &policy) + + // when + sendOK = false // trigger send failure - this should put the manager in quarantine + + // add a few callbacks to send - all should be stored + for _, c := range preQuarantineCallbacks { + sut.Add(c) + time.Sleep(10 * time.Millisecond) + } + require.Equal(t, QuarantineMode, sut.getMode()) + + time.Sleep(policy.baseDuration + 20*time.Millisecond) // wait for the quarantine period to complete + require.Equal(t, ActiveMode, sut.getMode()) + + sendOK = true // now all sends should complete successfully + // add a few callbacks to send - all should be sent + for _, c := range postQuarantineCallbacks { + sut.Add(c) + } + + // give a chance to process + time.Sleep(50 * time.Millisecond) + + // then + storedCallbacks := storeMq.SetCalls() + require.Equal(t, len(preQuarantineCallbacks), len(storedCallbacks), "all callbacks sent during quarantine should be stored") + for _, c := range preQuarantineCallbacks { + _, ok := find(storedCallbacks, func(e struct { + Ctx context.Context + Dto *store.CallbackData + }) bool { + return e.Dto.Token == c.Token + }) + + require.True(t, ok) + } + + sendCallbacks := senderMq.SendCalls() + require.Equal(t, len(postQuarantineCallbacks)+1, len(sendCallbacks), "manager should attempt to resend the callback that caused quarantine and all callbacks sent after quarantine") + + _, ok := find(sendCallbacks, func(e struct { + URL string + Token string + Callback *Callback + }) bool { + return e.Token == preQuarantineCallbacks[0].Token + }) + + require.True(t, ok) + for _, c := range postQuarantineCallbacks { + _, ok := find(sendCallbacks, func(e struct { + URL string + Token string + Callback *Callback + }) bool { + return e.Token == c.Token + }) + + require.True(t, ok) + } +} + +func find[T any](arr []T, predicate func(T) bool) (T, bool) { + for _, element := range arr { + if predicate(element) { + return element, true + } + } + var zero T + return zero, false +} diff --git a/internal/callbacker/sender.go b/internal/callbacker/sender.go index 8da13eba6..f40011e85 100644 --- a/internal/callbacker/sender.go +++ b/internal/callbacker/sender.go @@ -30,7 +30,7 @@ type CallbackSender struct { } func NewSender(httpClient HttpClient, logger *slog.Logger) (*CallbackSender, error) { - stats := NewCallbackerStats() + stats := newCallbackerStats() err := registerStats( stats.callbackSeenOnNetworkCount, @@ -88,8 +88,8 @@ func (p *CallbackSender) Health() error { return nil } -func (p *CallbackSender) Send(url, token string, dto *Callback) { - ok := p.sendCallbackWithRetries(url, token, dto) +func (p *CallbackSender) Send(url, token string, dto 
*Callback) (ok bool) { + ok = p.sendCallbackWithRetries(url, token, dto) if ok { p.updateSuccessStats(dto.TxStatus) @@ -103,6 +103,7 @@ func (p *CallbackSender) Send(url, token string, dto *Callback) { slog.Int("retries", retries)) p.stats.callbackFailedCount.Inc() + return } func (p *CallbackSender) sendCallbackWithRetries(url, token string, dto *Callback) bool { diff --git a/internal/callbacker/stats_collector.go b/internal/callbacker/stats_collector.go index 4ee23d092..de86e2dda 100644 --- a/internal/callbacker/stats_collector.go +++ b/internal/callbacker/stats_collector.go @@ -15,7 +15,7 @@ type stats struct { callbackFailedCount prometheus.Gauge } -func NewCallbackerStats() *stats { +func newCallbackerStats() *stats { return &stats{ callbackSeenOnNetworkCount: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "arc_callback_seen_on_network_count", diff --git a/internal/callbacker/store/mocks/store_mock.go b/internal/callbacker/store/mocks/store_mock.go index 5d46ad8a6..537bbd561 100644 --- a/internal/callbacker/store/mocks/store_mock.go +++ b/internal/callbacker/store/mocks/store_mock.go @@ -7,6 +7,7 @@ import ( "context" "github.com/bitcoin-sv/arc/internal/callbacker/store" "sync" + "time" ) // Ensure, that CallbackerStoreMock does implement store.CallbackerStore. @@ -22,9 +23,18 @@ var _ store.CallbackerStore = &CallbackerStoreMock{} // CloseFunc: func() error { // panic("mock out the Close method") // }, +// DeleteFailedOlderThanFunc: func(ctx context.Context, t time.Time) error { +// panic("mock out the DeleteFailedOlderThan method") +// }, +// PopFailedManyFunc: func(ctx context.Context, t time.Time, limit int) ([]*store.CallbackData, error) { +// panic("mock out the PopFailedMany method") +// }, // PopManyFunc: func(ctx context.Context, limit int) ([]*store.CallbackData, error) { // panic("mock out the PopMany method") // }, +// SetFunc: func(ctx context.Context, dto *store.CallbackData) error { +// panic("mock out the Set method") +// }, // SetManyFunc: func(ctx context.Context, data []*store.CallbackData) error { // panic("mock out the SetMany method") // }, @@ -38,9 +48,18 @@ type CallbackerStoreMock struct { // CloseFunc mocks the Close method. CloseFunc func() error + // DeleteFailedOlderThanFunc mocks the DeleteFailedOlderThan method. + DeleteFailedOlderThanFunc func(ctx context.Context, t time.Time) error + + // PopFailedManyFunc mocks the PopFailedMany method. + PopFailedManyFunc func(ctx context.Context, t time.Time, limit int) ([]*store.CallbackData, error) + // PopManyFunc mocks the PopMany method. PopManyFunc func(ctx context.Context, limit int) ([]*store.CallbackData, error) + // SetFunc mocks the Set method. + SetFunc func(ctx context.Context, dto *store.CallbackData) error + // SetManyFunc mocks the SetMany method. SetManyFunc func(ctx context.Context, data []*store.CallbackData) error @@ -49,6 +68,22 @@ type CallbackerStoreMock struct { // Close holds details about calls to the Close method. Close []struct { } + // DeleteFailedOlderThan holds details about calls to the DeleteFailedOlderThan method. + DeleteFailedOlderThan []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // T is the t argument value. + T time.Time + } + // PopFailedMany holds details about calls to the PopFailedMany method. + PopFailedMany []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // T is the t argument value. + T time.Time + // Limit is the limit argument value. + Limit int + } // PopMany holds details about calls to the PopMany method. 
PopMany []struct { // Ctx is the ctx argument value. @@ -56,6 +91,13 @@ type CallbackerStoreMock struct { // Limit is the limit argument value. Limit int } + // Set holds details about calls to the Set method. + Set []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Dto is the dto argument value. + Dto *store.CallbackData + } // SetMany holds details about calls to the SetMany method. SetMany []struct { // Ctx is the ctx argument value. @@ -64,9 +106,12 @@ type CallbackerStoreMock struct { Data []*store.CallbackData } } - lockClose sync.RWMutex - lockPopMany sync.RWMutex - lockSetMany sync.RWMutex + lockClose sync.RWMutex + lockDeleteFailedOlderThan sync.RWMutex + lockPopFailedMany sync.RWMutex + lockPopMany sync.RWMutex + lockSet sync.RWMutex + lockSetMany sync.RWMutex } // Close calls CloseFunc. @@ -96,6 +141,82 @@ func (mock *CallbackerStoreMock) CloseCalls() []struct { return calls } +// DeleteFailedOlderThan calls DeleteFailedOlderThanFunc. +func (mock *CallbackerStoreMock) DeleteFailedOlderThan(ctx context.Context, t time.Time) error { + if mock.DeleteFailedOlderThanFunc == nil { + panic("CallbackerStoreMock.DeleteFailedOlderThanFunc: method is nil but CallbackerStore.DeleteFailedOlderThan was just called") + } + callInfo := struct { + Ctx context.Context + T time.Time + }{ + Ctx: ctx, + T: t, + } + mock.lockDeleteFailedOlderThan.Lock() + mock.calls.DeleteFailedOlderThan = append(mock.calls.DeleteFailedOlderThan, callInfo) + mock.lockDeleteFailedOlderThan.Unlock() + return mock.DeleteFailedOlderThanFunc(ctx, t) +} + +// DeleteFailedOlderThanCalls gets all the calls that were made to DeleteFailedOlderThan. +// Check the length with: +// +// len(mockedCallbackerStore.DeleteFailedOlderThanCalls()) +func (mock *CallbackerStoreMock) DeleteFailedOlderThanCalls() []struct { + Ctx context.Context + T time.Time +} { + var calls []struct { + Ctx context.Context + T time.Time + } + mock.lockDeleteFailedOlderThan.RLock() + calls = mock.calls.DeleteFailedOlderThan + mock.lockDeleteFailedOlderThan.RUnlock() + return calls +} + +// PopFailedMany calls PopFailedManyFunc. +func (mock *CallbackerStoreMock) PopFailedMany(ctx context.Context, t time.Time, limit int) ([]*store.CallbackData, error) { + if mock.PopFailedManyFunc == nil { + panic("CallbackerStoreMock.PopFailedManyFunc: method is nil but CallbackerStore.PopFailedMany was just called") + } + callInfo := struct { + Ctx context.Context + T time.Time + Limit int + }{ + Ctx: ctx, + T: t, + Limit: limit, + } + mock.lockPopFailedMany.Lock() + mock.calls.PopFailedMany = append(mock.calls.PopFailedMany, callInfo) + mock.lockPopFailedMany.Unlock() + return mock.PopFailedManyFunc(ctx, t, limit) +} + +// PopFailedManyCalls gets all the calls that were made to PopFailedMany. +// Check the length with: +// +// len(mockedCallbackerStore.PopFailedManyCalls()) +func (mock *CallbackerStoreMock) PopFailedManyCalls() []struct { + Ctx context.Context + T time.Time + Limit int +} { + var calls []struct { + Ctx context.Context + T time.Time + Limit int + } + mock.lockPopFailedMany.RLock() + calls = mock.calls.PopFailedMany + mock.lockPopFailedMany.RUnlock() + return calls +} + // PopMany calls PopManyFunc. func (mock *CallbackerStoreMock) PopMany(ctx context.Context, limit int) ([]*store.CallbackData, error) { if mock.PopManyFunc == nil { @@ -132,6 +253,42 @@ func (mock *CallbackerStoreMock) PopManyCalls() []struct { return calls } +// Set calls SetFunc. 
+func (mock *CallbackerStoreMock) Set(ctx context.Context, dto *store.CallbackData) error { + if mock.SetFunc == nil { + panic("CallbackerStoreMock.SetFunc: method is nil but CallbackerStore.Set was just called") + } + callInfo := struct { + Ctx context.Context + Dto *store.CallbackData + }{ + Ctx: ctx, + Dto: dto, + } + mock.lockSet.Lock() + mock.calls.Set = append(mock.calls.Set, callInfo) + mock.lockSet.Unlock() + return mock.SetFunc(ctx, dto) +} + +// SetCalls gets all the calls that were made to Set. +// Check the length with: +// +// len(mockedCallbackerStore.SetCalls()) +func (mock *CallbackerStoreMock) SetCalls() []struct { + Ctx context.Context + Dto *store.CallbackData +} { + var calls []struct { + Ctx context.Context + Dto *store.CallbackData + } + mock.lockSet.RLock() + calls = mock.calls.Set + mock.lockSet.RUnlock() + return calls +} + // SetMany calls SetManyFunc. func (mock *CallbackerStoreMock) SetMany(ctx context.Context, data []*store.CallbackData) error { if mock.SetManyFunc == nil { diff --git a/internal/callbacker/store/postgresql/fixtures/delete_failed_older_than/callbacker.callbacks.yaml b/internal/callbacker/store/postgresql/fixtures/delete_failed_older_than/callbacker.callbacks.yaml new file mode 100644 index 000000000..8510e0593 --- /dev/null +++ b/internal/callbacker/store/postgresql/fixtures/delete_failed_older_than/callbacker.callbacks.yaml @@ -0,0 +1,408 @@ +- url: https://arc-callback-1/callback + token: token + tx_id: 96cbf8ba96dc3bad6ecc19ce34d1edbf57b2bc6f76cc3d80efdca95599cf5c28 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:00 + +- url: https://arc-callback-1/callback + token: token + tx_id: 96cbf8ba96dc3bad6ecc19ce34d1edbf57b2bc6f76cc3d80efdca95599cf5c28 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:00 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 96cbf8ba96dc3bad6ecc19ce34d1edbf57b2bc6f76cc3d80efdca95599cf5c28 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:00 + +- url: https://arc-callback-2/callback + token: token + tx_id: 96cbf8ba96dc3bad6ecc19ce34d1edbf57b2bc6f76cc3d80efdca95599cf5c28 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:00 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-1/callback + token: token + tx_id: 3413cc9b40d48661c7f36bee88ebb39fca1d593f9672f840afdf07b018e73bb7 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:01 + +- url: https://arc-callback-1/callback + token: token + tx_id: 3413cc9b40d48661c7f36bee88ebb39fca1d593f9672f840afdf07b018e73bb7 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:01 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 3413cc9b40d48661c7f36bee88ebb39fca1d593f9672f840afdf07b018e73bb7 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:01 + +- url: https://arc-callback-2/callback + token: token + tx_id: 3413cc9b40d48661c7f36bee88ebb39fca1d593f9672f840afdf07b018e73bb7 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:01 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 
277fb619a6ee37757123301fce61884e741ab4e01a0dea7ec465ae74f43f82cc + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-1/callback + token: token + tx_id: 277fb619a6ee37757123301fce61884e741ab4e01a0dea7ec465ae74f43f82cc + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 277fb619a6ee37757123301fce61884e741ab4e01a0dea7ec465ae74f43f82cc + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 277fb619a6ee37757123301fce61884e741ab4e01a0dea7ec465ae74f43f82cc + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 13441601b8f4bd6062ce113118e957c04442a3293360fffbe0ed8805c34c6343 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-1/callback + token: token + tx_id: 13441601b8f4bd6062ce113118e957c04442a3293360fffbe0ed8805c34c6343 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 13441601b8f4bd6062ce113118e957c04442a3293360fffbe0ed8805c34c6343 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-2/callback + token: token + tx_id: 13441601b8f4bd6062ce113118e957c04442a3293360fffbe0ed8805c34c6343 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-1/callback + token: token + tx_id: 862f7781e2e65efddd3cb6bb3a924ea53edba299354991eb38bf47ec6e5c986c + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-1/callback + token: token + tx_id: 862f7781e2e65efddd3cb6bb3a924ea53edba299354991eb38bf47ec6e5c986c + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 862f7781e2e65efddd3cb6bb3a924ea53edba299354991eb38bf47ec6e5c986c + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-2/callback + token: token + tx_id: 862f7781e2e65efddd3cb6bb3a924ea53edba299354991eb38bf47ec6e5c986c + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 0e27376ded97b656ccc02ecd6948d2e775d8904e7093eb89e8d2bf1eb7a60ea9 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-1/callback + token: token + tx_id: 0e27376ded97b656ccc02ecd6948d2e775d8904e7093eb89e8d2bf1eb7a60ea9 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: 
https://arc-callback-2/callback + token: token + tx_id: 0e27376ded97b656ccc02ecd6948d2e775d8904e7093eb89e8d2bf1eb7a60ea9 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 0e27376ded97b656ccc02ecd6948d2e775d8904e7093eb89e8d2bf1eb7a60ea9 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 6bcfe17b6b41511ee891401998caefb7ffdcee87653e197dfb1add5860b6a070 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-1/callback + token: token + tx_id: 6bcfe17b6b41511ee891401998caefb7ffdcee87653e197dfb1add5860b6a070 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 6bcfe17b6b41511ee891401998caefb7ffdcee87653e197dfb1add5860b6a070 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-2/callback + token: token + tx_id: 6bcfe17b6b41511ee891401998caefb7ffdcee87653e197dfb1add5860b6a070 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-1/callback + token: token + tx_id: c50eeb84c58780bb0eb39f430deb93e5f362fe16b73aa4f811c089e14a1815ae + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-1/callback + token: token + tx_id: c50eeb84c58780bb0eb39f430deb93e5f362fe16b73aa4f811c089e14a1815ae + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:31:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: c50eeb84c58780bb0eb39f430deb93e5f362fe16b73aa4f811c089e14a1815ae + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: c50eeb84c58780bb0eb39f430deb93e5f362fe16b73aa4f811c089e14a1815ae + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 6de67f42990ab43f7cc480d6339f13932fe67fa8971a59aa34b93b5dac734c3e + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-1/callback + token: token + tx_id: 6de67f42990ab43f7cc480d6339f13932fe67fa8971a59aa34b93b5dac734c3e + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 6de67f42990ab43f7cc480d6339f13932fe67fa8971a59aa34b93b5dac734c3e + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 6de67f42990ab43f7cc480d6339f13932fe67fa8971a59aa34b93b5dac734c3e + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 
860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: d23d87582b477d4cdcfd92537cc44689220c753a716f355c35faa4856562d331 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:03 + +- url: https://arc-callback-1/callback + token: token + tx_id: d23d87582b477d4cdcfd92537cc44689220c753a716f355c35faa4856562d331 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:03 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:31:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: d23d87582b477d4cdcfd92537cc44689220c753a716f355c35faa4856562d331 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:03 + +- url: https://arc-callback-2/callback + token: token + tx_id: d23d87582b477d4cdcfd92537cc44689220c753a716f355c35faa4856562d331 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:03 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 975d0526e7a2d47225f266cb1c4bc2abad7f2c4a976dd9bd1381792d647d5430 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:03 + +- url: https://arc-callback-1/callback + token: token + tx_id: 975d0526e7a2d47225f266cb1c4bc2abad7f2c4a976dd9bd1381792d647d5430 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:03 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 975d0526e7a2d47225f266cb1c4bc2abad7f2c4a976dd9bd1381792d647d5430 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:03 + +- url: https://arc-callback-2/callback + token: token + tx_id: 975d0526e7a2d47225f266cb1c4bc2abad7f2c4a976dd9bd1381792d647d5430 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:03 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 8e84ed2ef2264cfa1f2a00c218329d8862d09a06ae5f6ad62d2f1b8069b13a64 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-1/callback + token: token + tx_id: 8e84ed2ef2264cfa1f2a00c218329d8862d09a06ae5f6ad62d2f1b8069b13a64 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 8e84ed2ef2264cfa1f2a00c218329d8862d09a06ae5f6ad62d2f1b8069b13a64 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-2/callback + token: token + tx_id: 8e84ed2ef2264cfa1f2a00c218329d8862d09a06ae5f6ad62d2f1b8069b13a64 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 292ff388160b2814473af90c289ed007b2f1b38cb02ba64cafab1af3b15e41d0 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-1/callback + token: token + tx_id: 292ff388160b2814473af90c289ed007b2f1b38cb02ba64cafab1af3b15e41d0 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 292ff388160b2814473af90c289ed007b2f1b38cb02ba64cafab1af3b15e41d0 + 
tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-2/callback + token: token + tx_id: 292ff388160b2814473af90c289ed007b2f1b38cb02ba64cafab1af3b15e41d0 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 0011f2d1abb9eec2c3b6194c2aae6acfb819f2cbd9faa21a31646dc597fbec78 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-1/callback + token: token + tx_id: 0011f2d1abb9eec2c3b6194c2aae6acfb819f2cbd9faa21a31646dc597fbec78 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:31:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 0011f2d1abb9eec2c3b6194c2aae6acfb819f2cbd9faa21a31646dc597fbec78 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-2/callback + token: token + tx_id: 0011f2d1abb9eec2c3b6194c2aae6acfb819f2cbd9faa21a31646dc597fbec78 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:31:10 diff --git a/internal/callbacker/store/postgresql/fixtures/pop_failed_many/callbacker.callbacks.yaml b/internal/callbacker/store/postgresql/fixtures/pop_failed_many/callbacker.callbacks.yaml new file mode 100644 index 000000000..8510e0593 --- /dev/null +++ b/internal/callbacker/store/postgresql/fixtures/pop_failed_many/callbacker.callbacks.yaml @@ -0,0 +1,408 @@ +- url: https://arc-callback-1/callback + token: token + tx_id: 96cbf8ba96dc3bad6ecc19ce34d1edbf57b2bc6f76cc3d80efdca95599cf5c28 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:00 + +- url: https://arc-callback-1/callback + token: token + tx_id: 96cbf8ba96dc3bad6ecc19ce34d1edbf57b2bc6f76cc3d80efdca95599cf5c28 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:00 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 96cbf8ba96dc3bad6ecc19ce34d1edbf57b2bc6f76cc3d80efdca95599cf5c28 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:00 + +- url: https://arc-callback-2/callback + token: token + tx_id: 96cbf8ba96dc3bad6ecc19ce34d1edbf57b2bc6f76cc3d80efdca95599cf5c28 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:00 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-1/callback + token: token + tx_id: 3413cc9b40d48661c7f36bee88ebb39fca1d593f9672f840afdf07b018e73bb7 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:01 + +- url: https://arc-callback-1/callback + token: token + tx_id: 3413cc9b40d48661c7f36bee88ebb39fca1d593f9672f840afdf07b018e73bb7 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:01 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 3413cc9b40d48661c7f36bee88ebb39fca1d593f9672f840afdf07b018e73bb7 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:01 + +- url: https://arc-callback-2/callback 
+ token: token + tx_id: 3413cc9b40d48661c7f36bee88ebb39fca1d593f9672f840afdf07b018e73bb7 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:01 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 277fb619a6ee37757123301fce61884e741ab4e01a0dea7ec465ae74f43f82cc + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-1/callback + token: token + tx_id: 277fb619a6ee37757123301fce61884e741ab4e01a0dea7ec465ae74f43f82cc + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 277fb619a6ee37757123301fce61884e741ab4e01a0dea7ec465ae74f43f82cc + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 277fb619a6ee37757123301fce61884e741ab4e01a0dea7ec465ae74f43f82cc + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 13441601b8f4bd6062ce113118e957c04442a3293360fffbe0ed8805c34c6343 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-1/callback + token: token + tx_id: 13441601b8f4bd6062ce113118e957c04442a3293360fffbe0ed8805c34c6343 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 13441601b8f4bd6062ce113118e957c04442a3293360fffbe0ed8805c34c6343 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-2/callback + token: token + tx_id: 13441601b8f4bd6062ce113118e957c04442a3293360fffbe0ed8805c34c6343 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-1/callback + token: token + tx_id: 862f7781e2e65efddd3cb6bb3a924ea53edba299354991eb38bf47ec6e5c986c + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-1/callback + token: token + tx_id: 862f7781e2e65efddd3cb6bb3a924ea53edba299354991eb38bf47ec6e5c986c + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 862f7781e2e65efddd3cb6bb3a924ea53edba299354991eb38bf47ec6e5c986c + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-2/callback + token: token + tx_id: 862f7781e2e65efddd3cb6bb3a924ea53edba299354991eb38bf47ec6e5c986c + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 0e27376ded97b656ccc02ecd6948d2e775d8904e7093eb89e8d2bf1eb7a60ea9 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 
2024-09-01 12:11:10 + +- url: https://arc-callback-1/callback + token: token + tx_id: 0e27376ded97b656ccc02ecd6948d2e775d8904e7093eb89e8d2bf1eb7a60ea9 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 0e27376ded97b656ccc02ecd6948d2e775d8904e7093eb89e8d2bf1eb7a60ea9 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 0e27376ded97b656ccc02ecd6948d2e775d8904e7093eb89e8d2bf1eb7a60ea9 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 6bcfe17b6b41511ee891401998caefb7ffdcee87653e197dfb1add5860b6a070 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-1/callback + token: token + tx_id: 6bcfe17b6b41511ee891401998caefb7ffdcee87653e197dfb1add5860b6a070 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 6bcfe17b6b41511ee891401998caefb7ffdcee87653e197dfb1add5860b6a070 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-2/callback + token: token + tx_id: 6bcfe17b6b41511ee891401998caefb7ffdcee87653e197dfb1add5860b6a070 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-1/callback + token: token + tx_id: c50eeb84c58780bb0eb39f430deb93e5f362fe16b73aa4f811c089e14a1815ae + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-1/callback + token: token + tx_id: c50eeb84c58780bb0eb39f430deb93e5f362fe16b73aa4f811c089e14a1815ae + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:31:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: c50eeb84c58780bb0eb39f430deb93e5f362fe16b73aa4f811c089e14a1815ae + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: c50eeb84c58780bb0eb39f430deb93e5f362fe16b73aa4f811c089e14a1815ae + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 6de67f42990ab43f7cc480d6339f13932fe67fa8971a59aa34b93b5dac734c3e + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + +- url: https://arc-callback-1/callback + token: token + tx_id: 6de67f42990ab43f7cc480d6339f13932fe67fa8971a59aa34b93b5dac734c3e + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 6de67f42990ab43f7cc480d6339f13932fe67fa8971a59aa34b93b5dac734c3e + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:02 + 
postponed_until: 2024-09-01 12:11:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 6de67f42990ab43f7cc480d6339f13932fe67fa8971a59aa34b93b5dac734c3e + tx_status: "MINED" + timestamp: 2024-09-01 12:01:02 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: d23d87582b477d4cdcfd92537cc44689220c753a716f355c35faa4856562d331 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:03 + +- url: https://arc-callback-1/callback + token: token + tx_id: d23d87582b477d4cdcfd92537cc44689220c753a716f355c35faa4856562d331 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:03 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:31:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: d23d87582b477d4cdcfd92537cc44689220c753a716f355c35faa4856562d331 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:03 + +- url: https://arc-callback-2/callback + token: token + tx_id: d23d87582b477d4cdcfd92537cc44689220c753a716f355c35faa4856562d331 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:03 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 975d0526e7a2d47225f266cb1c4bc2abad7f2c4a976dd9bd1381792d647d5430 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:03 + +- url: https://arc-callback-1/callback + token: token + tx_id: 975d0526e7a2d47225f266cb1c4bc2abad7f2c4a976dd9bd1381792d647d5430 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:03 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 975d0526e7a2d47225f266cb1c4bc2abad7f2c4a976dd9bd1381792d647d5430 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:03 + +- url: https://arc-callback-2/callback + token: token + tx_id: 975d0526e7a2d47225f266cb1c4bc2abad7f2c4a976dd9bd1381792d647d5430 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:03 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 8e84ed2ef2264cfa1f2a00c218329d8862d09a06ae5f6ad62d2f1b8069b13a64 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-1/callback + token: token + tx_id: 8e84ed2ef2264cfa1f2a00c218329d8862d09a06ae5f6ad62d2f1b8069b13a64 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 8e84ed2ef2264cfa1f2a00c218329d8862d09a06ae5f6ad62d2f1b8069b13a64 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-2/callback + token: token + tx_id: 8e84ed2ef2264cfa1f2a00c218329d8862d09a06ae5f6ad62d2f1b8069b13a64 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 292ff388160b2814473af90c289ed007b2f1b38cb02ba64cafab1af3b15e41d0 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-1/callback + token: token + tx_id: 
292ff388160b2814473af90c289ed007b2f1b38cb02ba64cafab1af3b15e41d0 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-2/callback + token: token + tx_id: 292ff388160b2814473af90c289ed007b2f1b38cb02ba64cafab1af3b15e41d0 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-2/callback + token: token + tx_id: 292ff388160b2814473af90c289ed007b2f1b38cb02ba64cafab1af3b15e41d0 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + +- url: https://arc-callback-1/callback + token: token + tx_id: 0011f2d1abb9eec2c3b6194c2aae6acfb819f2cbd9faa21a31646dc597fbec78 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-1/callback + token: token + tx_id: 0011f2d1abb9eec2c3b6194c2aae6acfb819f2cbd9faa21a31646dc597fbec78 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:31:10 + +- url: https://arc-callback-2/callback + token: token + tx_id: 0011f2d1abb9eec2c3b6194c2aae6acfb819f2cbd9faa21a31646dc597fbec78 + tx_status: "SEEN_ON_NETWORK" + timestamp: 2024-09-01 12:00:04 + +- url: https://arc-callback-2/callback + token: token + tx_id: 0011f2d1abb9eec2c3b6194c2aae6acfb819f2cbd9faa21a31646dc597fbec78 + tx_status: "MINED" + timestamp: 2024-09-01 12:01:04 + block_hash: 0000000000000000086527da012efb2d45e00fba9f31e84c35dce998abb409ad + block_height: 860339 + postponed_until: 2024-09-01 12:31:10 diff --git a/internal/callbacker/store/postgresql/internal/tests/utils.go b/internal/callbacker/store/postgresql/internal/tests/utils.go index 78fbb1202..26a57b889 100644 --- a/internal/callbacker/store/postgresql/internal/tests/utils.go +++ b/internal/callbacker/store/postgresql/internal/tests/utils.go @@ -2,6 +2,7 @@ package tests import ( "database/sql" + "fmt" "reflect" "strings" "testing" @@ -26,7 +27,8 @@ func ReadAllCallbacks(t *testing.T, db *sql.DB) []*store.CallbackData { ,block_hash ,block_height ,timestamp - ,competing_txs + ,competing_txs + ,postponed_until FROM callbacker.callbacks`, ) @@ -44,8 +46,9 @@ func ReadAllCallbacks(t *testing.T, db *sql.DB) []*store.CallbackData { var bh sql.NullString var bheight sql.NullInt64 var competingTxs sql.NullString + var pUntil sql.NullTime - _ = r.Scan(&c.Url, &c.Token, &c.TxID, &c.TxStatus, &ei, &mp, &bh, &bheight, &c.Timestamp, &competingTxs) + _ = r.Scan(&c.Url, &c.Token, &c.TxID, &c.TxStatus, &ei, &mp, &bh, &bheight, &c.Timestamp, &competingTxs, &pUntil) if ei.Valid { c.ExtraInfo = &ei.String @@ -62,6 +65,9 @@ func ReadAllCallbacks(t *testing.T, db *sql.DB) []*store.CallbackData { if competingTxs.Valid { c.CompetingTxs = strings.Split(competingTxs.String, ",") } + if pUntil.Valid { + c.PostponedUntil = ptrTo(pUntil.Time.UTC()) + } c.Timestamp = c.Timestamp.UTC() callbacks = append(callbacks, c) @@ -83,6 +89,19 @@ func CountCallbacks(t *testing.T, db *sql.DB) int { return count } +func CountCallbacksWhere(t *testing.T, db *sql.DB, predicate string) int { + t.Helper() + + var count int + row := db.QueryRow(fmt.Sprintf("SELECT COUNT(1) FROM callbacker.callbacks WHERE %s", predicate)) + + if err := row.Scan(&count); err != nil { + t.Fatal(err) + } + + return count +} + func ptrTo[T any](v T) *T { return &v } diff --git 
a/internal/callbacker/store/postgresql/migrations/000002_add_postponed_until.down.sql b/internal/callbacker/store/postgresql/migrations/000002_add_postponed_until.down.sql new file mode 100644 index 000000000..39e923dfd --- /dev/null +++ b/internal/callbacker/store/postgresql/migrations/000002_add_postponed_until.down.sql @@ -0,0 +1,3 @@ +DROP INDEX callbacker.ix_callbacks_postponed_until; + +ALTER TABLE callbacker.callbacks DROP COLUMN postponed_until; diff --git a/internal/callbacker/store/postgresql/migrations/000002_add_postponed_until.up.sql b/internal/callbacker/store/postgresql/migrations/000002_add_postponed_until.up.sql new file mode 100644 index 000000000..de596aee8 --- /dev/null +++ b/internal/callbacker/store/postgresql/migrations/000002_add_postponed_until.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE callbacker.callbacks ADD COLUMN postponed_until TIMESTAMPTZ; + +CREATE INDEX ix_callbacks_postponed_until ON callbacker.callbacks (postponed_until); diff --git a/internal/callbacker/store/postgresql/postgres.go b/internal/callbacker/store/postgresql/postgres.go index a72e331b3..03514e79d 100644 --- a/internal/callbacker/store/postgresql/postgres.go +++ b/internal/callbacker/store/postgresql/postgres.go @@ -36,17 +36,22 @@ func (p *PostgreSQL) Close() error { return p.db.Close() } +func (p *PostgreSQL) Set(ctx context.Context, dto *store.CallbackData) error { + return p.SetMany(ctx, []*store.CallbackData{dto}) +} + func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) error { urls := make([]string, len(data)) tokens := make([]string, len(data)) timestamps := make([]time.Time, len(data)) txids := make([]string, len(data)) txStatuses := make([]string, len(data)) - extraInfos := make([]sql.NullString, len(data)) - merklePaths := make([]sql.NullString, len(data)) - blockHashes := make([]sql.NullString, len(data)) + extraInfos := make([]*string, len(data)) + merklePaths := make([]*string, len(data)) + blockHashes := make([]*string, len(data)) blockHeights := make([]sql.NullInt64, len(data)) - competingTxs := make([]sql.NullString, len(data)) + competingTxs := make([]*string, len(data)) + quarantineUntils := make([]sql.NullTime, len(data)) for i, d := range data { urls[i] = d.Url @@ -54,25 +59,20 @@ func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) er timestamps[i] = d.Timestamp txids[i] = d.TxID txStatuses[i] = d.TxStatus - - if d.ExtraInfo != nil { - extraInfos[i] = sql.NullString{String: *d.ExtraInfo, Valid: true} - } - - if d.MerklePath != nil { - merklePaths[i] = sql.NullString{String: *d.MerklePath, Valid: true} - } - - if d.BlockHash != nil { - blockHashes[i] = sql.NullString{String: *d.BlockHash, Valid: true} - } + extraInfos[i] = d.ExtraInfo + merklePaths[i] = d.MerklePath + blockHashes[i] = d.BlockHash if d.BlockHeight != nil { blockHeights[i] = sql.NullInt64{Int64: int64(*d.BlockHeight), Valid: true} } if len(d.CompetingTxs) > 0 { - competingTxs[i] = sql.NullString{String: strings.Join(d.CompetingTxs, ","), Valid: true} + competingTxs[i] = ptrTo(strings.Join(d.CompetingTxs, ",")) + } + + if d.PostponedUntil != nil { + quarantineUntils[i] = sql.NullTime{Time: d.PostponedUntil.UTC(), Valid: true} } } @@ -87,6 +87,7 @@ func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) er ,block_height ,timestamp ,competing_txs + ,postponed_until ) SELECT UNNEST($1::TEXT[]) @@ -98,7 +99,8 @@ func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) er ,UNNEST($7::TEXT[]) ,UNNEST($8::BIGINT[]) 
,UNNEST($9::TIMESTAMPTZ[]) - ,UNNEST($10::TEXT[])` + ,UNNEST($10::TEXT[]) + ,UNNEST($11::TIMESTAMPTZ[])` _, err := p.db.ExecContext(ctx, query, pq.Array(urls), @@ -111,12 +113,13 @@ func (p *PostgreSQL) SetMany(ctx context.Context, data []*store.CallbackData) er pq.Array(blockHeights), pq.Array(timestamps), pq.Array(competingTxs), + pq.Array(quarantineUntils), ) return err } -func (p *PostgreSQL) PopMany(ctx context.Context, limit int) (res []*store.CallbackData, err error) { +func (p *PostgreSQL) PopMany(ctx context.Context, limit int) ([]*store.CallbackData, error) { tx, err := p.db.Begin() if err != nil { return nil, err @@ -132,6 +135,7 @@ func (p *PostgreSQL) PopMany(ctx context.Context, limit int) (res []*store.Callb const q = `DELETE FROM callbacker.callbacks WHERE id IN ( SELECT id FROM callbacker.callbacks + WHERE postponed_until IS NULL ORDER BY id LIMIT $1 FOR UPDATE @@ -146,7 +150,8 @@ func (p *PostgreSQL) PopMany(ctx context.Context, limit int) (res []*store.Callb ,block_hash ,block_height ,competing_txs - ,timestamp` + ,timestamp + ,postponed_until` rows, err := tx.QueryContext(ctx, q, limit) if err != nil { @@ -154,21 +159,97 @@ func (p *PostgreSQL) PopMany(ctx context.Context, limit int) (res []*store.Callb } defer rows.Close() - records := make([]*store.CallbackData, 0, limit) + var records []*store.CallbackData + records, err = scanCallbacks(rows, limit) + if err != nil { + return nil, err + } + + if err = tx.Commit(); err != nil { + return nil, err + } + + return records, nil +} + +func (p *PostgreSQL) PopFailedMany(ctx context.Context, t time.Time, limit int) ([]*store.CallbackData, error) { + tx, err := p.db.Begin() + if err != nil { + return nil, err + } + defer func() { + if err != nil { + if rErr := tx.Rollback(); rErr != nil { + err = errors.Join(err, fmt.Errorf("failed to rollback: %v", rErr)) + } + } + }() + + const q = `DELETE FROM callbacker.callbacks + WHERE id IN ( + SELECT id FROM callbacker.callbacks + WHERE postponed_until IS NOT NULL AND postponed_until<= $1 + ORDER BY id + LIMIT $2 + FOR UPDATE + ) + RETURNING + url + ,token + ,tx_id + ,tx_status + ,extra_info + ,merkle_path + ,block_hash + ,block_height + ,competing_txs + ,timestamp + ,postponed_until` + + rows, err := tx.QueryContext(ctx, q, t, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var records []*store.CallbackData + records, err = scanCallbacks(rows, limit) + if err != nil { + return nil, err + } + + if err = tx.Commit(); err != nil { + return nil, err + } + + return records, nil +} + +func (p *PostgreSQL) DeleteFailedOlderThan(ctx context.Context, t time.Time) error { + const q = `DELETE FROM callbacker.callbacks + WHERE postponed_until IS NOT NULL AND timestamp <= $1` + + _, err := p.db.ExecContext(ctx, q, t) + return err +} + +func scanCallbacks(rows *sql.Rows, expectedNumber int) ([]*store.CallbackData, error) { + records := make([]*store.CallbackData, 0, expectedNumber) for rows.Next() { r := &store.CallbackData{} var ( - ts time.Time - ei sql.NullString - mp sql.NullString - bh sql.NullString - bHeight sql.NullInt64 - ctxs sql.NullString + ts time.Time + ei sql.NullString + mp sql.NullString + bh sql.NullString + bHeight sql.NullInt64 + ctxs sql.NullString + postponedUntil sql.NullTime ) - err = rows.Scan( + err := rows.Scan( &r.Url, &r.Token, &r.TxID, @@ -179,6 +260,7 @@ func (p *PostgreSQL) PopMany(ctx context.Context, limit int) (res []*store.Callb &bHeight, &ctxs, &ts, + &postponedUntil, ) if err != nil { @@ -202,14 +284,13 @@ func (p *PostgreSQL) 
PopMany(ctx context.Context, limit int) (res []*store.Callb if ctxs.String != "" { r.CompetingTxs = strings.Split(ctxs.String, ",") } + if postponedUntil.Valid { + r.PostponedUntil = ptrTo(postponedUntil.Time.UTC()) + } records = append(records, r) } - if err = tx.Commit(); err != nil { - return nil, err - } - return records, nil } diff --git a/internal/callbacker/store/postgresql/postgres_test.go b/internal/callbacker/store/postgresql/postgres_test.go index 94ad996d9..ca9bf68b8 100644 --- a/internal/callbacker/store/postgresql/postgres_test.go +++ b/internal/callbacker/store/postgresql/postgres_test.go @@ -3,6 +3,7 @@ package postgresql import ( "context" "database/sql" + "fmt" "log" "os" "sync" @@ -59,7 +60,7 @@ func TestPostgresDBt(t *testing.T) { t.Skip("skipping integration test") } - now := time.Date(2023, 10, 1, 14, 25, 0, 0, time.UTC) + now := time.Date(2024, 9, 1, 12, 25, 0, 0, time.UTC) postgresDB, err := New(dbInfo, 10, 10) require.NoError(t, err) @@ -125,6 +126,25 @@ func TestPostgresDBt(t *testing.T) { BlockHash: &testdata.Block1, BlockHeight: ptrTo(uint64(4524235)), }, + { + Url: "https://test-callback-3/", + TxID: testdata.TX2, + TxStatus: "MINED", + Timestamp: now, + BlockHash: &testdata.Block1, + BlockHeight: ptrTo(uint64(4524235)), + PostponedUntil: ptrTo(now.Add(10 * time.Minute)), + }, + + { + Url: "https://test-callback-3/", + TxID: testdata.TX3, + TxStatus: "MINED", + Timestamp: now, + BlockHash: &testdata.Block1, + BlockHeight: ptrTo(uint64(4524235)), + PostponedUntil: ptrTo(now.Add(10 * time.Minute)), + }, } // when @@ -201,6 +221,89 @@ func TestPostgresDBt(t *testing.T) { } }) + t.Run("pop failed many", func(t *testing.T) { + // given + defer pruneTables(t, postgresDB.db) + testutils.LoadFixtures(t, postgresDB.db, "fixtures/pop_failed_many") + + const concurentCalls = 5 + const popLimit = 10 + + // count current records + countAll := tutils.CountCallbacks(t, postgresDB.db) + require.GreaterOrEqual(t, countAll, concurentCalls*popLimit) + countToPop := tutils.CountCallbacksWhere(t, postgresDB.db, fmt.Sprintf("postponed_until <= '%s'", now.Format(time.RFC3339))) + require.Greater(t, countToPop, popLimit) + + ctx := context.Background() + start := make(chan struct{}) + rm := sync.Map{} + wg := sync.WaitGroup{} + + // when + wg.Add(concurentCalls) + for i := range concurentCalls { + go func() { + defer wg.Done() + <-start + + records, err := postgresDB.PopFailedMany(ctx, now, popLimit) + require.NoError(t, err) + + rm.Store(i, records) + }() + } + + close(start) // signal all goroutines to start + wg.Wait() + + // then + count2 := tutils.CountCallbacks(t, postgresDB.db) + require.Equal(t, countAll-countToPop, count2) + + for i := range concurentCalls { + records, ok := rm.Load(i) + require.True(t, ok) + + callbacks := records.([]*store.CallbackData) + require.LessOrEqual(t, len(callbacks), popLimit) + } + }) + + t.Run("delete failed older than", func(t *testing.T) { + // given + defer pruneTables(t, postgresDB.db) + testutils.LoadFixtures(t, postgresDB.db, "fixtures/delete_failed_older_than") + + const concurentCalls = 5 + + // count current records + countAll := tutils.CountCallbacks(t, postgresDB.db) + countToDelete := tutils.CountCallbacksWhere(t, postgresDB.db, fmt.Sprintf("timestamp <= '%s' AND postponed_until IS NOT NULL", now.Format(time.RFC3339))) + + ctx := context.Background() + start := make(chan struct{}) + wg := sync.WaitGroup{} + + // when + wg.Add(concurentCalls) + for range concurentCalls { + go func() { + defer wg.Done() + <-start + + err := 
postgresDB.DeleteFailedOlderThan(ctx, now)
+				require.NoError(t, err)
+			}()
+		}
+
+		close(start) // signal all goroutines to start
+		wg.Wait()
+
+		// then
+		require.Equal(t, countAll-countToDelete, tutils.CountCallbacks(t, postgresDB.db))
+	})
+
 }
 
 func pruneTables(t *testing.T, db *sql.DB) {
diff --git a/internal/callbacker/store/store.go b/internal/callbacker/store/store.go
index af158919c..98658d8a5 100644
--- a/internal/callbacker/store/store.go
+++ b/internal/callbacker/store/store.go
@@ -6,8 +6,11 @@ import (
 )
 
 type CallbackerStore interface {
+	Set(ctx context.Context, dto *CallbackData) error
 	SetMany(ctx context.Context, data []*CallbackData) error
 	PopMany(ctx context.Context, limit int) ([]*CallbackData, error)
+	PopFailedMany(ctx context.Context, t time.Time, limit int) ([]*CallbackData, error) // TODO: better name for t
+	DeleteFailedOlderThan(ctx context.Context, t time.Time) error
 	Close() error
 }
 
@@ -26,4 +29,6 @@ type CallbackData struct {
 
 	BlockHash   *string
 	BlockHeight *uint64
+
+	PostponedUntil *time.Time
 }
diff --git a/test/submit_beef_test.go b/test/submit_beef_test.go
index 90756a6ce..0c3d61f73 100644
--- a/test/submit_beef_test.go
+++ b/test/submit_beef_test.go
@@ -45,7 +45,7 @@ func TestBeef(t *testing.T) {
 	// then
 	require.Equal(t, Status_SEEN_ON_NETWORK, resp.TxStatus)
 
-	generate(t, 10)
+	generate(t, 1)
 
 	statusUrl := fmt.Sprintf("%s/%s", arcEndpointV1Tx, tx.TxID())
 	statusResp := getRequest[TransactionResponse](t, statusUrl)