Skip to content

Commit

Permalink
analyzer/item: Do not exit immediately on empty queue
Browse files Browse the repository at this point in the history
  • Loading branch information
mitjat committed Feb 15, 2024
1 parent 1a0c1d6 commit 78e8b92
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 28 deletions.
42 changes: 24 additions & 18 deletions analyzer/item/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ const (
var ErrEmptyBatch = errors.New("no items in batch")

type itemBasedAnalyzer[Item any] struct {
maxBatchSize uint64
stopOnEmptyQueue bool
fixedInterval time.Duration
interItemDelay time.Duration
analyzerName string
maxBatchSize uint64
stopIfQueueEmptyFor time.Duration
fixedInterval time.Duration
interItemDelay time.Duration
analyzerName string

processor ItemProcessor[Item]

Expand All @@ -61,8 +61,8 @@ type ItemProcessor[Item any] interface {

// NewAnalyzer returns a new item based analyzer using the provided item processor.
//
// If stopOnEmptyQueue is true, the analyzer will process batches of items until its
// work queue is empty, at which point it will terminate and return. Likely to
// If stopIfQueueEmptyFor is a non-zero duration, the analyzer will process batches of items until its
// work queue is empty for `stopIfQueueEmptyFor`, at which point it will terminate and return. Likely to
// be used in the regression tests.
//
// If fixedInterval is provided, the analyzer will process one batch every fixedInterval.
Expand All @@ -79,15 +79,15 @@ func NewAnalyzer[Item any](
cfg.BatchSize = defaultBatchSize
}
a := &itemBasedAnalyzer[Item]{
cfg.BatchSize,
cfg.StopOnEmptyQueue,
cfg.Interval,
cfg.InterItemDelay,
name,
processor,
target,
logger,
metrics.NewDefaultAnalysisMetrics(name),
maxBatchSize: cfg.BatchSize,
stopIfQueueEmptyFor: cfg.StopIfQueueEmptyFor,
fixedInterval: cfg.Interval,
interItemDelay: cfg.InterItemDelay,
analyzerName: name,
processor: processor,
target: target,
logger: logger,
metrics: metrics.NewDefaultAnalysisMetrics(name),
}

return a, nil
Expand Down Expand Up @@ -195,6 +195,7 @@ func (a *itemBasedAnalyzer[Item]) Start(ctx context.Context) {
)
return
}
mostRecentTask := time.Now()

for firstIter := true; ; firstIter = false {
delay := backoff.Timeout()
Expand All @@ -213,8 +214,12 @@ func (a *itemBasedAnalyzer[Item]) Start(ctx context.Context) {
}
// Update queueLength
queueLength, err := a.sendQueueLengthMetric(ctx)
if err == nil && queueLength == 0 && a.stopOnEmptyQueue {
a.logger.Warn("item analyzer work queue is empty; shutting down")
// Stop if queue has been empty for a while, and configured to do so.
if err == nil && queueLength == 0 && a.stopIfQueueEmptyFor != 0 && time.Since(mostRecentTask) > a.stopIfQueueEmptyFor {
a.logger.Warn("item analyzer work queue has been empty for a while; shutting down",
"queue_empty_since", mostRecentTask,
"queue_empty_for", time.Since(mostRecentTask),
"stop_if_queue_empty_for", a.stopIfQueueEmptyFor)
return
}
a.logger.Info("work queue length", "num_items", queueLength)
Expand All @@ -231,6 +236,7 @@ func (a *itemBasedAnalyzer[Item]) Start(ctx context.Context) {
backoff.Failure()
continue
}
mostRecentTask = time.Now()

backoff.Success()
}
Expand Down
8 changes: 4 additions & 4 deletions analyzer/item/item_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,10 @@ const (

// Default item based config.
var testItemBasedConfig = &config.ItemBasedAnalyzerConfig{
BatchSize: 3,
StopOnEmptyQueue: true,
Interval: 0, // use backoff
InterItemDelay: 0,
BatchSize: 3,
StopIfQueueEmptyFor: time.Second,
Interval: 0, // use backoff
InterItemDelay: 0,
}

type mockItem struct {
Expand Down
11 changes: 6 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,14 @@ type ItemBasedAnalyzerConfig struct {
// Uses default value of 20 if unset/set to 0.
BatchSize uint64 `koanf:"batch_size"`

// If StopOnEmptyQueue is true, the analyzer will exit the main processing loop when
// there are no items left in the work queue. This is useful during testing when
// 1) The number of items in the queue is determinate
// If StopIfQueueEmptyFor is a non-zero duration, the analyzer will terminate when
// there are no items left in the work queue for this specified amount of time.
// This is useful during testing when
// 1) The number of items in the queue is determinate.
// 2) We want the analyzer to naturally terminate after processing all available work items.
//
// Defaults to false.
StopOnEmptyQueue bool `koanf:"stop_on_empty_queue"`
// Defaults to 0, i.e. the analyzer never terminates.
StopIfQueueEmptyFor time.Duration `koanf:"stop_if_queue_empty_for"`

// If Interval is set, the analyzer will process batches at a fixed cadence specified by Interval.
// Otherwise, the analyzer will use an adaptive backoff to determine the delay between
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e_regression/e2e_config_2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ analysis:
# Some non-block analyzers are not tested in e2e regressions.
# They are largely not worth the trouble as they do not interact with rest of the system much.
# metadata_registry: {} # Awkward to inject mock registry responses.
# node_stats: {} # Awkward to inject mock node response using the current paradigm (= reseponse caching).
# node_stats: {} # Awkward to inject mock node response using the current paradigm (= response caching).
# aggregate_stats: {} # Awkward to make stop after a single run.
storage:
backend: postgres
Expand Down

0 comments on commit 78e8b92

Please sign in to comment.