From 01e720b5853dbee5e06a696c68967afc3327401d Mon Sep 17 00:00:00 2001 From: ziggie Date: Wed, 27 Mar 2024 15:31:43 +0000 Subject: [PATCH] query: fix retry query case. In case the backend is very unstable and times out the batch we need to make sure ongoing queryJobs are droped and already registered queryJobs are removed from the heap as well. --- query/worker.go | 28 ++++++++-- query/workmanager.go | 127 ++++++++++++++++++++++++++----------------- 2 files changed, 99 insertions(+), 56 deletions(-) diff --git a/query/worker.go b/query/worker.go index dc15a18cf..5c2d2517a 100644 --- a/query/worker.go +++ b/query/worker.go @@ -24,11 +24,12 @@ var ( // queryJob is the internal struct that wraps the Query to work on, in // addition to some information about the query. type queryJob struct { - tries uint8 - index uint64 - timeout time.Duration - encoding wire.MessageEncoding - cancelChan <-chan struct{} + tries uint8 + index uint64 + timeout time.Duration + encoding wire.MessageEncoding + cancelChan <-chan struct{} + internalCancelChan <-chan struct{} *Request } @@ -128,6 +129,16 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { // result will be sent back. break + case <-job.internalCancelChan: + log.Tracef("Worker %v found job with index %v "+ + "already internally canceled (batch timed out)", + peer.Addr(), job.Index()) + + // We break to the below loop, where we'll check the + // internal cancel channel again and the ErrJobCanceled + // result will be sent back. + break + // We received a non-canceled query job, send it to the peer. default: log.Tracef("Worker %v queuing job %T with index %v", @@ -214,6 +225,13 @@ func (w *worker) Run(results chan<- *jobResult, quit <-chan struct{}) { jobErr = ErrJobCanceled break Loop + case <-job.internalCancelChan: + log.Tracef("Worker %v job %v internally "+ + "canceled", peer.Addr(), job.Index()) + + jobErr = ErrJobCanceled + break Loop + case <-quit: return } diff --git a/query/workmanager.go b/query/workmanager.go index 9217f49ac..88e91e02a 100644 --- a/query/workmanager.go +++ b/query/workmanager.go @@ -183,6 +183,7 @@ func (w *peerWorkManager) workDispatcher() { timeout <-chan time.Time rem int errChan chan error + cancelChan chan struct{} } // We set up a batch index counter to keep track of batches that still @@ -309,7 +310,20 @@ Loop: // turns out to be an error. batchNum := currentQueries[result.job.index] delete(currentQueries, result.job.index) - batch := currentBatches[batchNum] + + // In case the batch is already canceled we return + // early. + batch, ok := currentBatches[batchNum] + if !ok { + log.Warnf("Query(%d) result from peer %v "+ + "discarded with retries %d, because "+ + "batch already canceled: %v", + result.job.index, + result.peer.Addr(), + result.job.tries, result.err) + + continue Loop + } switch { // If the query ended because it was canceled, drop it. @@ -322,30 +336,34 @@ Loop: // was canceled, forward the error on the // batch's error channel. We do this since a // cancellation applies to the whole batch. - if batch != nil { - batch.errChan <- result.err - delete(currentBatches, batchNum) + batch.errChan <- result.err + delete(currentBatches, batchNum) - log.Debugf("Canceled batch %v", - batchNum) - continue Loop - } + log.Debugf("Canceled batch %v", batchNum) + continue Loop // If the query ended with any other error, put it back // into the work queue if it has not reached the // maximum number of retries. case result.err != nil: - // Punish the peer for the failed query. - w.cfg.Ranking.Punish(result.peer.Addr()) + // Refresh peer rank on disconnect. + if result.err == ErrPeerDisconnected { + w.cfg.Ranking.ResetRanking( + result.peer.Addr(), + ) + } else { + // Punish the peer for the failed query. + w.cfg.Ranking.Punish(result.peer.Addr()) + } - if batch != nil && !batch.noRetryMax { + if !batch.noRetryMax { result.job.tries++ } // Check if this query has reached its maximum // number of retries. If so, remove it from the // batch and don't reschedule it. - if batch != nil && !batch.noRetryMax && + if !batch.noRetryMax && result.job.tries >= batch.maxRetries { log.Warnf("Query(%d) from peer %v "+ @@ -380,11 +398,6 @@ Loop: result.job.timeout = newTimeout } - // Refresh peer rank on disconnect. - if result.err == ErrPeerDisconnected { - w.cfg.Ranking.ResetRanking(result.peer.Addr()) - } - heap.Push(work, result.job) currentQueries[result.job.index] = batchNum @@ -396,42 +409,47 @@ Loop: // Decrement the number of queries remaining in // the batch. - if batch != nil { - batch.rem-- - log.Tracef("Remaining jobs for batch "+ - "%v: %v ", batchNum, batch.rem) - - // If this was the last query in flight - // for this batch, we can notify that - // it finished, and delete it. - if batch.rem == 0 { - batch.errChan <- nil - delete(currentBatches, batchNum) - - log.Tracef("Batch %v done", - batchNum) - continue Loop - } + batch.rem-- + log.Tracef("Remaining jobs for batch "+ + "%v: %v ", batchNum, batch.rem) + + // If this was the last query in flight + // for this batch, we can notify that + // it finished, and delete it. + if batch.rem == 0 { + batch.errChan <- nil + delete(currentBatches, batchNum) + + log.Tracef("Batch %v done", + batchNum) + continue Loop } } // If the total timeout for this batch has passed, // return an error. - if batch != nil { - select { - case <-batch.timeout: - batch.errChan <- ErrQueryTimeout - delete(currentBatches, batchNum) + select { + case <-batch.timeout: + batch.errChan <- ErrQueryTimeout + delete(currentBatches, batchNum) + + // When deleting the particular batch + // number we need to make sure to cancel + // all queued and ongoing queryJobs + // to not waste resources when the batch + // call is already canceled. + if batch.cancelChan != nil { + close(batch.cancelChan) + } - log.Warnf("Query(%d) failed with "+ - "error: %v. Timing out.", - result.job.index, result.err) + log.Warnf("Query(%d) failed with "+ + "error: %v. Timing out.", + result.job.index, result.err) - log.Debugf("Batch %v timed out", - batchNum) + log.Warnf("Batch %v timed out", + batchNum) - default: - } + default: } // A new batch of queries where scheduled. @@ -442,13 +460,17 @@ Loop: log.Debugf("Adding new batch(%d) of %d queries to "+ "work queue", batchIndex, len(batch.requests)) + // Internal cancel channel of a batch request. + cancelChan := make(chan struct{}) + for _, q := range batch.requests { heap.Push(work, &queryJob{ - index: queryIndex, - timeout: minQueryTimeout, - encoding: batch.options.encoding, - cancelChan: batch.options.cancelChan, - Request: q, + index: queryIndex, + timeout: minQueryTimeout, + encoding: batch.options.encoding, + cancelChan: batch.options.cancelChan, + internalCancelChan: cancelChan, + Request: q, }) currentQueries[queryIndex] = batchIndex queryIndex++ @@ -457,9 +479,12 @@ Loop: currentBatches[batchIndex] = &batchProgress{ noRetryMax: batch.options.noRetryMax, maxRetries: batch.options.numRetries, - timeout: time.After(batch.options.timeout), + timeout: time.After( + batch.options.timeout, + ), rem: len(batch.requests), errChan: batch.errChan, + cancelChan: cancelChan, } batchIndex++