Skip to content

Commit

Permalink
[chore] Fix storage client leak in persistent queue when shutdown (#9122
Browse files Browse the repository at this point in the history
)

No changelog, bug was not released.

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Dec 15, 2023
1 parent 12895ea commit 16e0d9f
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,10 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
if pq.readIndex == pq.writeIndex {
return request, nil, false
}

index := pq.readIndex
// Increase here, so even if errors happen below, it always iterates
pq.readIndex++

pq.currentlyDispatchedItems = append(pq.currentlyDispatchedItems, index)
getOp := storage.GetOperation(getItemKey(index))
err := pq.client.Batch(ctx,
Expand All @@ -274,21 +274,26 @@ func (pq *persistentQueue[T]) getNextItem(ctx context.Context) (T, func(error),
// Increase the reference count, so the client is not closed while the request is being processed.
pq.refClient++
return request, func(consumeErr error) {
// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
// Always unref client even if the consumer is shutdown because we always ref it for every valid request.
defer func() {
if err = pq.unrefClient(ctx); err != nil {
pq.set.Logger.Error("Error closing the storage client", zap.Error(err))
}
pq.mu.Unlock()
}()

if errors.As(consumeErr, &shutdownErr{}) {
// The queue is shutting down, don't mark the item as dispatched, so it's picked up again after restart.
// TODO: Handle partially delivered requests by updating their values in the storage.
return
}

// Delete the item from the persistent storage after it was processed.
pq.mu.Lock()
defer pq.mu.Unlock()
if err = pq.itemDispatchingFinish(ctx, index); err != nil {
pq.set.Logger.Error("Error deleting item from queue", zap.Error(err))
}
if err = pq.unrefClient(ctx); err != nil {
pq.set.Logger.Error("Error closing the storage client", zap.Error(err))
}

}, true
}

Expand Down

0 comments on commit 16e0d9f

Please sign in to comment.